连续复制
一键复制
一键打包

Watcher.php

<?php

namespace think\swoole;

use think\swoole\watcher\Driver;

/**
 * Manager for the file watcher drivers configured under `swoole.hot_update`.
 *
 * @mixin Driver
 */
class Watcher extends \think\Manager
{
    // Namespace prefix for watcher drivers; presumably consumed by the parent
    // manager when resolving a driver class — confirm against \think\Manager.
    protected $namespace = '\\think\\swoole\\watcher\\';

    /**
     * Read one option from the `swoole.hot_update` configuration section.
     *
     * @param string $name    Option name relative to `swoole.hot_update.`
     * @param mixed  $default Value returned when the option is not configured
     *
     * @return mixed
     */
    protected function getConfig(string $name, $default = null)
    {
        $key = 'swoole.hot_update.' . $name;

        return $this->app->config->get($key, $default);
    }

    /**
     * Build the driver parameter list: the `include`, `exclude` and `name`
     * options, in that order, each defaulting to an empty array.
     *
     * @param string $name Driver name (not used by this implementation)
     *
     * @return array
     */
    protected function resolveParams($name): array
    {
        $params = [];

        foreach (['include', 'exclude', 'name'] as $option) {
            $params[] = $this->getConfig($option, []);
        }

        return $params;
    }

    /**
     * Name of the default driver: the `type` option, or `scan` when unset.
     *
     * @return mixed
     */
    public function getDefaultDriver()
    {
        $driver = $this->getConfig('type', 'scan');

        return $driver;
    }
}

Websocket.php

<?php

namespace think\swoole;

use Swoole\Http\Response;
use think\Event;
use think\swoole\websocket\Pusher;
use think\swoole\websocket\Room;

/**
 * Per-connection websocket facade: tracks the current sender, manages its
 * room membership and forwards outgoing messages through a Pusher.
 */
class Websocket
{
    /**
     * @var \think\App
     */
    protected $app;

    /**
     * @var Room
     */
    protected $room;

    /**
     * Socket sender's fd.
     *
     * @var string
     */
    protected $sender;

    /** @var Event */
    protected $event;

    /** @var Response */
    protected $client;

    /**
     * Websocket constructor.
     *
     * @param \think\App $app
     * @param Room $room
     * @param Event $event
     */
    public function __construct(\think\App $app, Room $room, Event $event)
    {
        $this->event = $event;
        $this->room  = $room;
        $this->app   = $app;
    }

    /**
     * Create a pusher through the application container.
     *
     * @return Pusher
     */
    protected function makePusher()
    {
        $pusher = $this->app->invokeClass(Pusher::class);

        return $pusher;
    }

    /**
     * Start a push addressed to the given targets.
     *
     * @param mixed ...$values Forwarded unchanged to Pusher::to()
     *
     * @return mixed Whatever Pusher::to() returns
     */
    public function to(...$values)
    {
        $pusher = $this->makePusher();

        return $pusher->to(...$values);
    }

    /**
     * Push data to the current sender.
     *
     * @param mixed $data
     */
    public function push($data)
    {
        $target = $this->makePusher()->to($this->getSender());

        $target->push($data);
    }

    /**
     * Emit a named event with its payload to the current sender.
     *
     * @param string $event
     * @param mixed  ...$data
     */
    public function emit(string $event, ...$data)
    {
        $target = $this->makePusher()->to($this->getSender());

        $target->emit($event, ...$data);
    }

    /**
     * Join sender to multiple rooms.
     *
     * A string or integer first argument switches to variadic mode: every
     * argument passed to the call is treated as a room.
     *
     * @param string|integer|array $rooms
     *
     * @return $this
     */
    public function join($rooms): self
    {
        if (is_string($rooms) || is_int($rooms)) {
            $rooms = func_get_args();
        }

        $this->room->add($this->getSender(), $rooms);

        return $this;
    }

    /**
     * Make sender leave multiple rooms.
     *
     * A string or integer first argument switches to variadic mode: every
     * argument passed to the call is treated as a room.
     *
     * @param array|string|integer $rooms
     *
     * @return $this
     */
    public function leave($rooms = []): self
    {
        if (is_string($rooms) || is_int($rooms)) {
            $rooms = func_get_args();
        }

        $this->room->delete($this->getSender(), $rooms);

        return $this;
    }

    /**
     * Whether a client response has been attached via setClient().
     *
     * @return bool
     */
    public function isEstablished()
    {
        return (bool) $this->client;
    }

    /**
     * Close current connection.
     */
    public function close()
    {
        if (!$this->client) {
            return;
        }

        $this->client->close();
    }

    /**
     * Attach the response object that represents the client connection.
     *
     * @param Response $response
     */
    public function setClient($response)
    {
        $this->client = $response;
    }

    /**
     * Set sender fd.
     *
     * @param string $fd
     *
     * @return $this
     */
    public function setSender(string $fd)
    {
        $this->sender = $fd;

        return $this;
    }

    /**
     * Get current sender fd.
     *
     * @return string|null Null until setSender() has been called
     */
    public function getSender()
    {
        return $this->sender;
    }
}

App.php

<?php

namespace think\swoole;

/**
 * Application container subclass with a settable console flag and a way to
 * drop all cached instances.
 */
class App extends \think\App
{
    // Value reported by runningInConsole(); starts out as true.
    protected $inConsole = true;

    /**
     * Set the flag returned by runningInConsole().
     *
     * @param bool $inConsole
     */
    public function setInConsole($inConsole = true)
    {
        $this->inConsole = $inConsole;
    }

    /**
     * Report the stored console flag instead of detecting the runtime.
     *
     * @return bool
     */
    public function runningInConsole(): bool
    {
        return $this->inConsole;
    }

    /**
     * Reset the `instances` property to an empty array.
     */
    public function clearInstances()
    {
        $this->instances = [];
    }
}

Service.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use think\swoole\command\RpcInterface;
use think\swoole\command\Server as ServerCommand;

/**
 * Service provider that registers the console commands of this package.
 */
class Service extends \think\Service
{
    /**
     * Register the server command and the RPC interface generator command.
     */
    public function boot()
    {
        $commands = [
            ServerCommand::class,
            RpcInterface::class,
        ];

        // Spread so each command class is still passed as its own argument.
        $this->commands(...$commands);
    }
}

Pool.php

<?php

namespace think\swoole;

use Smf\ConnectionPool\ConnectionPool;
use think\helper\Arr;

/**
 * Registry of named connection pools.
 */
class Pool
{
    /**
     * Registered pools keyed by name.
     *
     * @var ConnectionPool[]
     */
    protected $pools = [];

    /**
     * Initialise a pool and register it under the given name.
     *
     * A pool already registered under the same name is replaced.
     *
     * @param string         $name
     * @param ConnectionPool $pool
     *
     * @return Pool
     */
    public function add(string $name, ConnectionPool $pool)
    {
        $pool->init();
        $this->pools[$name] = $pool;

        return $this;
    }

    /**
     * Look up a pool by name.
     *
     * @param string $name
     *
     * @return ConnectionPool|null Null when no pool is registered under $name
     */
    public function get(string $name)
    {
        return $this->pools[$name] ?? null;
    }

    /**
     * Close the pool registered under the given name.
     *
     * Closing an unknown name is a no-op that returns false, rather than a
     * fatal error caused by calling close() on null. The pool stays in the
     * registry after being closed.
     *
     * @param string $key
     *
     * @return mixed The result of the pool's close() call, or false when no
     *               pool is registered under $key
     */
    public function close(string $key)
    {
        $pool = $this->get($key);

        if ($pool === null) {
            return false;
        }

        return $pool->close();
    }

    /**
     * Get every registered pool keyed by name.
     *
     * @return array
     */
    public function getAll()
    {
        return $this->pools;
    }

    /**
     * Close every registered pool.
     */
    public function closeAll()
    {
        foreach ($this->pools as $pool) {
            $pool->close();
        }
    }

    /**
     * Dynamic property access to a pool; equivalent to get($key).
     *
     * @param string $key
     *
     * @return ConnectionPool|null
     */
    public function __get($key)
    {
        return $this->get($key);
    }

    /**
     * Extract the pool options from a configuration array.
     *
     * The snake_case pool keys are pulled out of $config (which is taken by
     * reference) and returned under camelCase names, each with a default for
     * missing keys.
     *
     * @param array $config Configuration array; pool keys are pulled from it
     *
     * @return array
     */
    public static function pullPoolConfig(&$config)
    {
        return [
            'minActive'         => Arr::pull($config, 'min_active', 0),
            'maxActive'         => Arr::pull($config, 'max_active', 10),
            'maxWaitTime'       => Arr::pull($config, 'max_wait_time', 5),
            'maxIdleTime'       => Arr::pull($config, 'max_idle_time', 20),
            'idleCheckInterval' => Arr::pull($config, 'idle_check_interval', 10),
        ];
    }
}

Http.php

<?php

namespace think\swoole;

/**
 * Class Http
 *
 * Http subclass whose middleware and route loading hooks are overridden with
 * empty bodies.
 *
 * @package think\swoole
 * @property $request
 */
class Http extends \think\Http
{

    /**
     * Intentionally empty: this override loads no middleware.
     * NOTE(review): presumably middleware is set up elsewhere — confirm.
     */
    protected function loadMiddleware(): void
    {

    }

    /**
     * Intentionally empty: this override loads no routes.
     * NOTE(review): presumably routes are set up elsewhere — confirm.
     */
    protected function loadRoutes(): void
    {

    }
}

Sandbox.php

<?php

namespace think\swoole;

use Closure;
use InvalidArgumentException;
use ReflectionObject;
use RuntimeException;
use Swoole\Coroutine;
use think\App;
use think\Config;
use think\Container;
use think\Event;
use think\swoole\App as SwooleApp;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\coroutine\Context;
use think\swoole\resetters\ClearInstances;
use think\swoole\resetters\ResetConfig;
use think\swoole\resetters\ResetEvent;
use think\swoole\resetters\ResetModel;
use think\swoole\resetters\ResetPaginator;
use think\swoole\resetters\ResetService;
use Throwable;

/**
 * Gives each root coroutine its own clone of the base application.
 *
 * A snapshot is created on init(), stored under the id of the coroutine that
 * created it, and removed again on clear().
 */
class Sandbox
{
    use ModifyProperty;

    /**
     * The app containers in different coroutine environment, keyed by the id
     * of the coroutine that initialised them.
     *
     * @var SwooleApp[]
     */
    protected $snapshots = [];

    /** @var SwooleApp */
    protected $app;

    /** @var Config */
    protected $config;

    /** @var Event */
    protected $event;

    /** @var ResetterInterface[] */
    protected $resetters = [];

    /** @var array Service instances keyed by class name. */
    protected $services  = [];

    /**
     * @param Container $app Base application that snapshots are cloned from
     */
    public function __construct(Container $app)
    {
        $this->setBaseApp($app);
        $this->initialize();
    }

    /**
     * Set the base application.
     *
     * @param Container $app
     *
     * @return $this
     */
    public function setBaseApp(Container $app)
    {
        $this->app = $app;

        return $this;
    }

    /**
     * Get the base application.
     *
     * @return SwooleApp
     */
    public function getBaseApp()
    {
        return $this->app;
    }

    /**
     * Route the global container instance to the current snapshot and capture
     * the initial config, services, event object and resetters.
     *
     * @return $this
     */
    protected function initialize()
    {
        Container::setInstance(function () {
            return $this->getApplication();
        });

        // Config must be captured first: services and resetters read from it.
        $this->setInitialConfig();
        $this->setInitialServices();
        $this->setInitialEvent();
        $this->setInitialResetters();

        return $this;
    }

    /**
     * Run a callable against a fresh snapshot and always clean up afterwards.
     *
     * Exceptions thrown by the callable propagate to the caller once the
     * snapshot has been cleared.
     *
     * @param Closure $callable Invoked through the snapshot container, with
     *                          this sandbox passed as an argument
     */
    public function run(Closure $callable)
    {
        $this->init();

        try {
            $this->getApplication()->invoke($callable, [$this]);
        } finally {
            $this->clear();
        }
    }

    /**
     * Create the snapshot for the current coroutine and reset it.
     */
    public function init()
    {
        $app = $this->getApplication(true);
        $this->setInstance($app);
        $this->resetApp($app);
    }

    /**
     * Drop the current snapshot, clear the coroutine context and point the
     * registered services back at the base application.
     */
    public function clear()
    {
        if ($app = $this->getSnapshot()) {
            $app->clearInstances();
            unset($this->snapshots[$this->getSnapshotId()]);
        }

        Context::clear();
        $this->setInstance($this->getBaseApp());
    }

    /**
     * Get the snapshot for the current coroutine.
     *
     * @param bool $init When true, a missing snapshot is created by cloning
     *                   the base application
     *
     * @return Container
     *
     * @throws InvalidArgumentException When no snapshot exists and $init is false
     */
    public function getApplication($init = false)
    {
        $snapshot = $this->getSnapshot($init);
        if ($snapshot instanceof Container) {
            return $snapshot;
        }

        if ($init) {
            $snapshot = clone $this->getBaseApp();
            $this->setSnapshot($snapshot);

            return $snapshot;
        }
        throw new InvalidArgumentException('The app object has not been initialized');
    }

    /**
     * Resolve the id under which the current snapshot is stored.
     *
     * With $init the current coroutine is marked as a root (its context gets
     * a '#root' entry) and its own id is returned. Otherwise the parent chain
     * is walked upwards until a coroutine carrying the '#root' mark is found.
     *
     * @param bool $init
     *
     * @return int|mixed The root coroutine id, or the last id looked up when
     *                   the walk ends on an id below 1
     */
    protected function getSnapshotId($init = false)
    {
        if ($init) {
            Coroutine::getContext()->offsetSet('#root', true);
            return Coroutine::getCid();
        }

        $cid = Coroutine::getCid();

        while (true) {
            // getContext() yields null for a coroutine that does not exist;
            // treat that as "not a root" rather than calling a method on null.
            $context = Coroutine::getContext($cid);
            if ($context !== null && $context->offsetExists('#root')) {
                break;
            }

            $cid = Coroutine::getPcid($cid);
            if ($cid < 1) {
                break;
            }
        }

        return $cid;
    }

    /**
     * Get current snapshot.
     *
     * @param bool $init Passed through to getSnapshotId()
     *
     * @return App|null
     */
    public function getSnapshot($init = false)
    {
        return $this->snapshots[$this->getSnapshotId($init)] ?? null;
    }

    /**
     * Store a snapshot under the current snapshot id.
     *
     * @param Container $snapshot
     *
     * @return $this
     */
    public function setSnapshot(Container $snapshot)
    {
        $this->snapshots[$this->getSnapshotId()] = $snapshot;

        return $this;
    }

    /**
     * Make the given container resolve itself for 'app' and Container::class,
     * and run modifyProperty() on each of its registered services.
     *
     * @param Container $app
     */
    public function setInstance(Container $app)
    {
        $app->instance('app', $app);
        $app->instance(Container::class, $app);

        // The container's `services` property is read through reflection.
        $reflectObject   = new ReflectionObject($app);
        $reflectProperty = $reflectObject->getProperty('services');
        $reflectProperty->setAccessible(true);
        $services = $reflectProperty->getValue($app);

        foreach ($services as $service) {
            $this->modifyProperty($service, $app);
        }
    }

    /**
     * Set initial config.
     */
    protected function setInitialConfig()
    {
        $this->config = clone $this->getBaseApp()->config;
    }

    /**
     * Set initial event object.
     */
    protected function setInitialEvent()
    {
        $this->event = clone $this->getBaseApp()->event;
    }

    /**
     * Get config snapshot.
     *
     * @return Config
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Get event snapshot.
     *
     * @return Event
     */
    public function getEvent()
    {
        return $this->event;
    }

    /**
     * Get the service instances created from `swoole.services`.
     *
     * @return array Keyed by service class name
     */
    public function getServices()
    {
        return $this->services;
    }

    /**
     * Instantiate each class listed under `swoole.services` once.
     */
    protected function setInitialServices()
    {
        $app = $this->getBaseApp();

        $services = $this->config->get('swoole.services', []);

        foreach ($services as $service) {
            // $this->services is keyed by class name, so duplicates must be
            // detected by key; in_array() would compare the name to objects.
            if (class_exists($service) && !isset($this->services[$service])) {
                $serviceObj               = new $service($app);
                $this->services[$service] = $serviceObj;
            }
        }
    }

    /**
     * Initialize resetters.
     *
     * The built-in resetters come first, followed by those listed under
     * `swoole.resetters`.
     *
     * @throws RuntimeException When a resetter does not implement ResetterInterface
     */
    protected function setInitialResetters()
    {
        $app = $this->getBaseApp();

        $resetters = [
            ClearInstances::class,
            ResetConfig::class,
            ResetEvent::class,
            ResetService::class,
            ResetModel::class,
            ResetPaginator::class,
        ];

        $resetters = array_merge($resetters, $this->config->get('swoole.resetters', []));

        foreach ($resetters as $resetter) {
            $resetterClass = $app->make($resetter);
            if (!$resetterClass instanceof ResetterInterface) {
                throw new RuntimeException("{$resetter} must implement " . ResetterInterface::class);
            }
            $this->resetters[$resetter] = $resetterClass;
        }
    }

    /**
     * Reset Application.
     *
     * Runs every resetter against the given application, in registration order.
     *
     * @param App $app
     */
    protected function resetApp(App $app)
    {
        foreach ($this->resetters as $resetter) {
            $resetter->handle($app, $this);
        }
    }

}

Manager.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use think\swoole\concerns\InteractsWithHttp;
use think\swoole\concerns\InteractsWithPools;
use think\swoole\concerns\InteractsWithQueue;
use think\swoole\concerns\InteractsWithRpcClient;
use think\swoole\concerns\InteractsWithRpcServer;
use think\swoole\concerns\InteractsWithServer;
use think\swoole\concerns\InteractsWithSwooleTable;
use think\swoole\concerns\InteractsWithTracing;
use think\swoole\concerns\WithApplication;
use think\swoole\concerns\WithContainer;

/**
 * Class Manager
 *
 * Composes the server behaviour from the concern traits listed below and runs
 * their preparation steps in a fixed order.
 */
class Manager
{
    use InteractsWithServer,
        InteractsWithSwooleTable,
        InteractsWithHttp,
        InteractsWithPools,
        InteractsWithRpcClient,
        InteractsWithRpcServer,
        InteractsWithQueue,
        InteractsWithTracing,
        WithContainer,
        WithApplication;

    /**
     * Initialize.
     *
     * Calls each prepare* step once, in the order written here.
     * NOTE(review): the steps come from the traits above; whether they depend
     * on this order cannot be seen from this class — keep it unless verified.
     */
    protected function initialize(): void
    {
        $this->prepareTables();
        $this->preparePools();
        $this->prepareHttp();
        $this->prepareRpcServer();
        $this->prepareQueue();
        $this->prepareRpcClient();
        $this->prepareTracing();
    }

}

Middleware.php

<?php

namespace think\swoole;

use Closure;
use InvalidArgumentException;
use think\App;
use think\Pipeline;

/**
 * Normalises a list of middleware definitions and turns them into a pipeline.
 */
class Middleware
{
    /**
     * Middleware execution queue; each entry is a [callable, params] pair as
     * produced by buildMiddleware().
     * @var array
     */
    protected $queue = [];

    /**
     * @var App
     */
    protected $app;

    /**
     * @param App   $app
     * @param array $middlewares Middleware definitions; see buildMiddleware()
     *
     * @throws InvalidArgumentException When a definition is invalid
     */
    public function __construct(App $app, $middlewares = [])
    {
        $this->app = $app;

        foreach ($middlewares as $middleware) {
            $this->queue[] = $this->buildMiddleware($middleware);
        }
    }

    /**
     * Named constructor.
     *
     * @param App   $app
     * @param array $middlewares
     *
     * @return self
     */
    public static function make(App $app, $middlewares = [])
    {
        return new self($app, $middlewares);
    }

    /**
     * Build the dispatch pipeline.
     *
     * Every queued middleware is wrapped in a closure that resolves
     * class-name callables through the container at call time and invokes
     * them with the request, the next handler and the configured params.
     *
     * @return Pipeline
     */
    public function pipeline()
    {
        return (new Pipeline())
            ->through(array_map(function ($middleware) {
                return function ($request, $next) use ($middleware) {
                    [$call, $params] = $middleware;

                    // [class name, method] pairs are instantiated lazily.
                    if (is_array($call) && is_string($call[0])) {
                        $call = [$this->app->make($call[0]), $call[1]];
                    }
                    return call_user_func($call, $request, $next, ...$params);
                };
            }, $this->queue));
    }

    /**
     * Parse a middleware definition.
     *
     * Accepted forms are a Closure, a class name, or an array whose first
     * element is one of those and whose optional second element holds the
     * extra parameters. A missing or null parameter element yields an empty
     * parameter list without raising an undefined-offset diagnostic.
     *
     * @param mixed $middleware
     *
     * @return array A [callable, params] pair; class names become
     *               [class name, 'handle']
     *
     * @throws InvalidArgumentException When the middleware is neither a
     *                                  Closure nor a string
     */
    protected function buildMiddleware($middleware): array
    {
        $params = [];

        if (is_array($middleware)) {
            // Read the params before $middleware is overwritten below.
            $params     = $middleware[1] ?? [];
            $middleware = $middleware[0] ?? null;
        }

        if ($middleware instanceof Closure) {
            return [$middleware, $params];
        }

        if (!is_string($middleware)) {
            throw new InvalidArgumentException('The middleware is invalid');
        }

        return [[$middleware, 'handle'], $params];
    }
}

helpers.php

<?php

// Global-namespace fallbacks: each definition below is created only when the
// corresponding function or constant does not already exist.
// NOTE(review): the literal values are presumably meant to mirror the Swoole
// extension's own — confirm against the targeted Swoole version.
namespace {

    if (!function_exists('swoole_cpu_num')) {
        /**
         * Fallback CPU count: always reports a single core.
         */
        function swoole_cpu_num(): int
        {
            return 1;
        }
    }

    if (!defined('SWOOLE_SOCK_TCP')) {
        define('SWOOLE_SOCK_TCP', 1);
    }

    if (!defined('SWOOLE_PROCESS')) {
        define('SWOOLE_PROCESS', 3);
    }

    if (!defined('SWOOLE_HOOK_ALL')) {
        define('SWOOLE_HOOK_ALL', 1879048191);
    }
}

namespace think\swoole\helper {

    use think\swoole\response\File;

    /**
     * Create a file response for a download.
     *
     * @param string $filename    Path of the file to send
     * @param string $name        Name passed to setContentDisposition(); the
     *                            call is skipped when this is empty or '0'
     * @param mixed  $disposition Disposition passed to the File response
     *
     * @return File
     */
    function download(string $filename, string $name = '', $disposition = File::DISPOSITION_ATTACHMENT): File
    {
        $file = new File($filename, $disposition);

        if (!$name) {
            return $file;
        }

        $file->setContentDisposition($disposition, $name);

        return $file;
    }

    /**
     * Create a file response using the File defaults.
     *
     * @param string $filename Path of the file to send
     *
     * @return File
     */
    function file(string $filename)
    {
        $response = new File($filename);

        return $response;
    }
}

Table.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use Swoole\Table as SwooleTable;

/**
 * Registry of named swoole tables.
 */
class Table
{
    // Column type identifiers used when declaring table columns.
    public const TYPE_INT = 1;

    public const TYPE_STRING = 3;

    public const TYPE_FLOAT = 2;

    /**
     * Registered swoole tables, keyed by name.
     *
     * @var array
     */
    protected $tables = [];

    /**
     * Register a swoole table under the given name, replacing any table
     * already stored under it.
     *
     * @param string $name
     * @param SwooleTable $table
     *
     * @return Table
     */
    public function add(string $name, SwooleTable $table)
    {
        $this->tables[$name] = $table;

        return $this;
    }

    /**
     * Look up a swoole table by name.
     *
     * @param string $name
     *
     * @return SwooleTable|null Null when no table is registered under $name
     */
    public function get(string $name)
    {
        if (isset($this->tables[$name])) {
            return $this->tables[$name];
        }

        return null;
    }

    /**
     * Get every registered swoole table, keyed by name.
     *
     * @return array
     */
    public function getAll()
    {
        return $this->tables;
    }

    /**
     * Dynamic property access to a table; equivalent to get($key).
     *
     * @param string $key
     *
     * @return SwooleTable|null
     */
    public function __get(string $key)
    {
        $table = $this->get($key);

        return $table;
    }
}

command

Server.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------

namespace think\swoole\command;

use think\console\Command;
use think\console\input\Option;
use think\swoole\Manager;

/**
 * Console command `swoole`: checks the environment and starts the server.
 */
class Server extends Command
{
    /**
     * Declare the command name, its `--env` / `-E` option and description.
     */
    public function configure()
    {
        $this->setName('swoole');
        $this->addOption('env', 'E', Option::VALUE_REQUIRED, 'Environment name', '');
        $this->setDescription('Swoole Server for ThinkPHP');
    }

    /**
     * Verify the environment, print the start-up banner and start the manager
     * with the environment name given on the command line.
     *
     * @param Manager $manager
     */
    public function handle(Manager $manager)
    {
        $this->checkEnvironment();

        $banner = [
            'Starting swoole server...',
            'You can exit with <info>`CTRL-C`</info>',
        ];
        foreach ($banner as $line) {
            $this->output->writeln($line);
        }

        $manager->start($this->input->getOption('env'));
    }

    /**
     * Check the environment.
     *
     * Terminates the process with exit code 1 when the swoole extension is
     * not loaded or its version is lower than 4.6.0.
     */
    protected function checkEnvironment()
    {
        $swooleLoaded = extension_loaded('swoole');
        if (!$swooleLoaded) {
            $this->output->error('Can\'t detect Swoole extension installed.');

            exit(1);
        }

        $tooOld = version_compare(swoole_version(), '4.6.0', 'lt');
        if ($tooOld) {
            $this->output->error('Your Swoole version must be higher than `4.6.0`.');

            exit(1);
        }
    }
}

RpcInterface.php

<?php

namespace think\swoole\command;

use Nette\PhpGenerator\ClassType;
use Nette\PhpGenerator\Dumper;
use Nette\PhpGenerator\Helpers;
use Nette\PhpGenerator\PhpFile;
use think\console\Command;
use think\helper\Arr;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Service;
use think\swoole\rpc\JsonParser;
use function Swoole\Coroutine\run;

/**
 * Console command `rpc:interface`: queries every configured RPC client for its
 * services and writes matching PHP interfaces to `rpc.php` in the base path.
 */
class RpcInterface extends Command
{
    /**
     * Declare the command name and description.
     */
    public function configure()
    {
        $this->setName('rpc:interface')
            ->setDescription('Generate Rpc Service Interfaces');
    }

    /**
     * Generate the interface file.
     *
     * The whole job is executed through Swoole\Coroutine\run().
     */
    public function handle()
    {
        run(function () {
            $file = new PhpFile;
            $file->addComment('This file is auto-generated.');
            $file->setStrictTypes();
            // Maps client name => list of generated interface names.
            $services = [];

            $clients = $this->app->config->get('swoole.rpc.client', []);

            foreach ($clients as $name => $config) {

                // The parser class is configurable per client; JSON by default.
                $parserClass = Arr::get($config, 'parser', JsonParser::class);
                /** @var ParserInterface $parser */
                $parser = new $parserClass;

                // Third argument comes from the `tries` option, default 2.
                $gateway = new Gateway($config, $parser, Arr::get($config, 'tries', 2));

                // Iterated below as interface name => methods.
                $result = $gateway->getServices();

                // One namespace per client: rpc\contract\<client name>.
                $namespace = $file->addNamespace("rpc\\contract\\{$name}");

                $namespace->addUse(Service::class);

                foreach ($result as $interface => $methods) {

                    $services[$name][] = $namespace->getName() . "\\{$interface}";

                    $class = $namespace->addInterface($interface);

                    $class->addExtend(Service::class);

                    // Each method entry must provide the keys `parameters`,
                    // `returnType` and `comment`.
                    foreach ($methods as $methodName => ['parameters' => $parameters, 'returnType' => $returnType, 'comment' => $comment]) {
                        $method = $class->addMethod($methodName)
                            ->setVisibility(ClassType::VISIBILITY_PUBLIC)
                            ->setComment(Helpers::unformatDocComment($comment))
                            ->setReturnType($returnType);

                        foreach ($parameters as $parameter) {
                            // Import the parameter type only when it is a class
                            // or interface that exists on this side.
                            if ($parameter['type'] && (class_exists($parameter['type']) || interface_exists($parameter['type']))) {
                                $namespace->addUse($parameter['type']);
                            }
                            $param = $method->addParameter($parameter['name'])
                                ->setType($parameter['type']);

                            // array_key_exists() so that a null default is kept.
                            if (array_key_exists('default', $parameter)) {
                                $param->setDefaultValue($parameter['default']);
                            }

                            // Only the presence of the key is checked; its
                            // value is not inspected.
                            if (array_key_exists('nullable', $parameter)) {
                                $param->setNullable();
                            }
                        }
                    }
                }
            }

            $dumper = new Dumper();

            // Appended after the generated code as a `return [...];` statement.
            $services = 'return ' . $dumper->dump($services) . ';';

            // NOTE(review): the result of file_put_contents() is not checked.
            file_put_contents($this->app->getBasePath() . 'rpc.php', $file . $services);

            $this->output->writeln('<info>Succeed!</info>');
        });
    }
}

concerns

WithApplication.php

<?php

namespace think\swoole\concerns;

use Closure;
use think\App;
use think\swoole\App as SwooleApp;
use think\swoole\Manager;
use think\swoole\pool\Cache;
use think\swoole\pool\Db;
use think\swoole\Sandbox;
use Throwable;

/**
 * Trait WithApplication
 *
 * Builds the Swoole application once and runs callables in its sandbox.
 *
 * @package think\swoole\concerns
 * @property App $container
 */
trait WithApplication
{
    /**
     * @var SwooleApp
     */
    protected $app;

    /**
     * Create and initialise the Swoole application; does nothing when one has
     * already been created.
     *
     * @param string $envName Environment name set on the new application
     */
    protected function prepareApplication(string $envName)
    {
        if ($this->app instanceof SwooleApp) {
            return;
        }

        $this->app = new SwooleApp($this->container->getRootPath());
        $this->app->setEnvName($envName);
        $this->app->bind(SwooleApp::class, App::class);
        $this->app->bind(Manager::class, $this);

        // Bind the connection pools; both options default to enabled.
        $dbPoolEnabled = $this->getConfig('pool.db.enable', true);
        if ($dbPoolEnabled) {
            $this->app->bind('db', Db::class);
            $this->app->resolving(Db::class, function (Db $db) {
                $db->setLog($this->container->log);
            });
        }

        $cachePoolEnabled = $this->getConfig('pool.cache.enable', true);
        if ($cachePoolEnabled) {
            $this->app->bind('cache', Cache::class);
        }

        $this->app->initialize();
        $this->app->instance('request', $this->container->request);
        $this->prepareConcretes();
    }

    /**
     * Preload: resolve `db`, `cache`, `event` and every entry of the
     * `concretes` option once, skipping names the application does not have.
     */
    protected function prepareConcretes()
    {
        $concretes = array_merge(
            ['db', 'cache', 'event'],
            $this->getConfig('concretes', [])
        );

        foreach ($concretes as $abstract) {
            if (!$this->app->has($abstract)) {
                continue;
            }

            $this->app->make($abstract);
        }
    }

    /**
     * Get the Swoole application.
     *
     * @return SwooleApp
     */
    public function getApplication()
    {
        return $this->app;
    }

    /**
     * Get the sandbox from the application container.
     * @return Sandbox
     */
    protected function getSandbox()
    {
        $sandbox = $this->app->make(Sandbox::class);

        return $sandbox;
    }

    /**
     * Run a callable in the sandbox; anything it throws is passed to
     * logServerError() instead of propagating.
     * @param Closure $callable
     */
    public function runInSandbox(Closure $callable)
    {
        try {
            $sandbox = $this->getSandbox();
            $sandbox->run($callable);
        } catch (Throwable $error) {
            $this->logServerError($error);
        }
    }
}

InteractsWithRpcServer.php

<?php

namespace think\swoole\concerns;

use Swoole\Coroutine\Server;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Pool;
use think\swoole\rpc\Error;
use think\swoole\rpc\File;
use think\swoole\rpc\JsonParser;
use think\swoole\rpc\Packer;
use think\swoole\rpc\server\Dispatcher;
use Throwable;

/**
 * Trait InteractsWithRpcServer
 *
 * Starts the RPC server workers and feeds received data to the dispatcher.
 *
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 * @method Pool getPools()
 */
trait InteractsWithRpcServer
{
    /**
     * Create a coroutine server on the configured host/port and serve each
     * connection inside a sandbox until the peer stops sending.
     */
    protected function createRpcServer()
    {
        $this->bindRpcParser();
        $this->bindRpcDispatcher();

        $host = $this->getConfig('rpc.server.host', '0.0.0.0');
        $port = $this->getConfig('rpc.server.port', 9000);

        // NOTE(review): the two trailing flags are presumably ssl = false and
        // reuse_port = true — confirm against the Swoole Coroutine\Server API.
        $server = new Server($host, $port, false, true);

        $server->handle(function (Connection $conn) {

            $this->runInSandbox(function (App $app, Dispatcher $dispatcher) use ($conn) {
                // File results collected until the next non-file result.
                $files = [];
                while (true) {
                    // Receive data
                    $data = $conn->recv();

                    // An empty string or false ends the receive loop.
                    if ($data === '' || $data === false) {
                        break;
                    }
                    begin:
                    // $handler survives loop iterations until it yields a
                    // result, so a header is only unpacked when none is active.
                    if (!isset($handler)) {
                        try {
                            [$handler, $data] = Packer::unpack($data);
                        } catch (Throwable $e) {
                            // Invalid packet header: dispatch an error result
                            // and stop reading from this connection.
                            $result = Error::make(Dispatcher::INVALID_REQUEST, $e->getMessage());
                            $this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result) {
                                $dispatcher->dispatch($app, $conn, $result);
                            });
                            break;
                        }
                    }

                    // NOTE(review): presumably write() consumes from $data by
                    // reference, given the emptiness check below — confirm.
                    $result = $handler->write($data);

                    if (!empty($result)) {
                        // A result was produced; the next data starts with a new header.
                        $handler = null;
                        if ($result instanceof File) {
                            $files[] = $result;
                        } else {
                            $this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result, $files) {
                                $dispatcher->dispatch($app, $conn, $result, $files);
                            });
                            $files = [];
                        }
                    }

                    // Data left over: process it before receiving again.
                    if (!empty($data)) {
                        goto begin;
                    }
                }

                $conn->close();
            });
        });

        $server->start();
    }

    /**
     * Register the RPC server workers when `rpc.server.enable` is set.
     *
     * The worker count comes from `rpc.server.worker_num`, defaulting to
     * swoole_cpu_num().
     */
    protected function prepareRpcServer()
    {
        if ($this->getConfig('rpc.server.enable', false)) {

            $workerNum = $this->getConfig('rpc.server.worker_num', swoole_cpu_num());

            $this->addBatchWorker($workerNum, [$this, 'createRpcServer'], 'rpc server');
        }
    }

    /**
     * Resolve the dispatcher once with the configured services and middleware.
     */
    protected function bindRpcDispatcher()
    {
        $services   = $this->getConfig('rpc.server.services', []);
        $middleware = $this->getConfig('rpc.server.middleware', []);

        $this->app->make(Dispatcher::class, [$services, $middleware]);
    }

    /**
     * Bind ParserInterface to the configured parser class (JsonParser by
     * default) and resolve it once.
     */
    protected function bindRpcParser()
    {
        $parserClass = $this->getConfig('rpc.server.parser', JsonParser::class);

        $this->app->bind(ParserInterface::class, $parserClass);
        $this->app->make(ParserInterface::class);
    }

}

InteractsWithSwooleTable.php

<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole\concerns;

use Swoole\Table as SwooleTable;
use think\App;
use think\swoole\Table;

/**
 * Trait InteractsWithSwooleTable
 *
 * Creates the swoole tables described under `swoole.tables` and exposes them
 * through the application container.
 *
 * @property App $container
 * @property App $app
 */
trait InteractsWithSwooleTable
{
    /**
     * @var Table
     */
    protected $currentTable;

    /**
     * Create the table registry, fill it from configuration and register a
     * `workerStart` listener that publishes the registry and each table
     * (as `swoole.table.<name>`) on the application container.
     */
    protected function prepareTables()
    {
        $this->currentTable = new Table();
        $this->registerTables();

        $this->onEvent('workerStart', function () {
            $registry = $this->currentTable;

            $this->app->instance(Table::class, $registry);

            foreach ($registry->getAll() as $name => $table) {
                $this->app->instance("swoole.table.{$name}", $table);
            }
        });
    }

    /**
     * Build every table listed under `swoole.tables`.
     *
     * Each definition must provide `size` and may provide `columns`; a column
     * needs `name` and `type`, and its `size` is passed on only when set.
     */
    protected function registerTables()
    {
        $definitions = $this->container->make('config')->get('swoole.tables', []);

        foreach ($definitions as $name => $definition) {
            $table = new SwooleTable($definition['size']);

            foreach ($definition['columns'] ?? [] as $column) {
                $arguments = [$column['name'], $column['type']];

                if (isset($column['size'])) {
                    $arguments[] = $column['size'];
                }

                $table->column(...$arguments);
            }

            $table->create();

            $this->currentTable->add($name, $table);
        }
    }
}

InteractsWithWebsocket.php

<?php

namespace think\swoole\concerns;

use Swoole\Atomic;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use think\App;
use think\helper\Str;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Middleware;
use think\swoole\Websocket;
use think\swoole\websocket\message\PushMessage;
use think\swoole\websocket\Room;

/**
 * Trait InteractsWithWebsocket
 * @package think\swoole\concerns
 *
 * @property App $app
 * @property App $container
 */
trait InteractsWithWebsocket
{

    /**
     * @var RoomInterface
     */
    protected $wsRoom;

    /**
     * @var Channel[]
     */
    protected $wsMessageChannel = [];

    protected $wsEnable = false;

    /** @var Atomic */
    protected $wsIdAtomic;

    /**
     * "onHandShake" listener: upgrades the request to a websocket connection
     * and serves it until the connection ends.
     *
     * Inside a sandbox this upgrades the response, runs the request through
     * the websocket middleware, starts a coroutine that forwards queued push
     * messages to the client, calls the handler's onOpen(), reads frames
     * (dispatching every completed message to onMessage() in its own
     * coroutine), and finally closes the connection and calls onClose().
     *
     * @param Request $req
     * @param Response $res
     */
    public function onHandShake($req, $res)
    {
        $this->runInSandbox(function (App $app, Websocket $websocket, HandlerInterface $handler) use ($req, $res) {
            $res->upgrade();

            $websocket->setClient($res);

            $request = $this->prepareRequest($req);
            $request = $this->setRequestThroughMiddleware($app, $request);

            $fd = $this->wsIdAtomic->add();

            // Keep a local handle on the channel: the "finally" block below
            // unsets the array entry, and the push coroutine may still be
            // suspended inside $res->push() when that happens.
            $channel = new Channel(1);

            $this->wsMessageChannel[$fd] = $channel;

            Coroutine::create(function () use ($res, $channel) {
                // Forward queued messages to the client. pop() returns false
                // once the channel is closed; compare strictly so that falsy
                // payloads such as "0" or "" do not terminate the loop.
                while (($message = $channel->pop()) !== false) {
                    $res->push($message);
                }
            });

            try {
                // Connection id: "<worker id>.<per-connection counter>"
                $id = "{$this->workerId}.{$fd}";

                $websocket->setSender($id);
                $websocket->join($id);

                $this->runWithBarrier(function () use ($request, $handler) {
                    $handler->onOpen($request);
                });

                $this->runWithBarrier(function () use ($handler, $res) {

                    $cid      = Coroutine::getCid();
                    $messages = 0;     // number of onMessage() coroutines still running
                    $wait     = false; // true once the read loop ended and we wait for them

                    $frame = null;
                    while (true) {
                        /** @var Frame|false|string $recv */
                        $recv = $res->recv();
                        if ($recv === '' || $recv === false || $recv instanceof CloseFrame) {
                            break;
                        }

                        // First fragment of a message: start a new aggregate frame
                        if (empty($frame)) {
                            $frame         = new Frame();
                            $frame->opcode = $recv->opcode;
                            $frame->flags  = $recv->flags;
                            $frame->fd     = $recv->fd;
                            $frame->finish = false;
                        }

                        $frame->data .= $recv->data;

                        $frame->finish = $recv->finish;

                        if ($frame->finish) {
                            Coroutine::create(function () use (&$wait, &$messages, $cid, $frame, $handler) {
                                ++$messages;
                                Coroutine::defer(function () use (&$wait, &$messages, $cid) {
                                    --$messages;
                                    // Wake the reader if it is waiting for us to finish
                                    if ($wait) {
                                        Coroutine::resume($cid);
                                    }
                                });
                                $handler->onMessage($frame);
                            });
                            $frame = null;
                        }
                    }

                    // Wait until every in-flight message handler has finished
                    while ($messages > 0) {
                        $wait = true;
                        Coroutine::yield();
                    }
                });

                // Close the connection
                $res->close();
                $this->runWithBarrier(function () use ($handler) {
                    $handler->onClose();
                });
            } finally {
                // leave all rooms
                $websocket->leave();
                if (isset($this->wsMessageChannel[$fd])) {
                    // Closing the channel makes the push coroutine's pop() return false
                    $this->wsMessageChannel[$fd]->close();
                    unset($this->wsMessageChannel[$fd]);
                }
                $websocket->setClient(null);
            }
        });
    }

    /**
     * Register the request in the container and pass it through the
     * middleware configured under "websocket.middleware".
     *
     * @param App $app
     * @param \think\Request $request
     * @return \think\Request the request as received by the end of the pipeline
     */
    protected function setRequestThroughMiddleware(App $app, \think\Request $request)
    {
        $app->instance('request', $request);

        $stack    = $this->getConfig('websocket.middleware', []);
        $pipeline = Middleware::make($app, $stack)->pipeline();

        // The final stage just hands back whatever request reached it.
        $passThrough = function ($request) {
            return $request;
        };

        return $pipeline->send($request)->then($passThrough);
    }

    /**
     * Prepare settings if websocket is enabled.
     *
     * Sets up the connection id counter and the room, then registers two
     * event listeners: one that delivers PushMessage payloads to the
     * matching connection channel, and one that finishes the websocket
     * bindings when a worker starts.
     */
    protected function prepareWebsocket()
    {
        $this->prepareWebsocketIdAtomic();
        $this->prepareWebsocketRoom();

        $this->onEvent('message', function ($message) {
            if (!$message instanceof PushMessage) {
                return;
            }

            // Messages for connections this worker does not hold are dropped.
            $channel = $this->wsMessageChannel[$message->fd] ?? null;
            if ($channel !== null) {
                $channel->push($message->data);
            }
        });

        $this->onEvent('workerStart', function () {
            $this->bindWebsocketRoom();
            $this->bindWebsocketHandler();
            $this->prepareWebsocketListener();
        });
    }

    /**
     * Create the atomic counter that onHandShake() increments to number
     * each new websocket connection.
     */
    protected function prepareWebsocketIdAtomic()
    {
        $this->wsIdAtomic = new Atomic();
    }

    /**
     * Prepare websocket room.
     *
     * Resolves the Room from the container, stores it on the manager and
     * calls its prepare() method.
     */
    protected function prepareWebsocketRoom()
    {
        $this->wsRoom = $this->container->make(Room::class);
        $this->wsRoom->prepare();
    }

    /**
     * Register the websocket event listeners and subscribers from config.
     *
     * Entries of "websocket.listen" are attached to the event
     * "swoole.websocket.<StudlyCasedKey>"; entries of "websocket.subscribe"
     * are observed with the "swoole.websocket." prefix.
     */
    protected function prepareWebsocketListener()
    {
        foreach ($this->getConfig('websocket.listen', []) as $event => $listener) {
            $eventName = 'swoole.websocket.' . Str::studly($event);
            $this->app->event->listen($eventName, $listener);
        }

        foreach ($this->getConfig('websocket.subscribe', []) as $subscriber) {
            $this->app->event->observe($subscriber, 'swoole.websocket.');
        }
    }

    /**
     * Bind the handler class configured under "websocket.handler" to
     * HandlerInterface in the app container, so that onHandShake() receives
     * it for its onOpen/onMessage/onClose callbacks.
     */
    protected function bindWebsocketHandler()
    {
        $handlerClass = $this->getConfig('websocket.handler');
        $this->app->bind(HandlerInterface::class, $handlerClass);
    }

    /**
     * Bind room instance to app container.
     *
     * Registers the room created by prepareWebsocketRoom() under Room::class.
     */
    protected function bindWebsocketRoom(): void
    {
        $this->app->instance(Room::class, $this->wsRoom);
    }

}

WithRpcClient.php

<?php

namespace think\swoole\concerns;

use think\App;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\helper\Arr;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;
use function Swoole\Coroutine\run;

/**
 * 在命令行里使用RPC
 * @property App $app
 */
trait WithRpcClient
{
    /**
     * Ensure the trait is only mixed into console commands, then run the
     * parent constructor.
     *
     * @throws \RuntimeException when the using class is not a Command
     */
    public function __construct()
    {
        $isCommand = $this instanceof Command;

        if (!$isCommand) {
            throw new \RuntimeException('Trait `WithRpcClient` only can be used in Command');
        }

        parent::__construct();
    }

    /**
     * Bind the RPC interfaces, then invoke the command's handle() method
     * through the container inside a coroutine scheduler.
     *
     * @param Input $input
     * @param Output $output
     */
    protected function execute(Input $input, Output $output)
    {
        $this->bindRpcInterface();

        $entry = function () {
            $this->app->invoke([$this, 'handle']);
        };

        run($entry);
    }

    /**
     * Bind every interface listed in "<base path>rpc.php" to a proxy backed
     * by a gateway built from the "swoole.rpc.client.<name>" config.
     *
     * Nothing happens when the file does not exist. Any error raised while
     * binding is deliberately ignored.
     */
    protected function bindRpcInterface()
    {
        // Load the rpc interface definition file
        if (!file_exists($rpc = $this->app->getBasePath() . 'rpc.php')) {
            return;
        }

        $services = (array) include $rpc;

        // Bind the rpc interfaces
        try {
            foreach ($services as $service => $interfaces) {

                $options = $this->app->config->get("swoole.rpc.client.{$service}", []);

                // These three keys are removed from the options; what is
                // left is handed to the gateway.
                $parserClass = Arr::pull($options, 'parser', JsonParser::class);
                $tries       = Arr::pull($options, 'tries', 2);
                $middleware  = Arr::pull($options, 'middleware', []);

                $gateway = new Gateway($options, $this->app->make($parserClass), $tries);

                foreach ($interfaces as $interface) {
                    $factory = function (App $app) use ($middleware, $gateway, $service, $interface) {
                        return $app->invokeClass(Proxy::getClassName($service, $interface), [$gateway, $middleware]);
                    };

                    $this->app->bind($interface, $factory);
                }
            }
        } catch (Throwable $e) {
        }
    }
}

ModifyProperty.php

<?php

namespace think\swoole\concerns;

use ReflectionObject;

trait ModifyProperty
{
    /**
     * Overwrite a property of an object through reflection, whatever its
     * visibility. Does nothing when the object has no such property.
     *
     * @param object $object   the object to modify
     * @param mixed  $value    the value to assign
     * @param string $property the property name, "app" by default
     */
    protected function modifyProperty($object, $value, $property = 'app')
    {
        $reflection = new ReflectionObject($object);

        if (!$reflection->hasProperty($property)) {
            return;
        }

        $target = $reflection->getProperty($property);
        $target->setAccessible(true);
        $target->setValue($object, $value);
    }
}

InteractsWithServer.php

<?php

namespace think\swoole\concerns;

use Swoole\Constant;
use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Runtime;
use think\App;
use think\swoole\Watcher;

/**
 * Trait InteractsWithServer
 * @package think\swoole\concerns
 * @property App $container
 */
trait InteractsWithServer
{

    /**
     * @var array
     */
    protected $startFuncMap = [];

    protected $workerId;

    /** @var Pool */
    protected $pool;

    /**
     * Register the same worker callback $workerNum times.
     *
     * When a name is given, each copy is labelled "<name> #<index>" with a
     * zero-based index; otherwise the workers stay unnamed.
     *
     * @param int         $workerNum number of workers to register
     * @param callable    $func      worker entry point
     * @param string|null $name      base name for the worker processes
     * @return $this
     */
    public function addBatchWorker(int $workerNum, callable $func, $name = null)
    {
        $index = 0;

        while ($index < $workerNum) {
            $label = $name ? "{$name} #{$index}" : null;
            $this->addWorker($func, $label);
            $index++;
        }

        return $this;
    }

    /**
     * Queue a worker entry point. The position in the queue is the worker
     * id under which start() later runs it.
     *
     * @param callable    $func worker entry point, called with the pool and the worker id
     * @param string|null $name optional process name
     * @return $this
     */
    public function addWorker(callable $func, $name = null): self
    {
        $this->startFuncMap[] = [$func, $name];
        return $this;
    }

    /**
     * Start the service: initialise the manager, create a process pool with
     * one process per registered worker and run each worker's entry point.
     *
     * @param string $envName environment identifier, forwarded to prepareApplication()
     */
    public function start(string $envName): void
    {
        $this->setProcessName('manager process');

        $this->initialize();
        $this->triggerEvent('init');

        // Hot update: adds one extra worker that watches for file changes
        if ($this->getConfig('hot_update.enable', false)) {
            $this->addHotUpdateProcess();
        }

        $pool = new Pool(count($this->startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, true);

        $pool->on(Constant::EVENT_WORKER_START, function ($pool, $workerId) use ($envName) {

            Runtime::enableCoroutine();

            $this->pool     = $pool;
            $this->workerId = $workerId;

            // The worker id indexes the entry points queued by addWorker()
            [$func, $name] = $this->startFuncMap[$workerId];

            if ($name) {
                $this->setProcessName($name);
            }

            Process::signal(SIGTERM, function () {
                $this->pool->getProcess()->exit();
            });

            /** @var Coroutine\Socket $socket */
            $socket = $this->pool->getProcess()->exportSocket();

            // Listen for messages sent to this worker (see sendMessage())
            Event::add($socket, function (Coroutine\Socket $socket) {
                $recv = $socket->recv();

                // A failed or empty read is not a serialized message; do not
                // hand a bogus "false" message to the listeners.
                if ($recv === false || $recv === '') {
                    return;
                }

                $message = unserialize($recv);
                $this->triggerEvent('message', $message);
            });

            $this->clearCache();
            $this->prepareApplication($envName);

            $this->triggerEvent(Constant::EVENT_WORKER_START);

            $func($pool, $workerId);
        });

        $pool->start();
    }

    /**
     * Get the id of the current worker, as assigned when the worker started.
     *
     * @return int|null null until a worker has been started in this process
     */
    public function getWorkerId()
    {
        return $this->workerId;
    }
    
    /**
     * Get the process pool object of the current worker.
     * @return Pool
     */
    public function getPool()
    {
        return $this->pool;
    }

    /**
     * Deliver a message to a worker.
     *
     * A message addressed to the current worker triggers the "message"
     * event directly; any other message is serialized and written to the
     * target process's socket.
     *
     * @param int   $workerId id of the receiving worker
     * @param mixed $message  the message; must be serializable when sent to another worker
     */
    public function sendMessage($workerId, $message)
    {
        if ($workerId === $this->workerId) {
            $this->triggerEvent('message', $message);
            return;
        }

        /** @var Process $target */
        $target = $this->pool->getProcess($workerId);

        $target->exportSocket()->send(serialize($message));
    }

    /**
     * Run a callable in a new coroutine and block the caller until that
     * coroutine has ended.
     *
     * @param callable $func      the callable to run
     * @param mixed    ...$params arguments passed to the callable
     */
    public function runWithBarrier(callable $func, ...$params)
    {
        $barrier = new Coroutine\Channel(1);

        $task = function (...$arguments) use ($barrier, $func) {
            // Closing the channel when the coroutine ends releases the pop() below.
            Coroutine::defer(function () use ($barrier) {
                $barrier->close();
            });

            $func(...$arguments);
        };

        Coroutine::create($task, ...$params);

        $barrier->pop();
    }

    /**
     * Hot update: register a worker named "hot update" that runs the file
     * watcher and sends SIGUSR1 to the pool's master process whenever the
     * watcher invokes its callback.
     */
    protected function addHotUpdateProcess()
    {
        $watch = function (Process\Pool $pool) {
            $reload = function () use ($pool) {
                Process::kill($pool->master_pid, SIGUSR1);
            };

            $this->container->make(Watcher::class)->watch($reload);
        };

        $this->addWorker($watch, 'hot update');
    }

    /**
     * Clear the APC and OPcache caches, for whichever of the two extensions
     * is loaded.
     */
    protected function clearCache()
    {
        // extension name => function that resets its cache
        $resetters = [
            'apc'          => 'apc_clear_cache',
            'Zend OPcache' => 'opcache_reset',
        ];

        foreach ($resetters as $extension => $reset) {
            if (extension_loaded($extension)) {
                $reset();
            }
        }
    }

    /**
     * Set the title of the current process to
     * "swoole: <process> process for <app.name>".
     *
     * Failures to set the title are suppressed.
     *
     * @param string $process label describing the role of this process
     */
    protected function setProcessName($process)
    {
        $title = sprintf(
            'swoole: %s process for %s',
            $process,
            $this->container->config->get('app.name', 'ThinkPHP')
        );

        @cli_set_process_title($title);
    }
}

WithMiddleware.php

<?php

namespace think\swoole\concerns;

trait WithMiddleware
{
    /**
     * Registered middleware, in registration order. Each entry has the form
     * ['middleware' => [$middleware, $params], 'options' => array].
     *
     * @var array
     */
    protected $middleware = [];

    /**
     * Register a middleware with optional parameters.
     *
     * The returned object offers only() and except(); the method names
     * passed to them are stored under the entry's "only" / "except" option.
     *
     * @param mixed $middleware the middleware to register
     * @param mixed ...$params  parameters stored alongside the middleware
     * @return object fluent helper exposing only() and except()
     */
    protected function middleware($middleware, ...$params)
    {
        $options = [];

        // The entry shares $options by reference with the helper returned
        // below, so later only()/except() calls show up in the registry.
        $this->middleware[] = [
            'middleware' => [$middleware, $params],
            'options'    => &$options,
        ];

        return new class($options) {
            protected $options;

            public function __construct(array &$options)
            {
                $this->options = &$options;
            }

            /**
             * Accepts either one array of method names or several names.
             */
            public function only($methods)
            {
                return $this->restrict('only', is_array($methods) ? $methods : func_get_args());
            }

            /**
             * Accepts either one array of method names or several names.
             */
            public function except($methods)
            {
                return $this->restrict('except', is_array($methods) ? $methods : func_get_args());
            }

            private function restrict(string $key, array $methods)
            {
                $this->options[$key] = $methods;

                return $this;
            }
        };
    }
}

InteractsWithHttp.php

<?php

namespace think\swoole\concerns;

use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Status;
use think\App;
use think\Cookie;
use think\Event;
use think\exception\Handle;
use think\helper\Arr;
use think\helper\Str;
use think\Http;
use think\swoole\App as SwooleApp;
use think\swoole\Http as SwooleHttp;
use think\swoole\response\File as FileResponse;
use Throwable;
use function substr;

/**
 * Trait InteractsWithHttp
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 */
trait InteractsWithHttp
{
    use InteractsWithWebsocket, ModifyProperty;

    /**
     * Create and start the coroutine HTTP server for this worker.
     *
     * Host, port and options come from the "http.*" config. Every request
     * on "/" is handled as a websocket handshake when websocket support is
     * enabled and the request asks for an upgrade, and as a plain HTTP
     * request otherwise.
     */
    public function createHttpServer()
    {
        $this->preloadHttp();

        $server = new Server(
            $this->getConfig('http.host'),
            $this->getConfig('http.port'),
            false,
            true
        );
        $server->set($this->getConfig('http.options', []));

        $server->handle('/', function (Request $req, Response $res) {
            $upgrade = $this->wsEnable && $this->isWebsocketRequest($req);

            if ($upgrade) {
                $this->onHandShake($req, $res);
                return;
            }

            $this->onRequest($req, $res);
        });

        $server->start();
    }

    /**
     * Load middleware and routes once on the base application and arrange
     * for every application that resolves SwooleHttp to get its own copy.
     *
     * The loaded route and middleware objects are cloned, detached from the
     * base app (their "app" property is set to null) and removed from it.
     * Whenever SwooleHttp is resolved, a fresh clone is pointed at the
     * resolving app and registered there as "route" / "middleware".
     */
    protected function preloadHttp()
    {
        $http = $this->app->http;
        // Third argument "true": call the method even though it is not public
        $this->app->invokeMethod([$http, 'loadMiddleware'], [], true);

        if ($this->app->config->get('app.with_route', true)) {
            $this->app->invokeMethod([$http, 'loadRoutes'], [], true);
            // Keep a template copy of the loaded routes that no longer refers to the base app
            $route = clone $this->app->route;
            $this->modifyProperty($route, null);
            unset($this->app->route);

            $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($route) {
                // Give the resolving app its own copy of the routes
                $newRoute = clone $route;
                $this->modifyProperty($newRoute, $app);
                $app->instance('route', $newRoute);
            });
        }

        // Same treatment for the loaded middleware
        $middleware = clone $this->app->middleware;
        $this->modifyProperty($middleware, null);
        unset($this->app->middleware);

        $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($middleware) {
            $newMiddleware = clone $middleware;
            $this->modifyProperty($newMiddleware, $app);
            $app->instance('middleware', $newMiddleware);
        });

        // Drop the http instance used for preloading; Http now resolves to SwooleHttp
        unset($this->app->http);
        $this->app->bind(Http::class, SwooleHttp::class);
    }

    /**
     * Tell whether a request asks to be upgraded to a websocket connection.
     *
     * The "Upgrade" header must equal "websocket" and the "Connection"
     * header must contain the "upgrade" token, both case-insensitively.
     *
     * @param Request $req
     * @return bool
     */
    protected function isWebsocketRequest(Request $req)
    {
        $header = $req->header;

        if (strcasecmp((string) Arr::get($header, 'upgrade', ''), 'websocket') !== 0) {
            return false;
        }

        // "Connection" is a comma-separated list of tokens; some clients
        // send e.g. "keep-alive, Upgrade", so look for the token rather
        // than comparing the whole header value.
        $connection = strtolower((string) Arr::get($header, 'connection', ''));
        $tokens     = array_map('trim', explode(',', $connection));

        return in_array('upgrade', $tokens, true);
    }

    /**
     * Register the HTTP server workers unless "http.enable" is switched off.
     *
     * Websocket support is prepared first when "websocket.enable" is set.
     * The number of workers comes from "http.worker_num" and defaults to
     * the number of CPUs.
     */
    protected function prepareHttp()
    {
        if (!$this->getConfig('http.enable', true)) {
            return;
        }

        $this->wsEnable = $this->getConfig('websocket.enable', false);

        if ($this->wsEnable) {
            $this->prepareWebsocket();
        }

        $this->addBatchWorker(
            $this->getConfig('http.worker_num', swoole_cpu_num()),
            [$this, 'createHttpServer'],
            'http server'
        );
    }

    /**
     * "onRequest" listener: handles one HTTP request inside a sandbox and
     * waits until it has been fully served.
     *
     * A throwable escaping the request handling is rendered into a response
     * by the exception handler.
     *
     * @param Request $req
     * @param Response $res
     */
    public function onRequest($req, $res)
    {
        $serve = function (Http $http, Event $event, SwooleApp $app) use ($req, $res) {
            $app->setInConsole(false);

            $request = $this->prepareRequest($req);

            try {
                $response = $this->handleRequest($http, $request);
            } catch (Throwable $e) {
                $handler  = $this->app->make(Handle::class);
                $response = $handler->render($request, $e);
            }

            $this->setCookie($res, $app->cookie);
            $this->sendResponse($res, $request, $response);
        };

        $this->runWithBarrier([$this, 'runInSandbox'], $serve);
    }

    /**
     * Run a request through the HTTP kernel and return its response.
     *
     * Output printed while the request is handled is captured and prepended
     * to the response content. The output buffer stack is always restored
     * to the level it had on entry, even when the kernel throws.
     *
     * @param Http           $http
     * @param \think\Request $request
     * @return \think\Response
     */
    protected function handleRequest(Http $http, $request)
    {
        $level = ob_get_level();
        ob_start();

        try {
            $response = $http->run($request);

            $content = $response->getContent();

            // The application may have closed every buffer; open one again
            // so that output produced by end() is still captured.
            if (ob_get_level() == 0) {
                ob_start();
            }

            $http->end($response);

            if (ob_get_length() > 0) {
                $response->content(ob_get_contents() . $content);
            }
        } finally {
            // Unwind the buffers opened for this request, so that a failing
            // request does not leave buffers behind in a long-running worker.
            while (ob_get_level() > $level) {
                ob_end_clean();
            }
        }

        return $response;
    }

    /**
     * Build a framework request from a swoole request.
     *
     * Every header is additionally exposed as a server variable named
     * "http_<header>" with dashes replaced by underscores.
     *
     * @param Request $req
     * @return \think\Request
     */
    protected function prepareRequest(Request $req)
    {
        $header = $req->header ?: [];
        $server = $req->server ?: [];

        foreach ($header as $name => $value) {
            $server['http_' . str_replace('-', '_', $name)] = $value;
        }

        // Instantiate a new request object to carry the swoole request data
        /** @var \think\Request $request */
        $request = $this->app->make('request', [], true);

        $uri = $req->server['request_uri'];
        $url = $uri;
        if (!empty($req->server['query_string'])) {
            $url .= '?' . $req->server['query_string'];
        }
        $pathinfo = ltrim($req->server['path_info'], '/');

        return $request
            ->withHeader($header)
            ->withServer($server)
            ->withGet($req->get ?: [])
            ->withPost($req->post ?: [])
            ->withCookie($req->cookie ?: [])
            ->withFiles($this->getFiles($req))
            ->withInput($req->rawContent())
            ->setBaseUrl($uri)
            ->setUrl($url)
            ->setPathinfo($pathinfo);
    }

    /**
     * Get the uploaded files of a request.
     *
     * A field holding a list of files ([{name, type, ...}, ...]) is
     * regrouped per attribute (['name' => [...], 'type' => [...], ...]);
     * a field holding a single file is returned unchanged.
     *
     * @param Request $req
     * @return array
     */
    protected function getFiles(Request $req)
    {
        if (empty($req->files)) {
            return [];
        }

        $attributes = ['name', 'type', 'tmp_name', 'error', 'size'];

        $normalize = function ($file) use ($attributes) {
            if (Arr::isAssoc($file)) {
                return $file;
            }

            $grouped = [];
            foreach ($file as $item) {
                foreach ($attributes as $attribute) {
                    $grouped[$attribute][] = $item[$attribute];
                }
            }

            return $grouped;
        };

        return array_map($normalize, $req->files);
    }

    /**
     * Copy every cookie collected by the framework onto the swoole response.
     *
     * @param Response $res
     * @param Cookie   $cookie
     */
    protected function setCookie(Response $res, Cookie $cookie)
    {
        foreach ($cookie->getCookie() as $name => [$value, $expire, $option]) {
            $res->cookie(
                $name,
                $value,
                $expire,
                $option['path'],
                $option['domain'],
                (bool) $option['secure'],
                (bool) $option['httponly'],
                $option['samesite']
            );
        }
    }

    /**
     * Set every given header on the swoole response.
     *
     * @param Response $res
     * @param array    $headers header name => value
     */
    protected function setHeader(Response $res, array $headers)
    {
        foreach ($headers as $key => $val) {
            $res->header($key, $val);
        }
    }

    /**
     * Set the status code of the swoole response together with the reason
     * phrase looked up for that code.
     *
     * @param Response $res
     * @param int      $code HTTP status code
     */
    protected function setStatus(Response $res, $code)
    {
        $res->status($code, Status::getReasonPhrase($code));
    }

    /**
     * Send a framework response through the swoole response: file
     * responses go through sendFile(), everything else through
     * sendContent().
     *
     * @param Response        $res
     * @param \think\Request  $request
     * @param \think\Response $response
     */
    protected function sendResponse(Response $res, \think\Request $request, \think\Response $response)
    {
        if ($response instanceof FileResponse) {
            $this->sendFile($res, $request, $response);
            return;
        }

        $this->sendContent($res, $response);
    }

    /**
     * Send a file response, honouring the If-None-Match, If-Range and Range
     * request headers.
     *
     * Resulting status codes: 304 when If-None-Match equals the response's
     * ETag, 206 with a Content-Range header for a satisfiable partial
     * range, 416 for an unsatisfiable one, and the response's own code
     * otherwise. The file body is only sent for 2xx codes.
     *
     * @param Response       $res
     * @param \think\Request $request
     * @param FileResponse   $response
     */
    protected function sendFile(Response $res, \think\Request $request, FileResponse $response)
    {
        $ifNoneMatch = $request->header('If-None-Match');
        $ifRange     = $request->header('If-Range');

        $code         = $response->getCode();
        $file         = $response->getFile();
        $eTag         = $response->getHeader('ETag');
        $lastModified = $response->getHeader('Last-Modified');

        $fileSize = $file->getSize();
        $offset   = 0;
        $length   = -1;

        // Require an actual ETag: without this guard a response lacking an
        // ETag and a request lacking If-None-Match would compare equal
        // (both empty) and every such download would be answered with 304.
        if (!empty($eTag) && $ifNoneMatch === $eTag) {
            $code = 304;
        } elseif (!$ifRange || $ifRange === $eTag || $ifRange === $lastModified) {
            $range = $request->header('Range', '');
            if (Str::startsWith($range, 'bytes=')) {
                // Default the end (index 1) to 0 so that a malformed range
                // without "-" does not leave $end undefined.
                [$start, $end] = explode('-', substr($range, 6), 2) + [1 => 0];

                $end = ('' === $end) ? $fileSize - 1 : (int) $end;

                if ('' === $start) {
                    // Suffix range "bytes=-N": the last N bytes of the file
                    $start = $fileSize - $end;
                    $end   = $fileSize - 1;
                } else {
                    $start = (int) $start;
                }

                if ($start <= $end) {
                    $end = min($end, $fileSize - 1);
                    if ($start < 0 || $start > $end) {
                        $code = 416;
                        $response->header([
                            'Content-Range' => sprintf('bytes */%s', $fileSize),
                        ]);
                    } elseif ($end - $start < $fileSize - 1) {
                        // A real sub-range (not the whole file)
                        $length = $end < $fileSize ? $end - $start + 1 : -1;
                        $offset = $start;
                        $code   = 206;
                        $response->header([
                            'Content-Range'  => sprintf('bytes %s-%s/%s', $start, $end, $fileSize),
                            'Content-Length' => $end - $start + 1,
                        ]);
                    }
                }
            }
        }

        $this->setStatus($res, $code);
        $this->setHeader($res, $response->getHeader());

        if ($code >= 200 && $code < 300 && $length !== 0) {
            $res->sendfile($file->getPathname(), $offset, $length);
        } else {
            $res->end();
        }
    }

    /**
     * Send a regular response: status, headers, then the content written
     * in chunks of at most 8192 bytes.
     *
     * Writing stops at the first chunk that fails to be written; the
     * response is ended in every case.
     *
     * @param Response        $res
     * @param \think\Response $response
     */
    protected function sendContent(Response $res, \think\Response $response)
    {
        // Transfer-Encoding: chunked is in use, so per the HTTP specification
        // Content-Length no longer needs to be set
        $response->header(['Content-Length' => null]);

        $this->setStatus($res, $response->getCode());
        $this->setHeader($res, $response->getHeader());

        $content = (string) $response->getContent();

        // Compare against the empty string rather than testing truthiness:
        // a body consisting of "0" is falsy but must still be sent.
        if ($content !== '') {
            $contentSize = strlen($content);
            $chunkSize   = 8192;

            for ($sendSize = 0; $sendSize < $contentSize; $sendSize += $chunkSize) {
                if (!$res->write(substr($content, $sendSize, $chunkSize))) {
                    break;
                }
            }
        }

        $res->end();
    }
}

InteractsWithQueue.php

<?php

namespace think\swoole\concerns;

use Swoole\Process;
use Swoole\Timer;
use think\helper\Arr;
use think\queue\event\JobFailed;
use think\queue\Worker;

trait InteractsWithQueue
{
    /**
     * Register the queue workers described by the "queue.workers" config.
     *
     * Config keys have the form "<queue>" or "<queue>@<connection>". Each
     * worker processes jobs in an endless loop; a timer exits the worker
     * process when a single job runs longer than the configured timeout.
     */
    protected function createQueueWorkers()
    {
        foreach ($this->getConfig('queue.workers', []) as $queue => $options) {

            $connection = null;
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            }

            $loop = function (Process\Pool $pool) use ($options, $connection, $queue) {
                $delay   = Arr::get($options, 'delay', 0);
                $sleep   = Arr::get($options, 'sleep', 3);
                $tries   = Arr::get($options, 'tries', 0);
                $timeout = Arr::get($options, 'timeout', 60);

                /** @var Worker $worker */
                $worker = $this->app->make(Worker::class);

                $runJob = function () use ($connection, $queue, $delay, $sleep, $tries, $worker) {
                    $worker->runNextJob($connection, $queue, $delay, $sleep, $tries);
                };

                $exitProcess = function () use ($pool) {
                    $pool->getProcess()->exit();
                };

                for (;;) {
                    // Timeout is configured in seconds, the timer takes milliseconds
                    $watchdog = Timer::after($timeout * 1000, $exitProcess);

                    $this->runWithBarrier([$this, 'runInSandbox'], $runJob);

                    // The job finished in time: disarm the timer
                    Timer::clear($watchdog);
                }
            };

            $this->addBatchWorker(Arr::get($options, 'worker_num', 1), $loop, "queue [$queue]");
        }
    }

    /**
     * Set up queue processing when "queue.enable" is switched on.
     */
    public function prepareQueue()
    {
        if (!$this->getConfig('queue.enable', false)) {
            return;
        }

        $this->listenForEvents();
        $this->createQueueWorkers();
    }

    /**
     * Register the event listener that logs failed jobs.
     */
    protected function listenForEvents()
    {
        $onFailure = function (JobFailed $event) {
            $this->logFailedJob($event);
        };

        $this->container->event->listen(JobFailed::class, $onFailure);
    }

    /**
     * Record a failed job through the "queue.failer" service.
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $job = $event->job;

        $this->container['queue.failer']->log(
            $event->connection,
            $job->getQueue(),
            $job->getRawBody(),
            $event->exception
        );
    }
}

InteractsWithRpcConnector.php

<?php

namespace think\swoole\concerns;

use Generator;
use Swoole\Coroutine\Client;
use think\swoole\exception\RpcClientException;
use think\swoole\rpc\File;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;

trait InteractsWithRpcConnector
{
    /**
     * Run the callback with a client connection and return its result.
     * sendAndRecv() passes a callback that expects the Client as its only argument.
     *
     * @param callable $callback
     * @return mixed
     */
    abstract protected function runWithClient($callback);

    /**
     * Read from the client until one complete non-file response has been
     * received, and return its decoded value.
     *
     * A File response received on the way is remembered; it is returned in
     * place of the decoded value when the decoder yields Protocol::FILE.
     *
     * @param Client   $client
     * @param callable $decoder turns a complete response into the result
     * @return mixed
     * @throws RpcClientException when the connection is closed or receiving fails
     */
    protected function recv(Client $client, callable $decoder)
    {
        $handler = null;
        $file    = null;

        while ($data = $client->recv()) {
            begin:
            // No packet in progress: the data starts with a new packet
            if (empty($handler)) {
                [$handler, $data] = Packer::unpack($data);
            }

            // NOTE(review): presumably write() takes $data by reference and
            // leaves any unconsumed bytes in it — confirm against the handler.
            $response = $handler->write($data);

            // A non-empty response means the current packet is complete
            if (!empty($response)) {
                $handler = null;

                if ($response instanceof File) {
                    $file = $response;
                } else {
                    $result = $decoder($response);
                    if ($result === Protocol::FILE) {
                        $result = $file;
                    }
                    return $result;
                }
            }

            // Data is left over: process it before receiving more
            if (!empty($data)) {
                goto begin;
            }
        }

        if ($data === '') {
            throw new RpcClientException('Connection is closed. ' . $client->errMsg, $client->errCode);
        }
        if ($data === false) {
            throw new RpcClientException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . swoole_strerror($client->errCode), $client->errCode);
        }
    }

    /**
     * Send data through a client connection and return the decoded response.
     *
     * The client is closed before an RpcClientException is rethrown.
     *
     * @param Generator|mixed $data    one payload, or a generator yielding payloads; empty payloads are skipped
     * @param callable        $decoder passed on to recv()
     * @return mixed
     * @throws RpcClientException when sending or receiving fails
     */
    public function sendAndRecv($data, callable $decoder)
    {
        // Wrap a single payload so that both cases can be iterated
        if (!$data instanceof Generator) {
            $data = [$data];
        }

        return $this->runWithClient(function (Client $client) use ($decoder, $data) {
            try {
                foreach ($data as $string) {
                    if (!empty($string)) {
                        if ($client->send($string) === false) {
                            throw new RpcClientException('Send data failed. ' . $client->errMsg, $client->errCode);
                        }
                    }
                }
                return $this->recv($client, $decoder);
            } catch (RpcClientException $e) {
                $client->close();
                throw $e;
            }
        });
    }

}

InteractsWithPools.php

<?php

namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\App;
use think\helper\Arr;
use think\swoole\Pool;

/**
 * Trait InteractsWithPools
 * @package think\swoole\concerns
 * @property App $app
 */
trait InteractsWithPools
{
    /**
     * Get the pool registry from the application container.
     *
     * @return Pool
     */
    public function getPools()
    {
        return $this->app->make(Pool::class);
    }

    /**
     * Arrange for the connection pools listed under the "pool" config to be
     * created whenever a worker starts.
     *
     * Only entries whose "type" names a ConnectorInterface implementation
     * are turned into pools. Each pool is added to the registry and also
     * registered in the container as "swoole.pool.<name>".
     */
    protected function preparePools()
    {
        $this->onEvent('workerStart', function () {
            /** @var Pool $registry */
            $registry = $this->getPools();

            foreach ($this->getConfig('pool', []) as $name => $config) {
                $type = Arr::pull($config, 'type');

                if (!$type || !is_subclass_of($type, ConnectorInterface::class)) {
                    continue;
                }

                // Extract the pool settings first; the config is passed on afterwards
                $poolConfig = Pool::pullPoolConfig($config);
                $connector  = $this->app->make($type);

                $pool = new ConnectionPool($poolConfig, $connector, $config);

                $registry->add($name, $pool);

                // Make the pool available from the app container
                $this->app->instance("swoole.pool.{$name}", $pool);
            }
        });
    }
}

InteractsWithRpcClient.php

<?php

namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use think\App;
use think\swoole\Pool;
use think\swoole\pool\Client;
use think\swoole\rpc\client\Connector;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;

/**
 * Trait InteractsWithRpcClient
 *
 * On worker start, creates one connection pool per configured RPC client and
 * binds the interfaces listed in the application's rpc.php file to proxy
 * classes resolved through Proxy::getClassName().
 *
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 * @method Pool getPools()
 */
trait InteractsWithRpcClient
{
    /**
     * Register the "workerStart" listener that sets up the client pools and
     * then the interface bindings (the bindings look the pools up by name).
     */
    protected function prepareRpcClient()
    {
        $this->onEvent('workerStart', function () {
            $this->bindRpcClientPool();
            $this->bindRpcInterface();
        });
    }

    /**
     * Bind every interface declared in "<base path>rpc.php" to an RPC proxy.
     *
     * The file is expected to return a map of client name => list of
     * interface names. Binding is best-effort: a failure stops the remaining
     * bindings but never aborts the worker start. Unlike a bare empty catch,
     * the failure is reported so that it can be diagnosed.
     */
    protected function bindRpcInterface()
    {
        // Load the rpc interface definition file
        if (file_exists($rpc = $this->container->getBasePath() . 'rpc.php')) {

            $rpcServices = (array) include $rpc;

            // Bind the rpc interfaces
            try {
                foreach ($rpcServices as $name => $abstracts) {
                    $parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class);
                    $tries       = $this->getConfig("rpc.client.{$name}.tries", 2);
                    $middleware  = $this->getConfig("rpc.client.{$name}.middleware", []);

                    $parser  = $this->getApplication()->make($parserClass);
                    $gateway = new Gateway($this->createRpcConnector($name), $parser, $tries);

                    foreach ($abstracts as $abstract) {
                        $this->getApplication()
                            ->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
                                return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
                            });
                    }
                }
            } catch (Throwable $e) {
                $this->reportRpcBindingFailure($e);
            }
        }
    }

    /**
     * Report a binding failure without letting the report itself break the
     * worker start.
     *
     * Uses the host's logServerError() when it exists; otherwise the error is
     * dropped, as before.
     *
     * @param Throwable $e
     */
    protected function reportRpcBindingFailure(Throwable $e)
    {
        if (!method_exists($this, 'logServerError')) {
            return;
        }

        try {
            $this->logServerError($e);
        } catch (Throwable $ignored) {
            // Reporting is a courtesy only; binding must remain best-effort.
        }
    }

    /**
     * Create one connection pool per entry of the "rpc.client" configuration
     * and register it as "rpc.client.<name>".
     */
    protected function bindRpcClientPool()
    {
        if (!empty($clients = $this->getConfig('rpc.client'))) {
            // Create the client connection pools
            foreach ($clients as $name => $config) {
                $pool = new ConnectionPool(
                    Pool::pullPoolConfig($config),
                    new Client(),
                    $config
                );
                $this->getPools()->add("rpc.client.{$name}", $pool);
            }
        }
    }

    /**
     * Build a Connector that runs each call with a client borrowed from the
     * "rpc.client.<name>" pool.
     *
     * @param string $name client name
     * @return Connector
     */
    protected function createRpcConnector($name)
    {
        $pool = $this->getPools()->get("rpc.client.{$name}");

        return new class($pool) implements Connector {

            use InteractsWithRpcConnector;

            protected $pool;

            public function __construct(ConnectionPool $pool)
            {
                $this->pool = $pool;
            }

            protected function runWithClient($callback)
            {
                /** @var \Swoole\Coroutine\Client $client */
                $client = $this->pool->borrow();
                try {
                    return $callback($client);
                } finally {
                    // Always hand the client back, even when the callback throws
                    $this->pool->return($client);
                }
            }
        };
    }
}

WithContainer.php

<?php

namespace think\swoole\concerns;

use think\App;
use think\console\Output;
use think\exception\Handle;
use Throwable;

trait WithContainer
{

    /**
     * The application container this object was constructed with.
     *
     * @var App
     */
    protected $container;

    /**
     * Store the container for later config, event and error-handling access.
     *
     * @param App $container
     */
    public function __construct(App $container)
    {
        $this->container = $container;
    }

    /**
     * @return App
     */
    protected function getContainer()
    {
        return $this->container;
    }

    /**
     * Read a value from the "swoole" configuration namespace.
     *
     * @param string $name    key relative to "swoole."
     * @param null   $default value returned when the key is missing
     * @return mixed
     */
    public function getConfig(string $name, $default = null)
    {
        $config = $this->container->config;

        return $config->get('swoole.' . $name, $default);
    }

    /**
     * Trigger an event under the "swoole." prefix.
     *
     * @param string $event
     * @param null   $params
     */
    public function triggerEvent(string $event, $params = null): void
    {
        $dispatcher = $this->container->event;

        $dispatcher->trigger('swoole.' . $event, $params);
    }

    /**
     * Listen for an event under the "swoole." prefix.
     *
     * @param string $event
     * @param        $listener
     * @param bool   $first passed through to the event dispatcher
     */
    public function onEvent(string $event, $listener, bool $first = false): void
    {
        $dispatcher = $this->container->event;

        $dispatcher->listen('swoole.' . $event, $listener, $first);
    }

    /**
     * Render a server error to a console Output and report it through the
     * application's exception handler.
     *
     * @param Throwable $e
     */
    public function logServerError(Throwable $e)
    {
        /** @var Handle $handler */
        $handler = $this->container->make(Handle::class);
        $console = new Output();

        $handler->renderForConsole($console, $e);
        $handler->report($e);
    }
}

InteractsWithTracing.php

<?php

namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use Swoole\Coroutine;
use think\helper\Arr;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use think\tracing\reporter\RedisReporter;
use think\tracing\Tracer;

/**
 * Tracing reporter support.
 *
 * Adds one reporter worker per tracer flagged "async" and, when at least one
 * exists, wires a Redis connection pool and a RedisReporter binding into each
 * worker on start.
 */
trait InteractsWithTracing
{
    /**
     * Inspect the "tracing.tracers" configuration and set up asynchronous
     * reporting. Does nothing when the Tracer class is not available.
     */
    protected function prepareTracing()
    {
        if (!class_exists(Tracer::class)) {
            return;
        }

        // Default to an empty list so a missing tracing config does not feed null into foreach
        $tracers  = $this->container->config->get('tracing.tracers', []);
        $hasAsync = false;

        foreach ($tracers as $name => $tracer) {
            if (Arr::get($tracer, 'async', false)) {
                // Dedicated worker that runs this tracer's report()
                $this->addWorker(function () use ($name) {
                    $tracer = $this->app->make(Tracer::class)->tracer($name);

                    $tracer->report();
                }, "tracing [{$name}]");
                $hasAsync = true;
            }
        }

        if ($hasAsync) {
            $this->onEvent('workerStart', function () {
                $this->bindTracingRedisPool();
                $this->bindTracingRedisReporter();
            });
        }
    }

    /**
     * Bind RedisReporter so that each resolution uses a Redis connection
     * borrowed from the "tracing.redis" pool.
     *
     * The connection is remembered in the coroutine context and returned to
     * the pool by a deferred callback.
     */
    protected function bindTracingRedisReporter()
    {
        $this->getApplication()->bind(RedisReporter::class, function ($name) {

            $pool = $this->getPools()->get("tracing.redis");

            $redis = Context::rememberData('tracing.redis', function () use ($pool) {
                $redis = $pool->borrow();
                Coroutine::defer(function () use ($pool, $redis) {
                    $pool->return($redis);
                });

                return $redis;
            });

            return new RedisReporter($name, $redis);
        });
    }

    /**
     * Create the "tracing.redis" connection pool from the "tracing.redis"
     * configuration and register it with the pool registry.
     */
    protected function bindTracingRedisPool()
    {
        $config = $this->container->config->get('tracing.redis');

        $pool = new ConnectionPool(
            Pool::pullPoolConfig($config),
            new PhpRedisConnector(),
            $config
        );
        $this->getPools()->add("tracing.redis", $pool);
    }
}

config

swoole.php

<?php

use think\swoole\websocket\socketio\Handler;

return [
    // HTTP server
    'http'       => [
        'enable'     => true,
        'host'       => '0.0.0.0',
        'port'       => 80,
        'worker_num' => swoole_cpu_num(),
        'options'    => [],
    ],
    // WebSocket support
    'websocket'  => [
        'enable'        => false,
        'handler'       => Handler::class,
        // NOTE(review): units are not stated here (presumably milliseconds) — confirm
        'ping_interval' => 25000,
        'ping_timeout'  => 60000,
        'room'          => [
            // Selects which of the sections below is used as the room backend
            'type'  => 'table',
            'table' => [
                'room_rows'   => 8192,
                'room_size'   => 2048,
                'client_rows' => 4096,
                'client_size' => 2048,
            ],
            'redis' => [
                'host'          => '127.0.0.1',
                'port'          => 6379,
                'max_active'    => 3,
                'max_wait_time' => 5,
            ],
        ],
        'listen'        => [],
        'subscribe'     => [],
    ],
    // RPC server and clients
    'rpc'        => [
        'server' => [
            'enable'     => false,
            'host'       => '0.0.0.0',
            'port'       => 9000,
            'worker_num' => swoole_cpu_num(),
            'services'   => [],
        ],
        'client' => [],
    ],
    // Queue
    'queue'      => [
        'enable'  => false,
        'workers' => [],
    ],
    // Hot reload; enabled together with APP_DEBUG by default
    'hot_update' => [
        'enable'  => env('APP_DEBUG', false),
        'name'    => ['*.php'],
        'include' => [app_path()],
        'exclude' => [],
    ],
    // Connection pools
    'pool'       => [
        'db'    => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
        ],
        'cache' => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
        ],
        // Custom connection pools
    ],
    'tables'     => [],
    // Instances to preload in every worker so they can be shared
    'concretes'  => [],
    // Resetters
    'resetters'  => [],
    // Instances to clear before every request
    'instances'  => [],
    // Services to re-run before every request
    'services'   => [],
];

contract

websocket

HandlerInterface.php
<?php

namespace think\swoole\contract\websocket;

use Swoole\WebSocket\Frame;
use think\Request;

/**
 * Contract for websocket handlers: connection lifecycle callbacks plus
 * message encoding.
 */
interface HandlerInterface
{
    /**
     * "onOpen" listener.
     *
     * @param Request $request
     */
    public function onOpen(Request $request);

    /**
     * "onMessage" listener.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame);

    /**
     * "onClose" listener.
     */
    public function onClose();

    /**
     * Encode a message.
     *
     * NOTE(review): the accepted message shape and the returned format are
     * not visible from this interface — confirm against the implementations.
     *
     * @param mixed $message
     */
    public function encodeMessage($message);

}
RoomInterface.php
<?php

namespace think\swoole\contract\websocket;

/**
 * Contract for websocket room storage: maps socket fds to rooms and back.
 */
interface RoomInterface
{
    /**
     * Rooms key
     *
     * @const string
     */
    public const ROOMS_KEY = 'rooms';

    /**
     * Descriptors key
     *
     * @const string
     */
    public const DESCRIPTORS_KEY = 'fds';

    /**
     * Do some initialisation before workers are started.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface;

    /**
     * Add a socket fd to one or more rooms.
     *
     * @param string $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames);

    /**
     * Delete a socket fd from one or more rooms.
     *
     * @param string $fd
     * @param array|string $roomNames
     */
    public function delete($fd, $roomNames);

    /**
     * Get all sockets by a room key.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room);

    /**
     * Get all rooms by a fd.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd);
}

ResetterInterface.php

<?php

namespace think\swoole\contract;

use think\App;
use think\swoole\Sandbox;

/**
 * Contract for resetters: each one receives the application instance being
 * reset together with the sandbox that owns it.
 */
interface ResetterInterface
{
    /**
     * "handle" function for resetting app.
     *
     * @param \think\App $app     application instance to reset
     * @param Sandbox    $sandbox sandbox providing the state to reset from
     */
    public function handle(App $app, Sandbox $sandbox);
}

rpc

ParserInterface.php
<?php

namespace think\swoole\contract\rpc;

use think\swoole\rpc\Protocol;

/**
 * Contract for RPC parsers: converts Protocol objects and call results to
 * and from their string form.
 */
interface ParserInterface
{

    // NOTE(review): presumably the end-of-message delimiter on the wire — confirm against the rpc transport code
    const EOF = "\r\n\r\n";

    /**
     * Encode a protocol object into a string.
     *
     * @param Protocol $protocol
     *
     * @return string
     */
    public function encode(Protocol $protocol): string;

    /**
     * Decode a string into a protocol object.
     *
     * @param string $string
     *
     * @return Protocol
     */
    public function decode(string $string): Protocol;

    /**
     * Decode a response string into its result value.
     *
     * @param string $string
     *
     * @return mixed
     */
    public function decodeResponse(string $string);

    /**
     * Encode a result value into a response string.
     *
     * @param mixed $result
     *
     * @return string
     */
    public function encodeResponse($result): string;
}

coroutine

Context.php

<?php

namespace think\swoole\coroutine;

use ArrayObject;
use Closure;
use Swoole\Coroutine;

class Context
{

    /**
     * Get the coroutine context.
     *
     * @param int $cid coroutine id, passed straight to Coroutine::getContext()
     * @return Coroutine\Context
     */
    public static function get($cid = 0)
    {
        return Coroutine::getContext($cid);
    }

    /**
     * Get the ArrayObject stored under "#data" in the current context,
     * creating it on first use.
     *
     * @return ArrayObject
     */
    public static function getDataObject()
    {
        $ctx = self::get();

        if (isset($ctx['#data'])) {
            return $ctx['#data'];
        }

        $ctx['#data'] = new ArrayObject();

        return $ctx['#data'];
    }

    /**
     * Read a temporary value of the current coroutine.
     *
     * @param string $key
     * @param null $default returned when the key does not exist
     * @return mixed|null
     */
    public static function getData(string $key, $default = null)
    {
        if (!self::hasData($key)) {
            return $default;
        }

        return self::getDataObject()->offsetGet($key);
    }

    /**
     * Check whether a temporary value exists.
     *
     * @param string $key
     * @return bool
     */
    public static function hasData(string $key)
    {
        $store = self::getDataObject();

        return $store->offsetExists($key);
    }

    /**
     * Write a temporary value.
     *
     * @param string $key
     * @param $value
     */
    public static function setData(string $key, $value)
    {
        $store = self::getDataObject();

        $store->offsetSet($key, $value);
    }

    /**
     * Remove a temporary value if it exists.
     *
     * @param string $key
     */
    public static function removeData(string $key)
    {
        if (!self::hasData($key)) {
            return;
        }

        self::getDataObject()->offsetUnset($key);
    }

    /**
     * Return the stored value for the key, or store and return the given one
     * when the key does not exist yet.
     *
     * @param string $key
     * @param $value a plain value, or a Closure that is called to produce it
     * @return mixed|null
     */
    public static function rememberData(string $key, $value)
    {
        if (self::hasData($key)) {
            return self::getData($key);
        }

        // A Closure is only invoked when nothing is stored yet
        $resolved = $value instanceof Closure ? $value() : $value;

        self::setData($key, $resolved);

        return $resolved;
    }

    /**
     * @internal
     * Empty the temporary data of the current coroutine.
     */
    public static function clear()
    {
        $store = self::getDataObject();

        $store->exchangeArray([]);
    }

    /**
     * Get the id of the current coroutine.
     *
     * @return mixed
     */
    public static function getCoroutineId()
    {
        return Coroutine::getuid();
    }
}

exception

RpcResponseException.php

<?php

namespace think\swoole\exception;

use Exception;
use think\swoole\rpc\Error;

/**
 * Exception wrapping an rpc Error; message and code are copied from it.
 */
class RpcResponseException extends Exception
{
    /**
     * @var Error
     */
    protected $error;

    /**
     * @param Error $error the error this exception carries
     */
    public function __construct(Error $error)
    {
        $message = $error->getMessage();
        $code    = $error->getCode();

        parent::__construct($message, $code);

        $this->error = $error;
    }

    /**
     * @return Error
     */
    public function getError()
    {
        return $this->error;
    }
}

RpcClientException.php

<?php

namespace think\swoole\exception;

use Exception;

/**
 * Exception type for rpc client failures.
 *
 * It adds nothing to Exception; it exists so callers can catch it by type
 * (the rpc connector code closes the client and rethrows when it catches one).
 */
class RpcClientException extends Exception
{

}

middleware

TraceRpcServer.php

<?php

namespace think\swoole\middleware;

use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_SERVER;

/**
 * RPC server middleware that wraps the handling of a call in a tracing span.
 */
class TraceRpcServer
{
    /**
     * @var Tracer
     */
    protected $tracer;

    public function __construct(Tracer $tracer)
    {
        $this->tracer = $tracer;
    }

    /**
     * Start a server span as a child of the context extracted from the
     * protocol, run the next handler, then close the scope and flush.
     *
     * @param Protocol $protocol
     * @param callable $next
     * @return mixed result of $next
     * @throws Throwable rethrown after being tagged on the span
     */
    public function handle(Protocol $protocol, $next)
    {
        $parentContext = $this->tracer->extract(TEXT_MAP, $protocol->getContext());

        $operation = 'rpc.server:' . $protocol->getInterface() . '@' . $protocol->getMethod();
        $options   = [
            'child_of' => $parentContext,
            'tags'     => [
                SPAN_KIND => SPAN_KIND_RPC_SERVER,
            ],
        ];

        $activeScope = $this->tracer->startActiveSpan($operation, $options);
        $activeSpan  = $activeScope->getSpan();

        try {
            return $next($protocol);
        } catch (Throwable $failure) {
            $activeSpan->setTag(ERROR, $failure);
            throw $failure;
        } finally {
            $activeScope->close();
            $this->tracer->flush();
        }
    }
}

TraceRpcClient.php

<?php

namespace think\swoole\middleware;

use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_CLIENT;

/**
 * RPC client middleware that wraps an outgoing call in a tracing span and
 * injects the span context into the protocol.
 */
class TraceRpcClient
{
    /**
     * @var Tracer
     */
    protected $tracer;

    public function __construct(Tracer $tracer)
    {
        $this->tracer = $tracer;
    }

    /**
     * Start a client span, inject its context into the protocol, run the
     * next handler and close the scope.
     *
     * Any throwable other than RpcResponseException is tagged on the span
     * as an error; every throwable is rethrown.
     *
     * @param Protocol $protocol
     * @param callable $next
     * @return mixed result of $next
     * @throws Throwable
     */
    public function handle(Protocol $protocol, $next)
    {
        $operation = 'rpc.client:' . $protocol->getInterface() . '@' . $protocol->getMethod();
        $options   = [
            'tags' => [
                SPAN_KIND => SPAN_KIND_RPC_CLIENT,
            ],
        ];

        $activeScope = $this->tracer->startActiveSpan($operation, $options);
        $activeSpan  = $activeScope->getSpan();

        // Carry the span context along with the call
        $carrier = $protocol->getContext();
        $this->tracer->inject($activeSpan->getContext(), TEXT_MAP, $carrier);
        $protocol->setContext($carrier);

        try {
            return $next($protocol);
        } catch (Throwable $failure) {
            if ($failure instanceof RpcResponseException) {
                throw $failure;
            }
            $activeSpan->setTag(ERROR, $failure);
            throw $failure;
        } finally {
            $activeScope->close();
        }
    }
}

InteractsWithVarDumper.php

<?php

namespace think\swoole\middleware;

use Closure;
use Symfony\Component\VarDumper\Cloner\VarCloner;
use Symfony\Component\VarDumper\Dumper\HtmlDumper;
use Symfony\Component\VarDumper\VarDumper;
use think\Request;
use think\Response;

/**
 * Middleware that routes VarDumper output through an HtmlDumper for the
 * duration of a request, when the VarDumper component is installed.
 */
class InteractsWithVarDumper
{
    /**
     * Install an HTML dump handler around the rest of the pipeline and
     * restore the previous handler afterwards.
     *
     * The previous handler is restored in a finally block, so an exception
     * thrown by $next cannot leave the HTML handler installed for later
     * requests handled by the same worker.
     *
     * @param Request $request
     * @param Closure $next
     * @return Response
     */
    public function handle($request, Closure $next)
    {
        if (!class_exists(VarDumper::class)) {
            // VarDumper is not installed: nothing to set up
            return $next($request);
        }

        $cloner = new VarCloner();
        $dumper = new HtmlDumper();

        $prevHandler = VarDumper::setHandler(function ($var) use ($dumper, $cloner) {
            $dumper->dump($cloner->cloneVar($var));
        });

        try {
            /** @var Response $response */
            $response = $next($request);

            return $response;
        } finally {
            VarDumper::setHandler($prevHandler);
        }
    }
}

pool

proxy

Connection.php
<?php

namespace think\swoole\pool\proxy;

use Psr\SimpleCache\CacheInterface;
use think\db\BaseQuery;
use think\db\ConnectionInterface;
use think\DbManager;
use think\swoole\pool\Proxy;

/**
 * Class Connection
 *
 * Pooled stand-in for a database connection. Every method forwards its own
 * name and the arguments actually passed to Proxy::__call().
 *
 * @package think\swoole\pool\proxy
 * @property ConnectionInterface $handler
 */
class Connection extends Proxy implements ConnectionInterface
{

    /**
     * Get the Query class that corresponds to the current connector class.
     * @access public
     * @return string
     */
    public function getQueryClass(): string
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Connect to the database.
     * @access public
     * @param array $config connection parameters
     * @param integer $linkNum connection index
     * @return mixed
     */
    public function connect(array $config = [], $linkNum = 0)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Set the current Db manager object.
     * @access public
     * @param DbManager $db
     * @return void
     */
    public function setDb(DbManager $db)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Set the current cache object.
     * @access public
     * @param CacheInterface $cache
     * @return void
     */
    public function setCache(CacheInterface $cache)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Get the database configuration.
     * @access public
     * @param string $config configuration name
     * @return mixed
     */
    public function getConfig(string $config = '')
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Close the database connection (or reconnect).
     * @access public
     */
    public function close()
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Find a single record.
     * @access public
     * @param BaseQuery $query query object
     * @return array
     */
    public function find(BaseQuery $query): array
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Find records.
     * @access public
     * @param BaseQuery $query query object
     * @return array
     */
    public function select(BaseQuery $query): array
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Insert a record.
     * @access public
     * @param BaseQuery $query query object
     * @param boolean $getLastInsID whether to return the auto-increment primary key
     * @return mixed
     */
    public function insert(BaseQuery $query, bool $getLastInsID = false)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Insert multiple records.
     * @access public
     * @param BaseQuery $query query object
     * @param mixed $dataSet data set
     * @return integer
     * @throws \Exception
     * @throws \Throwable
     */
    public function insertAll(BaseQuery $query, array $dataSet = []): int
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Update records.
     * @access public
     * @param BaseQuery $query query object
     * @return integer
     */
    public function update(BaseQuery $query): int
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Delete records.
     * @access public
     * @param BaseQuery $query query object
     * @return int
     */
    public function delete(BaseQuery $query): int
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Get the value of a single field.
     * @access public
     * @param BaseQuery $query query object
     * @param string $field field name
     * @param mixed $default default value
     * @return mixed
     */
    public function value(BaseQuery $query, string $field, $default = null)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Get the values of a column as an array.
     * @access public
     * @param BaseQuery $query query object
     * @param string|array $column field name; separate multiple fields with commas
     * @param string $key index key
     * @return array
     */
    public function column(BaseQuery $query, $column, string $key = ''): array
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Run a database transaction.
     * @access public
     * @param callable $callback callback performing the data operations
     * @return mixed
     * @throws \Throwable
     */
    public function transaction(callable $callback)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Start a transaction.
     * @access public
     * @return void
     * @throws \Exception
     */
    public function startTrans()
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Commit queries when not in auto-commit mode.
     * @access public
     * @return void
     */
    public function commit()
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Roll back the transaction.
     * @access public
     * @return void
     */
    public function rollback()
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Get the SQL statement of the most recent query.
     * @access public
     * @return string
     */
    public function getLastSql(): string
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Forward a table() call to the pooled connection.
     *
     * @param mixed $table
     * @return mixed whatever the underlying connection returns
     */
    public function table($table)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * Forward a name() call to the pooled connection.
     *
     * @param mixed $name
     * @return mixed whatever the underlying connection returns
     */
    public function name($name)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }
}
Store.php
<?php

namespace think\swoole\pool\proxy;

use Psr\SimpleCache\CacheInterface;
use think\contract\CacheHandlerInterface;
use think\swoole\pool\Proxy;

/**
 * Pooled stand-in for a cache store. Every method forwards its own name and
 * the arguments actually passed to Proxy::__call().
 */
class Store extends Proxy implements CacheHandlerInterface, CacheInterface
{
    /**
     * @inheritDoc
     */
    public function has($name)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function get($name, $default = null)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function set($name, $value, $expire = null)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function inc(string $name, int $step = 1)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function dec(string $name, int $step = 1)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function delete($name)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function clear()
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function clearTag(array $keys)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function getMultiple($keys, $default = null)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function setMultiple($values, $ttl = null)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function deleteMultiple($keys)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }

    /**
     * @inheritDoc
     */
    public function tag($name)
    {
        return $this->__call(__FUNCTION__, func_get_args());
    }
}

Cache.php

<?php

namespace think\swoole\pool;

use think\swoole\pool\proxy\Store;

/**
 * Cache manager whose drivers are wrapped in a pooled Store proxy.
 */
class Cache extends \think\Cache
{
    /**
     * Wrap the parent's driver creation in a Store proxy configured from
     * "swoole.pool.cache".
     *
     * @param string $name
     * @return Store
     */
    protected function createDriver(string $name)
    {
        // Called by the pool whenever it needs a real driver instance
        $factory = function () use ($name) {
            return parent::createDriver($name);
        };

        $poolConfig = $this->app->config->get('swoole.pool.cache', []);

        return new Store($factory, $poolConfig);
    }

}

Connector.php

<?php

namespace think\swoole\pool;

use Smf\ConnectionPool\Connectors\ConnectorInterface;

/**
 * Connector that creates connections through a user-supplied callable.
 */
class Connector implements ConnectorInterface
{
    /**
     * Callable invoked with the connection config to create a connection.
     */
    protected $connector;

    /**
     * @param callable $connector
     */
    public function __construct($connector)
    {
        $this->connector = $connector;
    }

    /**
     * Create a connection by invoking the stored callable.
     *
     * @param array $config
     * @return mixed
     */
    public function connect(array $config)
    {
        $factory = $this->connector;

        return call_user_func($factory, $config);
    }

    /**
     * Intentionally a no-op here.
     */
    public function disconnect($connection)
    {

    }

    /**
     * A connection counts as connected until it carries the property named
     * by Proxy::KEY_DISCONNECTED.
     */
    public function isConnected($connection): bool
    {
        $flagged = property_exists($connection, Proxy::KEY_DISCONNECTED);

        return !$flagged;
    }

    /**
     * Intentionally a no-op here.
     */
    public function reset($connection, array $config)
    {

    }

    /**
     * Every connection is accepted.
     */
    public function validate($connection): bool
    {
        return true;
    }
}

Db.php

<?php

namespace think\swoole\pool;

use think\Config;
use think\db\ConnectionInterface;
use think\swoole\pool\proxy\Connection;

/**
 * Class Db
 *
 * Db manager whose connections are wrapped in a pooled Connection proxy.
 *
 * @package think\swoole\pool
 * @property Config $config
 */
class Db extends \think\Db
{

    /**
     * Wrap the parent's connection creation in a Connection proxy configured
     * from "swoole.pool.db".
     *
     * @param string $name
     * @return ConnectionInterface
     */
    protected function createConnection(string $name): ConnectionInterface
    {
        // Called by the pool whenever it needs a real connection
        $factory = function () use ($name) {
            return parent::createConnection($name);
        };

        // Connector variant that closes real connections on disconnect
        $connector = new class($factory) extends Connector {
            public function disconnect($connection)
            {
                if ($connection instanceof ConnectionInterface) {
                    $connection->close();
                }
            }
        };

        $poolConfig = $this->config->get('swoole.pool.db', []);

        return new Connection($connector, $poolConfig);
    }

}

Proxy.php

<?php

namespace think\swoole\pool;

use Closure;
use Exception;
use RuntimeException;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use Swoole\Coroutine;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use Throwable;

/**
 * Base class for pooled proxies: owns a ConnectionPool and forwards method
 * calls to a connection borrowed once per coroutine.
 */
abstract class Proxy
{
    const KEY_RELEASED = '__released';

    const KEY_DISCONNECTED = '__disconnected';

    /**
     * @var ConnectionPool
     */
    protected $pool;

    /**
     * Proxy constructor.
     *
     * @param Closure|ConnectorInterface $connector        a Closure is wrapped in a Connector
     * @param array                      $config           pool settings, read via Pool::pullPoolConfig()
     * @param array                      $connectionConfig config handed to the pool for its connections
     */
    public function __construct($connector, $config, array $connectionConfig = [])
    {
        $resolved = $connector instanceof Closure ? new Connector($connector) : $connector;

        $poolConfig = Pool::pullPoolConfig($config);

        $this->pool = new ConnectionPool($poolConfig, $resolved, $connectionConfig);
        $this->pool->init();
    }

    /**
     * Get this proxy's connection for the current coroutine, borrowing one
     * from the pool on first use and scheduling its release via defer.
     */
    protected function getPoolConnection()
    {
        $key = 'connection.' . spl_object_id($this);

        return Context::rememberData($key, function () {
            $borrowed = $this->pool->borrow();

            $borrowed->{static::KEY_RELEASED} = false;

            // Released automatically by the deferred callback
            Coroutine::defer(function () use ($borrowed) {
                $this->releaseConnection($borrowed);
            });

            return $borrowed;
        });
    }

    /**
     * Return a connection to the pool unless it was already released.
     */
    protected function releaseConnection($connection)
    {
        if (!$connection->{static::KEY_RELEASED}) {
            $connection->{static::KEY_RELEASED} = true;
            $this->pool->return($connection);
        }
    }

    /**
     * Release the current coroutine's connection ahead of the deferred release.
     */
    public function release()
    {
        $this->releaseConnection($this->getPoolConnection());
    }

    /**
     * Forward a call to the pooled connection.
     *
     * Any throwable raised by the call marks the connection with
     * KEY_DISCONNECTED before being rethrown.
     *
     * @throws RuntimeException when the connection was already released
     */
    public function __call($method, $arguments)
    {
        $target = $this->getPoolConnection();

        if ($target->{static::KEY_RELEASED}) {
            throw new RuntimeException('Connection already has been released!');
        }

        try {
            return $target->{$method}(...$arguments);
        } catch (Throwable $failure) {
            $target->{static::KEY_DISCONNECTED} = true;
            throw $failure;
        }
    }

}

Client.php

<?php

namespace think\swoole\pool;

use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\helper\Arr;

/**
 * Pool connector that creates Swoole coroutine TCP clients.
 */
class Client implements ConnectorInterface
{

    /**
     * Create a TCP client and connect it to the configured server.
     *
     * "host", "port" and "timeout" (default 5) are taken out of $config;
     * whatever remains is applied to the client through set().
     *
     * @param array $config
     * @return \Swoole\Coroutine\Client
     */
    public function connect(array $config)
    {
        $host    = Arr::pull($config, 'host');
        $port    = Arr::pull($config, 'port');
        $timeout = Arr::pull($config, 'timeout', 5);

        $socket = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
        $socket->set($config);
        $socket->connect($host, $port, $timeout);

        return $socket;
    }

    /**
     * Close the connection.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return mixed
     */
    public function disconnect($connection)
    {
        $connection->close();
    }

    /**
     * Whether the connection is established: the client must report itself
     * connected and peek() must not return an empty string.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
     */
    public function isConnected($connection): bool
    {
        if (!$connection->isConnected()) {
            return false;
        }

        return $connection->peek() !== '';
    }

    /**
     * Reset the connection (nothing to do for this connector).
     *
     * @param \Swoole\Coroutine\Client $connection
     * @param array $config
     * @return mixed
     */
    public function reset($connection, array $config)
    {
    }

    /**
     * Validate the connection: it must be a Swoole coroutine client.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
     */
    public function validate($connection): bool
    {
        $isClient = $connection instanceof \Swoole\Coroutine\Client;

        return $isClient;
    }
}

resetters

ResetModel.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\Model;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

/**
 * Resetter that points the Model invoker at the sandbox's current application.
 */
class ResetModel implements ResetterInterface
{

    /**
     * Install the invoker; does nothing when the Model class is unavailable.
     *
     * @param App $app
     * @param Sandbox $sandbox
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        if (!class_exists(Model::class)) {
            return;
        }

        // The application is looked up on each invocation, not captured here
        Model::setInvoker(function (...$args) use ($sandbox) {
            $current = $sandbox->getApplication();

            return $current->invoke(...$args);
        });
    }
}

ResetConfig.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

/**
 * Resetter that gives the application a clone of the sandbox's config.
 */
class ResetConfig implements ResetterInterface
{

    /**
     * @param App $app
     * @param Sandbox $sandbox
     * @return App
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $config = clone $sandbox->getConfig();

        $app->instance('config', $config);

        return $app;
    }
}

ClearInstances.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

/**
 * Resetter that deletes a fixed set of instances, plus those listed under
 * "swoole.instances", from the application.
 */
class ClearInstances implements ResetterInterface
{
    /**
     * @param App $app
     * @param Sandbox $sandbox
     * @return App
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $builtin    = ['log', 'session', 'view', 'response', 'cookie'];
        $configured = $sandbox->getConfig()->get('swoole.instances', []);

        foreach (array_merge($builtin, $configured) as $abstract) {
            $app->delete($abstract);
        }

        return $app;
    }
}

ResetEvent.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

/**
 * Class ResetEvent
 *
 * Resetter that gives the application a clone of the sandbox's event object.
 *
 * @package think\swoole\resetters
 */
class ResetEvent implements ResetterInterface
{
    use ModifyProperty;

    /**
     * Clone the sandbox's event object, pass it with the application to
     * modifyProperty() and register it as the "event" instance.
     *
     * @param App $app
     * @param Sandbox $sandbox
     * @return App
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $fresh = clone $sandbox->getEvent();

        $this->modifyProperty($fresh, $app);

        $app->instance('event', $fresh);

        return $app;
    }
}

ResetService.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

/**
 * Class ResetService
 *
 * Re-runs the register/boot steps of the sandbox's services against an application.
 *
 * @package think\swoole\resetters
 */
class ResetService implements ResetterInterface
{
    use ModifyProperty;

    /**
     * Re-register and re-boot every service known to the sandbox.
     *
     * @param App $app
     * @param Sandbox $sandbox
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $services = $sandbox->getServices();

        foreach ($services as $provider) {
            $this->modifyProperty($provider, $app);

            // register() is called directly, without container injection.
            if (method_exists($provider, 'register')) {
                $provider->register();
            }

            // boot() goes through the container's invoke().
            if (method_exists($provider, 'boot')) {
                $bootCallable = [$provider, 'boot'];
                $app->invoke($bootCallable);
            }
        }
    }

}

ResetPaginator.php

<?php

namespace think\swoole\resetters;

use think\App;
use think\Paginator;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

class ResetPaginator implements ResetterInterface
{

    /**
     * Register paginator resolvers that read from the sandbox's current request.
     *
     * @param App     $app
     * @param Sandbox $sandbox
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $pathResolver = function () use ($sandbox) {
            $request = $sandbox->getApplication()->request;

            return $request->baseUrl();
        };

        $pageResolver = function ($varPage = 'page') use ($sandbox) {
            $raw = $sandbox->getApplication()->request->param($varPage);

            // Accept only integer-like values that are at least 1; otherwise use page 1.
            $isInteger = filter_var($raw, FILTER_VALIDATE_INT) !== false;

            return ($isInteger && (int) $raw >= 1) ? (int) $raw : 1;
        };

        Paginator::currentPathResolver($pathResolver);
        Paginator::currentPageResolver($pageResolver);
    }
}

response

File.php

<?php

namespace think\swoole\response;

use DateTime;
use RuntimeException;
use SplFileInfo;
use think\Response;

/**
 * Response whose body is the content of a file on disk.
 *
 * Headers (length, ETag, Last-Modified, disposition, content type) are derived
 * from the file when it is set.
 */
class File extends Response
{
    public const DISPOSITION_ATTACHMENT = 'attachment';
    public const DISPOSITION_INLINE     = 'inline';

    protected $header = [
        'Content-Type'  => 'application/octet-stream',
        'Accept-Ranges' => 'bytes',
    ];

    /**
     * @var SplFileInfo
     */
    protected $file;

    /**
     * @param SplFileInfo|string $file               File object or path.
     * @param string|null        $contentDisposition Disposition type, or null to send none.
     * @param bool               $autoEtag           Derive an ETag from the file content.
     * @param bool               $autoLastModified   Derive Last-Modified from the file mtime.
     * @param bool               $autoContentType    Detect the MIME type of the file.
     */
    public function __construct($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
    {
        $this->setFile($file, $contentDisposition, $autoEtag, $autoLastModified, $autoContentType);
    }

    /**
     * @return SplFileInfo
     */
    public function getFile()
    {
        return $this->file;
    }

    /**
     * Set the file to send and refresh the headers derived from it.
     *
     * @param SplFileInfo|string $file               File object or path.
     * @param string|null        $contentDisposition Disposition type, or null to send none.
     * @param bool               $autoEtag           Derive an ETag from the file content.
     * @param bool               $autoLastModified   Derive Last-Modified from the file mtime.
     * @param bool               $autoContentType    Detect the MIME type of the file.
     * @return $this
     * @throws RuntimeException When the file is not readable.
     */
    public function setFile($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
    {
        if (!$file instanceof SplFileInfo) {
            $file = new SplFileInfo((string) $file);
        }

        if (!$file->isReadable()) {
            throw new RuntimeException('File must be readable.');
        }

        $this->header['Content-Length'] = $file->getSize();

        $this->file = $file;

        if ($autoEtag) {
            $this->setAutoEtag();
        }

        if ($autoLastModified) {
            $this->setAutoLastModified();
        }

        if ($contentDisposition) {
            $this->setContentDisposition($contentDisposition);
        }

        if ($autoContentType) {
            $this->setAutoContentType();
        }

        return $this;
    }

    /**
     * Detect the MIME type of the file and use it as Content-Type.
     *
     * The current Content-Type is kept when detection is unavailable or fails.
     *
     * @return $this
     */
    public function setAutoContentType()
    {
        $finfo = finfo_open(FILEINFO_MIME_TYPE);

        // finfo_open() returns false when the magic database cannot be loaded;
        // passing that to finfo_file() would raise a TypeError on PHP 8.
        if (false === $finfo) {
            return $this;
        }

        $mimeType = finfo_file($finfo, $this->file->getPathname());
        if ($mimeType) {
            $this->header['Content-Type'] = $mimeType;
        }

        return $this;
    }

    /**
     * Set the Content-Disposition header.
     *
     * @param string $disposition Disposition type (see the DISPOSITION_* constants).
     * @param string $filename    File name to advertise; defaults to the file's own name.
     * @return $this
     */
    public function setContentDisposition(string $disposition, string $filename = '')
    {
        if ('' === $filename) {
            $filename = $this->file->getFilename();
        }

        $this->header['Content-Disposition'] = "{$disposition}; filename=\"{$filename}\"";

        return $this;
    }

    /**
     * Set Last-Modified from the file's modification time.
     *
     * @return $this
     */
    public function setAutoLastModified()
    {
        // The 'U' format parses a Unix timestamp, which is always UTC.
        $date = DateTime::createFromFormat('U', (string) $this->file->getMTime());
        return $this->lastModified($date->format('D, d M Y H:i:s') . ' GMT');
    }

    /**
     * Set a weak ETag computed from the SHA-1 of the file content.
     *
     * @return $this
     */
    public function setAutoEtag()
    {
        $eTag = "W/\"" . sha1_file($this->file->getPathname()) . "\"";

        return $this->eTag($eTag);
    }

    /**
     * Output the file content; the $data argument is ignored.
     *
     * @param string $data
     */
    protected function sendData(string $data): void
    {
        readfile($this->file->getPathname());
    }
}

rpc

Protocol.php

<?php

namespace think\swoole\rpc;

/**
 * Value holder describing one RPC call: target interface, method, params and context.
 */
class Protocol
{
    const ACTION_INTERFACE = '@action_interface';
    const FILE             = '@file';

    /**
     * Name of the interface being called.
     *
     * @var string
     */
    private $interface = '';

    /**
     * Name of the method being called.
     *
     * @var string
     */
    private $method = '';

    /**
     * Call arguments.
     *
     * @var array
     */
    private $params = [];

    /**
     * Extra context sent along with the call.
     *
     * @var array
     */
    private $context = [];

    /**
     * Named constructor: build a protocol object from its four parts.
     *
     * @param string $interface
     * @param string $method
     * @param array $params
     * @param array $context
     * @return Protocol
     */
    public static function make(string $interface, string $method, array $params, array $context = [])
    {
        $protocol = new static();

        $protocol->context   = $context;
        $protocol->params    = $params;
        $protocol->method    = $method;
        $protocol->interface = $interface;

        return $protocol;
    }

    /**
     * @return string
     */
    public function getInterface(): string
    {
        return $this->interface;
    }

    /**
     * @return string
     */
    public function getMethod(): string
    {
        return $this->method;
    }

    /**
     * @return array
     */
    public function getParams(): array
    {
        return $this->params;
    }

    /**
     * @return array
     */
    public function getContext(): array
    {
        return $this->context;
    }

    /**
     * @param string $interface
     */
    public function setInterface(string $interface): void
    {
        $this->interface = $interface;
    }

    /**
     * @param string $method
     */
    public function setMethod(string $method): void
    {
        $this->method = $method;
    }

    /**
     * @param array $params
     */
    public function setParams(array $params): void
    {
        $this->params = $params;
    }

    /**
     * @param array $context
     */
    public function setContext(array $context): void
    {
        $this->context = $context;
    }

}

File.php

<?php

namespace think\swoole\rpc;

use Throwable;

/**
 * File wrapper that deletes its underlying file when the object is destroyed.
 */
class File extends \think\File
{
    public function __destruct()
    {
        // Delete the temporary file on destruction.
        try {
            $path = $this->getPathname();

            if (!file_exists($path)) {
                return;
            }

            unlink($path);
        } catch (Throwable $ignored) {
            // Best effort: a failed cleanup must not raise from a destructor.
        }
    }
}

server

Dispatcher.php
<?php

namespace think\swoole\rpc\server;

use ReflectionClass;
use ReflectionException;
use ReflectionMethod;
use ReflectionNamedType;
use RuntimeException;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Middleware;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;
use Throwable;

/**
 * Routes decoded RPC requests to the registered service classes and sends the
 * encoded result back over the connection.
 */
class Dispatcher
{
    use Sendfile;

    /**
     * Parser error
     */
    const PARSER_ERROR = -32700;

    /**
     * Invalid Request
     */
    const INVALID_REQUEST = -32600;

    /**
     * Method not found
     */
    const METHOD_NOT_FOUND = -32601;

    /**
     * Invalid params
     */
    const INVALID_PARAMS = -32602;

    /**
     * Internal error
     */
    const INTERNAL_ERROR = -32603;

    /** @var ParserInterface */
    protected $parser;

    /** @var array<string, array{interface: string, class: string}> keyed by interface/class basename */
    protected $services = [];

    /** @var array middleware applied to every call */
    protected $middleware = [];

    /**
     * @param ParserInterface $parser
     * @param iterable        $services   Service class names.
     * @param array           $middleware Middleware applied to every call.
     * @throws ReflectionException
     */
    public function __construct(ParserInterface $parser, $services, $middleware = [])
    {
        $this->parser = $parser;
        $this->prepareServices($services);
        $this->middleware = $middleware;
    }

    /**
     * Build the service map from the given class names.
     *
     * A class is registered once per interface it implements (keyed by the
     * interface basename); a class without interfaces is registered under its
     * own basename.
     *
     * @param $services
     * @throws ReflectionException
     */
    protected function prepareServices($services)
    {
        foreach ($services as $className) {
            $reflectionClass = new ReflectionClass($className);
            $interfaces      = $reflectionClass->getInterfaceNames();
            if (!empty($interfaces)) {
                foreach ($interfaces as $interface) {
                    $this->services[class_basename($interface)] = [
                        'interface' => $interface,
                        'class'     => $className,
                    ];
                }
            } else {
                $this->services[class_basename($className)] = [
                    'interface' => $className,
                    'class'     => $className,
                ];
            }
        }
    }

    /**
     * Describe every registered interface and its public methods.
     *
     * @return array
     */
    protected function getInterfaces()
    {
        $interfaces = [];
        foreach ($this->services as $key => ['interface' => $interface]) {
            $interfaces[$key] = $this->getMethods($interface);
        }
        return $interfaces;
    }

    /**
     * Describe the public methods of an interface or class.
     *
     * Constructors and destructors are skipped.
     *
     * @param string $interface
     * @return array
     * @throws ReflectionException
     */
    protected function getMethods($interface)
    {
        $methods = [];

        $reflection = new ReflectionClass($interface);
        foreach ($reflection->getMethods(ReflectionMethod::IS_PUBLIC) as $method) {
            if ($method->isConstructor() || $method->isDestructor()) {
                continue;
            }
            $returnType = $method->getReturnType();
            if ($returnType instanceof ReflectionNamedType) {
                $returnType = $returnType->getName();
            }
            $methods[$method->getName()] = [
                'parameters' => $this->getParameters($method),
                'returnType' => $returnType,
                'comment'    => $method->getDocComment(),
            ];
        }
        return $methods;
    }

    /**
     * Describe the parameters of a method.
     *
     * @param ReflectionMethod $method
     * @return array
     */
    protected function getParameters(ReflectionMethod $method)
    {
        $parameters = [];
        foreach ($method->getParameters() as $parameter) {
            $type = $parameter->getType();
            if ($type instanceof ReflectionNamedType) {
                $type = $type->getName();
            }
            $param = [
                'name' => $parameter->getName(),
                'type' => $type,
            ];

            // Variadic parameters are optional but have no default value, so
            // getDefaultValue() would throw for them; ask explicitly instead.
            if ($parameter->isDefaultValueAvailable()) {
                $param['default'] = $parameter->getDefaultValue();
            }

            if ($parameter->allowsNull()) {
                $param['nullable'] = true;
            }

            $parameters[] = $param;
        }
        return $parameters;
    }

    /**
     * Dispatch one request and send the response over the connection.
     *
     * Any throwable raised while handling the request is converted into an
     * Error response.
     *
     * @param App          $app
     * @param Connection   $conn
     * @param string|Error $data
     * @param array        $files
     */
    public function dispatch(App $app, Connection $conn, $data, $files = [])
    {
        try {
            switch (true) {
                case $data instanceof Error:
                    $result = $data;
                    break;
                case $data === Protocol::ACTION_INTERFACE:
                    $result = $this->getInterfaces();
                    break;
                default:
                    $protocol = $this->parser->decode($data);
                    $result   = $this->dispatchWithMiddleware($app, $protocol, $files);
            }
        } catch (Throwable $e) {
            // Exception codes may be non-numeric strings (e.g. SQLSTATE codes);
            // Error::make() requires an int, so normalise here to avoid a
            // TypeError escaping from the catch block.
            $result = Error::make((int) $e->getCode(), $e->getMessage());
        }

        // Stream a file result first, then answer with the file placeholder.
        if ($result instanceof \think\File) {
            foreach ($this->fread($result) as $string) {
                if (!empty($string)) {
                    $conn->send($string);
                }
            }
            $result = Protocol::FILE;
        }

        $data = $this->parser->encodeResponse($result);

        $conn->send(Packer::pack($data));
    }

    /**
     * Resolve the target service and call the requested method through the
     * middleware pipeline.
     *
     * @param App      $app
     * @param Protocol $protocol
     * @param array    $files Received files, consumed in order by file placeholders.
     * @return mixed
     * @throws RuntimeException When the requested service is not registered.
     */
    protected function dispatchWithMiddleware(App $app, Protocol $protocol, $files)
    {
        $interface = $protocol->getInterface();
        $method    = $protocol->getMethod();
        $params    = $protocol->getParams();

        // Replace each file placeholder with the next received file.
        foreach ($params as $index => $param) {
            if ($param === Protocol::FILE) {
                $params[$index] = array_shift($files);
            }
        }

        $service = $this->services[$interface] ?? null;
        if (empty($service)) {
            throw new RuntimeException(
                sprintf('Service %s is not founded!', $interface),
                self::METHOD_NOT_FOUND
            );
        }

        $instance    = $app->make($service['class']);
        $middlewares = array_merge($this->middleware, $this->getServiceMiddlewares($instance, $method));

        return Middleware::make($app, $middlewares)
                         ->pipeline()
                         ->send($protocol)
                         ->then(function () use ($instance, $method, $params) {
                             return call_user_func_array([$instance, $method], $params);
                         });
    }

    /**
     * Collect the middleware declared on a service's "middleware" property
     * that applies to the given method.
     *
     * Entries may be "name", "name" => options, or
     * ['middleware' => name, 'options' => options]; the "only" and "except"
     * options restrict an entry to particular methods.
     *
     * @param object $service
     * @param string $method
     * @return array
     */
    protected function getServiceMiddlewares($service, $method)
    {
        $middlewares = [];

        $class = new ReflectionClass($service);

        if ($class->hasProperty('middleware')) {
            $reflectionProperty = $class->getProperty('middleware');
            $reflectionProperty->setAccessible(true);

            foreach ($reflectionProperty->getValue($service) as $key => $val) {
                if (!is_int($key)) {
                    $middleware = $key;
                    $options    = $val;
                } elseif (isset($val['middleware'])) {
                    $middleware = $val['middleware'];
                    $options    = $val['options'] ?? [];
                } else {
                    $middleware = $val;
                    $options    = [];
                }

                if ((isset($options['only']) && !in_array($method, (array) $options['only'])) ||
                    (!empty($options['except']) && in_array($method, (array) $options['except']))) {
                    continue;
                }

                // "name:arg1:arg2" becomes [name, [arg1, arg2]].
                if (is_string($middleware) && strpos($middleware, ':')) {
                    $middleware = explode(':', $middleware);
                    if (count($middleware) > 1) {
                        $middleware = [$middleware[0], array_slice($middleware, 1)];
                    }
                }

                $middlewares[] = $middleware;
            }
        }

        return $middlewares;
    }
}
Channel.php
<?php

namespace think\swoole\rpc\server;

use Swoole\Coroutine;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;

/**
 * Thin wrapper around a capacity-1 coroutine channel holding a packer handler.
 */
class Channel
{
    protected $header;

    protected $queue;

    /**
     * Create the channel and push the initial handler from a new coroutine.
     *
     * @param File|Buffer $handler
     */
    public function __construct($handler)
    {
        $this->queue = new Coroutine\Channel(1);

        $seed = function () use ($handler) {
            $this->queue->push($handler);
        };

        Coroutine::create($seed);
    }

    /**
     * Take the handler out of the channel.
     *
     * @return File|Buffer
     */
    public function pop()
    {
        $handler = $this->queue->pop();

        return $handler;
    }

    /**
     * Put a handler into the channel.
     */
    public function push($handle)
    {
        $pushed = $this->queue->push($handle);

        return $pushed;
    }

    /**
     * Close the underlying channel.
     */
    public function close()
    {
        $closed = $this->queue->close();

        return $closed;
    }

}

Packer.php

<?php

namespace think\swoole\rpc;

use RuntimeException;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;

/**
 * Frames RPC payloads with an 8-byte header: payload length and payload type,
 * both as 32-bit big-endian unsigned integers.
 */
class Packer
{
    public const HEADER_SIZE   = 8;
    public const HEADER_STRUCT = 'Nlength/Ntype';
    public const HEADER_PACK   = 'NN';

    public const TYPE_BUFFER = 0;
    public const TYPE_FILE   = 1;

    /**
     * Prefix the payload with its header.
     *
     * @param string $data Payload bytes.
     * @param int    $type One of the TYPE_* constants.
     * @return string
     */
    public static function pack($data, $type = self::TYPE_BUFFER)
    {
        return pack(self::HEADER_PACK, strlen($data), $type) . $data;
    }

    /**
     * Read the header from the start of $data and create the matching handler.
     *
     * @param $data
     * @return array<Buffer|File|string> [handler, bytes following the header]
     * @throws RuntimeException When the header is incomplete or the type is unknown.
     */
    public static function unpack($data)
    {
        // Check the length up front so a short header fails cleanly instead of
        // triggering an unpack() warning first.
        if (strlen($data) < self::HEADER_SIZE) {
            throw new RuntimeException('Invalid Header');
        }

        $header = unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE));
        if ($header === false) {
            throw new RuntimeException('Invalid Header');
        }

        switch ($header['type']) {
            case Packer::TYPE_BUFFER:
                $handler = new Buffer($header['length']);
                break;
            case Packer::TYPE_FILE:
                $handler = new File($header['length']);
                break;
            default:
                throw new RuntimeException("unsupported data type: [{$header['type']}]");
        }

        $data = substr($data, self::HEADER_SIZE);

        return [$handler, $data];
    }
}

Sendfile.php

<?php

namespace think\swoole\rpc;

/**
 * Streams a file as a framed sequence of chunks.
 */
trait Sendfile
{
    /**
     * Yield a file header (size + file type) followed by the file content in
     * chunks of up to 8192 bytes.
     *
     * @param \think\File $file
     * @return \Generator
     * @throws \RuntimeException When the file cannot be opened or read.
     */
    protected function fread(\think\File $file)
    {
        // Open before entering the try block so the finally clause never runs
        // fclose() on a handle that was not successfully opened.
        $handle = fopen($file->getPathname(), 'rb');
        if (false === $handle) {
            throw new \RuntimeException(
                sprintf('Unable to open file "%s" for reading.', $file->getPathname())
            );
        }

        try {
            yield pack(Packer::HEADER_PACK, $file->getSize(), Packer::TYPE_FILE);
            while (!feof($handle)) {
                $chunk = fread($handle, 8192);
                // A failing read never reaches EOF; stop instead of looping forever.
                if (false === $chunk) {
                    throw new \RuntimeException(
                        sprintf('Unable to read file "%s".', $file->getPathname())
                    );
                }
                yield $chunk;
            }
        } finally {
            fclose($handle);
        }
    }
}

Error.php

<?php

namespace think\swoole\rpc;

/**
 * RPC error value: a code, a message and optional extra data.
 */
class Error implements \JsonSerializable
{
    /**
     * @var int
     */
    protected $code = 0;

    /**
     * @var string
     */
    protected $message = '';

    /**
     * @var mixed
     */
    protected $data;

    /**
     * Named constructor.
     *
     * @param int    $code
     * @param string $message
     * @param mixed  $data
     *
     * @return Error
     */
    public static function make(int $code, string $message, $data = null): self
    {
        $instance = new static();

        $instance->code    = $code;
        $instance->message = $message;
        $instance->data    = $data;

        return $instance;
    }

    /**
     * @return int
     */
    public function getCode(): int
    {
        return $this->code;
    }

    /**
     * @return string
     */
    public function getMessage(): string
    {
        return $this->message;
    }

    /**
     * @return mixed
     */
    public function getData()
    {
        return $this->data;
    }

    /**
     * Representation used by json_encode().
     *
     * The attribute silences the PHP 8.1+ deprecation about the missing
     * return type without changing the signature for subclasses; on older
     * PHP versions the line is parsed as a comment.
     *
     * @return array
     */
    #[\ReturnTypeWillChange]
    public function jsonSerialize()
    {
        return [
            'code'    => $this->code,
            'message' => $this->message,
            'data'    => $this->data,
        ];
    }
}

JsonParser.php

<?php

namespace think\swoole\rpc;

use Exception;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\server\Dispatcher;

/**
 * JSON-RPC style parser: encodes/decodes requests and responses as JSON.
 */
class JsonParser implements ParserInterface
{
    /**
     * Json-rpc version
     */
    const VERSION = '2.0';

    /**
     * Separator between interface and method inside the "method" field.
     */
    const DELIMITER = "@";

    /**
     * Encode a request.
     *
     * @param Protocol $protocol
     *
     * @return string
     */
    public function encode(Protocol $protocol): string
    {
        $interface  = $protocol->getInterface();
        $methodName = $protocol->getMethod();

        $method = $interface . self::DELIMITER . $methodName;
        $data   = [
            'jsonrpc' => self::VERSION,
            'method'  => $method,
            'params'  => $protocol->getParams(),
            'context' => $protocol->getContext(),
            'id'      => '',
        ];

        return json_encode($data, JSON_UNESCAPED_UNICODE);
    }

    /**
     * Decode a request.
     *
     * @param string $string
     *
     * @return Protocol
     * @throws Exception When the payload is not JSON or the method field is
     *                   missing or malformed.
     */
    public function decode(string $string): Protocol
    {
        $data = json_decode($string, true);

        $error = json_last_error();
        if ($error != JSON_ERROR_NONE) {
            throw new Exception(
                sprintf('Data(%s) is not json format!', $string),
                Dispatcher::PARSER_ERROR
            );
        }

        $method  = $data['method'] ?? '';
        $params  = $data['params'] ?? [];
        $context = $data['context'] ?? [];

        if (empty($method)) {
            throw new Exception(
                sprintf('Method(%s) cant not be empty!', $string),
                Dispatcher::INVALID_PARAMS
            );
        }

        $methodAry = explode(self::DELIMITER, $method);
        if (count($methodAry) < 2) {
            throw new Exception(
                sprintf('Method(%s) is bad format!', $method),
                Dispatcher::INVALID_PARAMS
            );
        }

        [$interfaceClass, $methodName] = $methodAry;

        if (empty($interfaceClass) || empty($methodName)) {
            throw new Exception(
                sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method),
                Dispatcher::INVALID_PARAMS
            );
        }

        return Protocol::make($interfaceClass, $methodName, $params, $context);
    }

    /**
     * Decode a response.
     *
     * Returns the "result" value when present, otherwise an Error built from
     * the "error" member. A payload that does not decode to a JSON
     * object/array yields a parser Error instead of raising a TypeError.
     *
     * @param string $string
     *
     * @return mixed
     */
    public function decodeResponse(string $string)
    {
        $data = json_decode($string, true);

        // json_decode() yields null for invalid JSON and scalars for scalar
        // documents; array_key_exists() would throw on those under PHP 8.
        if (!is_array($data)) {
            return Error::make(
                Dispatcher::PARSER_ERROR,
                sprintf('Data(%s) is not json format!', $string)
            );
        }

        if (array_key_exists('result', $data)) {
            return $data['result'];
        }

        $code    = $data['error']['code'] ?? 0;
        $message = $data['error']['message'] ?? '';
        $data    = $data['error']['data'] ?? null;

        // Error::make() is strictly typed; normalise scalar values from the
        // wire and fall back to the defaults for anything else.
        $code    = is_scalar($code) ? (int) $code : 0;
        $message = is_scalar($message) ? (string) $message : '';

        return Error::make($code, $message, $data);
    }

    /**
     * Encode a response; Error results go under "error", anything else under "result".
     *
     * @param mixed $result
     *
     * @return string
     */
    public function encodeResponse($result): string
    {
        $data = [
            'jsonrpc' => self::VERSION,
            'id'      => '',
        ];

        if ($result instanceof Error) {
            $data['error'] = $result;
        } else {
            $data['result'] = $result;
        }

        return json_encode($data);
    }
}

client

Service.php
<?php

namespace think\swoole\rpc\client;

/**
 * Contract for RPC client services that can carry a call context.
 */
interface Service
{
    /**
     * Attach a context to the service.
     *
     * @param mixed $context
     * @return self
     */
    public function withContext($context): self;
}
Connector.php
<?php

namespace think\swoole\rpc\client;

use Generator;

/**
 * Transport used by the RPC client gateway to exchange data with a server.
 */
interface Connector
{
    /**
     * Send the given data and receive the response.
     *
     * @param Generator|string $data    Payload to send, either whole or as a generator of chunks.
     * @param callable $decoder         Callback used to decode the received response.
     * @return string NOTE(review): Gateway returns this value to its caller as the
     *                decoded result, so it may not always be a string - confirm.
     */
    public function sendAndRecv($data, callable $decoder);
}
Proxy.php
<?php

namespace think\swoole\rpc\client;

use InvalidArgumentException;
use Nette\PhpGenerator\Factory;
use Nette\PhpGenerator\PhpNamespace;
use ReflectionClass;
use think\App;
use think\swoole\Middleware;
use think\swoole\rpc\Protocol;

/**
 * Base class of the generated RPC client proxies.
 *
 * A concrete proxy implements a service interface and forwards every call to
 * the gateway through the middleware pipeline.
 */
abstract class Proxy implements Service
{
    protected $interface;

    /** @var Gateway */
    protected $gateway;

    /** @var App */
    protected $app;

    protected $middleware = [];

    protected $context = [];

    /**
     * @param App     $app
     * @param Gateway $gateway
     * @param array   $middleware
     */
    final public function __construct(App $app, Gateway $gateway, $middleware)
    {
        $this->middleware = $middleware;
        $this->gateway    = $gateway;
        $this->app        = $app;
    }

    /**
     * Build the protocol for a call and run it through the middleware to the gateway.
     *
     * @param string $method
     * @param array  $params
     * @return mixed
     */
    final protected function proxyCall($method, $params)
    {
        $request = Protocol::make($this->interface, $method, $params, $this->context);

        $pipeline = Middleware::make($this->app, $this->middleware)->pipeline();

        return $pipeline
            ->send($request)
            ->then(function (Protocol $protocol) {
                return $this->gateway->call($protocol);
            });
    }

    /**
     * Store the context sent with subsequent calls.
     *
     * @param array $context
     * @return $this
     */
    final public function withContext($context): self
    {
        $this->context = $context;
        return $this;
    }

    /**
     * Return the proxy class name for an interface, generating and defining
     * the class on first use.
     *
     * @param string $client
     * @param string $interface
     * @return string
     * @throws InvalidArgumentException When $interface is not an existing interface.
     */
    final public static function getClassName($client, $interface)
    {
        if (!interface_exists($interface)) {
            throw new InvalidArgumentException(
                sprintf('%s must be exist interface!', $interface)
            );
        }

        $shortName     = class_basename($interface);
        $proxyName     = $shortName . "Service";
        $namespaceName = "rpc\\service\\{$client}";
        $className     = "{$namespaceName}\\{$proxyName}";

        // Already defined in this process: nothing to generate.
        if (class_exists($className, false)) {
            return $className;
        }

        $namespace = new PhpNamespace($namespaceName);
        $namespace->addUse(Proxy::class);
        $namespace->addUse($interface);

        $class = $namespace->addClass($proxyName);

        $class->setExtends(Proxy::class);
        $class->addImplement($interface);
        $class->addProperty('interface', $shortName);

        $reflection = new ReflectionClass($interface);

        foreach ($reflection->getMethods() as $methodRef) {
            // Methods declared by Service are already implemented by Proxy.
            if ($methodRef->getDeclaringClass()->name == Service::class) {
                continue;
            }

            $method = (new Factory)->fromMethodReflection($methodRef);
            $call   = "\$this->proxyCall('{$methodRef->getName()}', func_get_args());";

            $method->setBody($method->getReturnType() != 'void' ? "return {$call}" : $call);
            $class->addMember($method);
        }

        // Define the generated class in the running process.
        eval($namespace);

        return $className;
    }
}
Gateway.php
<?php

namespace think\swoole\rpc\client;

use Closure;
use Swoole\Coroutine\Client;
use think\File;
use think\helper\Arr;
use think\swoole\concerns\InteractsWithRpcConnector;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\exception\RpcClientException;
use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;

/**
 * RPC client gateway: encodes a call, sends it through a connector and
 * decodes the response, optionally retrying.
 */
class Gateway
{
    use Sendfile;

    /** @var Connector */
    protected $connector;

    /** @var ParserInterface */
    protected $parser;

    /** @var int number of attempts passed to backoff() when greater than 1 */
    protected $tries;

    /**
     * Gateway constructor.
     * @param Connector|array $connector Connector instance, or config for the default connector.
     * @param ParserInterface $parser
     * @param int             $tries
     */
    public function __construct($connector, ParserInterface $parser, $tries = 2)
    {
        if (is_array($connector)) {
            $connector = $this->createDefaultConnector($connector);
        }
        $this->connector = $connector;
        $this->parser    = $parser;
        $this->tries     = $tries;
    }

    /**
     * Yield the frames for a call: any file parameters first, then the
     * encoded request.
     *
     * The caller's protocol object is left untouched so that a retry sees the
     * original file parameters and transfers them again.
     *
     * @param Protocol $protocol
     * @return \Generator
     */
    protected function encodeData(Protocol $protocol)
    {
        $params = $protocol->getParams();

        // Files are transferred first and replaced by a placeholder.
        foreach ($params as $index => $param) {
            if ($param instanceof File) {
                yield from $this->fread($param);
                $params[$index] = Protocol::FILE;
            }
        }

        // Encode a copy: writing the placeholders back into $protocol would
        // make a later attempt skip the file transfer.
        $request = clone $protocol;
        $request->setParams($params);

        $data = $this->parser->encode($request);

        yield Packer::pack($data);
    }

    /**
     * Decode a response, turning an Error result into an exception.
     *
     * @param string $response
     * @return mixed
     * @throws RpcResponseException
     */
    protected function decodeResponse($response)
    {
        $result = $this->parser->decodeResponse($response);

        if ($result instanceof Error) {
            throw new RpcResponseException($result);
        }

        return $result;
    }

    /**
     * Send data through the connector, decoding with decodeResponse().
     *
     * @param \Generator|string $data
     * @return mixed
     */
    protected function sendAndRecv($data)
    {
        return $this->connector->sendAndRecv($data, Closure::fromCallable([$this, 'decodeResponse']));
    }

    /**
     * Perform a call.
     *
     * With more than one try the call runs inside backoff(); a
     * RpcResponseException is returned out of the callback and rethrown
     * afterwards instead of being thrown inside it.
     *
     * @param Protocol $protocol
     * @return mixed
     * @throws RpcResponseException
     */
    public function call(Protocol $protocol)
    {
        if ($this->tries > 1) {
            $result = backoff(function () use ($protocol) {
                try {
                    return $this->sendAndRecv($this->encodeData($protocol));
                } catch (RpcResponseException $e) {
                    return $e;
                }
            }, $this->tries);

            if ($result instanceof RpcResponseException) {
                throw $result;
            }

            return $result;
        } else {
            return $this->sendAndRecv($this->encodeData($protocol));
        }
    }

    /**
     * Ask the server for its interface description.
     *
     * @return mixed
     */
    public function getServices()
    {
        return $this->sendAndRecv(Packer::pack(Protocol::ACTION_INTERFACE));
    }

    /**
     * Build the default connector backed by a coroutine TCP client.
     *
     * @param array $config "host", "port" and "timeout" (default 5) are used for
     *                      connecting; the remaining keys are passed to Client::set().
     * @return Connector
     */
    protected function createDefaultConnector($config)
    {
        return new class($config) implements Connector {

            use InteractsWithRpcConnector;

            /** @var Client */
            protected $client;
            protected $config;

            /**
             *  constructor.
             * @param [] $config
             */
            public function __construct($config)
            {
                $this->config = $config;
            }

            /**
             * A client counts as connected only if it exists, reports being
             * connected and peek() does not return an empty string.
             */
            protected function isConnected(): bool
            {
                return $this->client && $this->client->isConnected() && $this->client->peek() !== '';
            }

            /**
             * Return the current client, connecting a new one when needed.
             *
             * @return Client
             * @throws RpcClientException When the connection attempt fails.
             */
            protected function getClient()
            {
                if (!$this->isConnected()) {
                    $client = new Client(SWOOLE_SOCK_TCP);

                    $config = $this->config;

                    $host    = Arr::pull($config, 'host');
                    $port    = Arr::pull($config, 'port');
                    $timeout = Arr::pull($config, 'timeout', 5);

                    $client->set($config);

                    if (!$client->connect($host, $port, $timeout)) {
                        throw new RpcClientException(
                            sprintf('Connect failed host=%s port=%d', $host, $port)
                        );
                    }

                    $this->client = $client;
                }
                return $this->client;
            }

            /**
             * Run the callback with the client.
             */
            protected function runWithClient($callback)
            {
                return $callback($this->getClient());
            }
        };
    }
}

packer

File.php
<?php

namespace think\swoole\rpc\packer;

/**
 * Packer handler that collects a fixed number of bytes into a temporary file.
 */
class File
{
    protected $name;
    protected $handle;
    protected $length;

    /**
     * @param int $length Total number of bytes expected.
     */
    public function __construct($length)
    {
        $this->length = $length;
    }

    /**
     * Append incoming bytes to the temporary file.
     *
     * Consumes from $data (by reference) up to the number of bytes still
     * missing. Once the expected length is reached the file is closed, $data
     * keeps the unconsumed remainder and a File object is returned; otherwise
     * $data is emptied and nothing is returned.
     *
     * @param string $data
     * @return \think\swoole\rpc\File|null
     */
    public function write(&$data)
    {
        // The temporary file is created lazily on the first write.
        if (!$this->handle) {
            $this->name   = tempnam(sys_get_temp_dir(), 'swoole_rpc_');
            $this->handle = fopen($this->name, 'ab');
        }

        $written   = fstat($this->handle)['size'];
        $remaining = $this->length - $written;

        fwrite($this->handle, substr($data, 0, $remaining));

        if (strlen($data) < $remaining) {
            // Everything was consumed and more is still expected.
            $data = '';
            return;
        }

        fclose($this->handle);
        $data = substr($data, $remaining);

        return new \think\swoole\rpc\File($this->name);
    }
}
Buffer.php
<?php

namespace think\swoole\rpc\packer;

/**
 * Packer handler that collects a fixed number of bytes in memory.
 */
class Buffer
{
    protected $data = '';
    protected $length;

    /**
     * @param int $length Total number of bytes expected.
     */
    public function __construct($length)
    {
        $this->length = $length;
    }

    /**
     * Append incoming bytes to the buffer.
     *
     * Consumes from $data (by reference) up to the number of bytes still
     * missing. Once the expected length is reached the collected string is
     * returned and $data keeps the unconsumed remainder; otherwise $data is
     * emptied and nothing is returned.
     *
     * @param string $data
     * @return string|null
     */
    public function write(&$data)
    {
        $remaining = $this->length - strlen($this->data);

        $this->data .= substr($data, 0, $remaining);

        if (strlen($data) < $remaining) {
            // Everything was consumed and more is still expected.
            $data = '';
            return;
        }

        $data = substr($data, $remaining);

        return $this->data;
    }
}

watcher

Driver.php

<?php

namespace think\swoole\watcher;

/**
 * Contract for file watcher drivers.
 */
interface Driver
{
    /**
     * Start watching and invoke the callback when a change is detected.
     *
     * @param callable $callback Called without arguments by the Find and Scan drivers.
     */
    public function watch(callable $callback);
}

Find.php

<?php

namespace think\swoole\watcher;

use InvalidArgumentException;
use Swoole\Coroutine\System;
use Swoole\Timer;
use think\helper\Str;

/**
 * Watcher driver that polls for recently modified files with the `find` command.
 */
class Find implements Driver
{
    protected $name;
    protected $directory;
    protected $exclude;

    /**
     * @param array $directory Directories to search.
     * @param array $exclude   Directories or file names to ignore.
     * @param array $name      File name patterns to match.
     * @throws InvalidArgumentException When `find` is missing or is the BusyBox build.
     */
    public function __construct($directory, $exclude, $name)
    {
        $ret = System::exec('which find');
        if (empty($ret['output'])) {
            throw new InvalidArgumentException('find not exists.');
        }
        $ret = System::exec('find --help', true);
        if (Str::contains($ret['output'] ?? '', 'BusyBox')) {
            throw new InvalidArgumentException('find version not support.');
        }

        $this->directory = $directory;
        $this->exclude   = $exclude;
        $this->name      = $name;
    }

    /**
     * Run `find` every two seconds and invoke the callback when it reports
     * at least one recently modified file.
     *
     * @param callable $callback
     */
    public function watch(callable $callback)
    {
        $ms      = 2000;
        // Look back slightly longer than the polling interval.
        $seconds = ceil(($ms + 1000) / 1000);
        $minutes = sprintf('-%.2f', $seconds / 60);

        $dest = implode(' ', (array) $this->directory);

        $name    = empty($this->name) ? '' : ' \( ' . join(' -o ', array_map(fn ($v) => "-name \"{$v}\"", (array) $this->name)) . ' \)';
        $notName = '';
        $notPath = '';
        if (!empty($this->exclude)) {
            $excludeDirs = $excludeFiles = [];
            foreach ($this->exclude as $directory) {
                $directory = rtrim($directory, '/');
                if (is_dir($directory)) {
                    $excludeDirs[] = $directory;
                } else {
                    $excludeFiles[] = $directory;
                }
            }

            // File and directory exclusions go into separate clauses so that
            // one does not overwrite the other. Alternatives are joined with
            // -o: "-not ( A -o B )" rejects a file matching ANY entry, whereas
            // -and would only reject a file matching ALL of them.
            if (!empty($excludeFiles)) {
                $notName = ' -not \( ' . join(' -o ', array_map(fn ($v) => "-name \"{$v}\"", $excludeFiles)) . ' \)';
            }

            if (!empty($excludeDirs)) {
                $notPath = ' -not \( ' . join(' -o ', array_map(fn ($v) => "-path \"{$v}/*\"", $excludeDirs)) . ' \)';
            }
        }

        $command = "find {$dest}{$name}{$notName}{$notPath} -mmin {$minutes} -type f -print";

        Timer::tick($ms, function () use ($callback, $command) {
            $ret = System::exec($command);
            if (is_array($ret) && $ret['code'] === 0 && strlen($ret['output'])) {
                $stdout = trim($ret['output']);
                if (!empty($stdout)) {
                    call_user_func($callback);
                }
            }
        });
    }

}

Scan.php

<?php

namespace think\swoole\watcher;

use Swoole\Timer;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Finder\SplFileInfo;

class Scan implements Driver
{
    /** @var Finder Pre-configured finder that enumerates the watched files. */
    protected $finder;

    /** @var array Map of real path => modification time from the previous scan. */
    protected $files = [];

    /**
     * Configure the finder for the given directories, exclusions and name patterns.
     *
     * Entries of $directory that are not existing directories are dropped
     * before being handed to the finder.
     *
     * @param array|string $directory Directories to watch.
     * @param array|string $exclude   Passed to Finder::exclude().
     * @param array|string $name      Passed to Finder::name().
     */
    public function __construct($directory, $exclude, $name)
    {
        $finder = new Finder();
        $finder->files();
        $finder->name($name);
        $finder->in(array_filter((array) $directory, fn ($dir) => is_dir($dir)));
        $finder->exclude($exclude);

        $this->finder = $finder;
    }

    /**
     * Take a snapshot of all matching files.
     *
     * @return array Map of real path => modification time.
     */
    protected function findFiles()
    {
        $snapshot = [];

        /** @var SplFileInfo $file */
        foreach ($this->finder as $file) {
            $snapshot[$file->getRealpath()] = $file->getMTime();
        }

        return $snapshot;
    }

    /**
     * Re-scan every 2000 ms and invoke the callback once per tick when a file
     * is new or its modification time differs from the previous snapshot.
     *
     * @param callable $callback Called without arguments on detected change.
     */
    public function watch(callable $callback)
    {
        $this->files = $this->findFiles();

        Timer::tick(2000, function () use ($callback) {
            $current = $this->findFiles();

            foreach ($current as $path => $mtime) {
                $previous = $this->files[$path] ?? null;

                // One notification per tick is enough, so stop at the first difference.
                if (empty($previous) || $previous != $mtime) {
                    $callback();
                    break;
                }
            }

            $this->files = $current;
        });
    }
}

websocket

middleware

SessionInit.php
<?php

namespace think\swoole\websocket\middleware;

use Closure;
use think\App;
use think\Request;
use think\Response;
use think\Session;

class SessionInit
{
    /** @var App Application instance, used to read configuration. */
    protected $app;

    /** @var Session Session manager initialised by this middleware. */
    protected $session;

    public function __construct(App $app, Session $session)
    {
        $this->session = $session;
        $this->app     = $app;
    }

    /**
     * Initialise the session for the incoming request.
     *
     * The session id is taken from the request variable named by the
     * `session.var_session_id` config entry when that variable is present
     * and truthy; otherwise it is read from the session cookie.
     *
     * @access public
     * @param Request $request
     * @param Closure $next
     * @return Response
     */
    public function handle($request, Closure $next)
    {
        $varSessionId = $this->app->config->get('session.var_session_id');
        $cookieName   = $this->session->getName();

        $hasRequestId = $varSessionId && $request->request($varSessionId);
        $sessionId    = $hasRequestId
            ? $request->request($varSessionId)
            : $request->cookie($cookieName);

        if ($sessionId) {
            $this->session->setId($sessionId);
        }

        $this->session->init();
        $request->withSession($this->session);

        return $next($request);
    }
}

Event.php

<?php

namespace think\swoole\websocket;

class Event
{
    /** @var mixed Event type, as given to the constructor. */
    public $type;

    /** @var mixed Event payload, as given to the constructor. */
    public $data;

    /**
     * @param mixed $type Event type.
     * @param mixed $data Event payload.
     */
    public function __construct($type, $data)
    {
        [$this->type, $this->data] = [$type, $data];
    }
}

room

Redis.php
<?php

namespace think\swoole\websocket\room;

use InvalidArgumentException;
use Redis as PHPRedis;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use think\helper\Arr;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Manager;
use think\swoole\Pool;

/**
 * Redis-backed room storage.
 *
 * Each room and each fd is stored as a Redis set under
 * "{prefix}{table}:{key}", where table is one of the two RoomInterface keys.
 */
class Redis implements RoomInterface
{
    /**
     * @var array Connection/pool configuration; the optional `prefix` entry overrides the key prefix.
     */
    protected $config;

    /**
     * @var string Prefix of every Redis key written by this class.
     */
    protected $prefix = 'swoole:';

    /** @var Manager */
    protected $manager;

    /** @var ConnectionPool Created on the `workerStart` event, see prepareRedis(). */
    protected $pool;

    /**
     * RedisRoom constructor.
     *
     * @param Manager $manager
     * @param array $config
     */
    public function __construct(Manager $manager, array $config)
    {
        $this->manager = $manager;
        $this->config  = $config;

        if ($prefix = Arr::get($this->config, 'prefix')) {
            $this->prefix = $prefix;
        }
    }

    /**
     * Clear stale room data and register the pool creation hook.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface
    {
        $this->initData();
        $this->prepareRedis();
        return $this;
    }

    /**
     * Create the connection pool when a worker starts and register it with
     * the manager's pools under the name `websocket.room`.
     */
    protected function prepareRedis()
    {
        $this->manager->onEvent('workerStart', function () {
            $config     = $this->config;
            $this->pool = new ConnectionPool(
                Pool::pullPoolConfig($config),
                new PhpRedisConnector(),
                $config
            );
            $this->manager->getPools()->add('websocket.room', $this->pool);
        });
    }

    /**
     * Delete every key that starts with the configured prefix.
     *
     * Uses a one-off connection which is always closed, even when a Redis
     * call throws.
     */
    protected function initData()
    {
        $connector  = new PhpRedisConnector();
        $connection = $connector->connect($this->config);

        try {
            $keys = $connection->keys("{$this->prefix}*");

            // keys() may yield a non-array on failure; only delete a real, non-empty list.
            if (is_array($keys) && count($keys)) {
                $connection->del($keys);
            }
        } finally {
            $connector->disconnect($connection);
        }
    }

    /**
     * Add a socket fd to one or more rooms.
     *
     * @param string $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames)
    {
        $rooms = is_array($roomNames) ? $roomNames : [$roomNames];

        $this->addValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->addValue($room, [$fd], RoomInterface::ROOMS_KEY);
        }
    }

    /**
     * Remove a socket fd from one or more rooms.
     *
     * When no room is given, the fd is removed from every room it belongs to.
     *
     * @param string $fd
     * @param array|string $rooms
     */
    public function delete($fd, $rooms = [])
    {
        $rooms = is_array($rooms) ? $rooms : [$rooms];
        $rooms = count($rooms) ? $rooms : $this->getRooms($fd);

        $this->removeValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->removeValue($room, [$fd], RoomInterface::ROOMS_KEY);
        }
    }

    /**
     * Borrow a connection from the pool, run the callback with it and
     * always hand the connection back.
     *
     * @param \Closure $callable Receives the borrowed connection.
     *
     * @return mixed Whatever the callback returns.
     */
    protected function runWithRedis(\Closure $callable)
    {
        $redis = $this->pool->borrow();
        try {
            return $callable($redis);
        } finally {
            $this->pool->return($redis);
        }
    }

    /**
     * Add values to the set identified by key and table.
     *
     * @param        $key
     * @param array $values
     * @param string $table
     *
     * @return $this
     */
    protected function addValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);

            foreach ($values as $value) {
                $pipe->sadd($redisKey, $value);
            }

            $pipe->exec();
        });

        return $this;
    }

    /**
     * Remove values from the set identified by key and table.
     *
     * @param        $key
     * @param array $values
     * @param string $table
     *
     * @return $this
     */
    protected function removeValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);
            foreach ($values as $value) {
                $pipe->srem($redisKey, $value);
            }
            $pipe->exec();
        });

        return $this;
    }

    /**
     * Get all sockets by a room key.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];
    }

    /**
     * Get all rooms by a fd.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd)
    {
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];
    }

    /**
     * Check table for rooms and descriptors.
     *
     * @param string $table
     *
     * @throws InvalidArgumentException When the table is neither of the two RoomInterface keys.
     */
    protected function checkTable(string $table)
    {
        if (!in_array($table, [RoomInterface::ROOMS_KEY, RoomInterface::DESCRIPTORS_KEY], true)) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }

    /**
     * Get the members of the set identified by key and table.
     *
     * @param string $key
     * @param string $table
     *
     * @return array Set members; an empty array when the lookup does not yield an array.
     */
    protected function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        return $this->runWithRedis(function (PHPRedis $redis) use ($table, $key) {
            $members = $redis->smembers($this->getKey($key, $table));

            // Callers iterate over the result, so never hand back a non-array.
            return is_array($members) ? $members : [];
        });
    }

    /**
     * Build the Redis key for a key/table pair.
     *
     * @param string $key
     * @param string $table
     *
     * @return string
     */
    protected function getKey(string $key, string $table)
    {
        return "{$this->prefix}{$table}:{$key}";
    }

}
Table.php
<?php

namespace think\swoole\websocket\room;

use InvalidArgumentException;
use Swoole\Table as SwooleTable;
use think\swoole\contract\websocket\RoomInterface;

/**
 * Room storage backed by two Swoole tables: one mapping a room to its fds
 * and one mapping an fd to its rooms. Each row holds a JSON-encoded list in
 * its single `value` column.
 */
class Table implements RoomInterface
{
    /**
     * @var array Row counts and `value` column sizes of the two tables.
     */
    protected $config = [
        'room_rows'   => 8192,
        'room_size'   => 2048,
        'client_rows' => 4096,
        'client_size' => 2048,
    ];

    /**
     * @var SwooleTable
     */
    protected $rooms;

    /**
     * @var SwooleTable
     */
    protected $fds;

    /**
     * TableRoom constructor.
     *
     * @param array $config Overrides for the default table sizes.
     */
    public function __construct(array $config)
    {
        $this->config = array_merge($this->config, $config);
    }

    /**
     * Do some init stuffs before workers started.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface
    {
        $this->initRoomsTable();
        $this->initFdsTable();

        return $this;
    }

    /**
     * Add a socket fd to one or more rooms.
     *
     * Rooms that already list the fd are left untouched.
     *
     * @param string $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames)
    {
        $rooms     = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];

        foreach ($roomNames as $room) {
            $fds = $this->getClients($room);

            if ($this->hasMember($fds, $fd)) {
                continue;
            }

            $fds[]   = $fd;
            $rooms[] = $room;

            $this->setClients($room, $fds);
        }

        $this->setRooms($fd, $rooms);
    }

    /**
     * Remove a socket fd from one or more rooms.
     *
     * When no room is given, the fd is removed from every room it belongs to.
     *
     * @param string $fd
     * @param array|string $roomNames
     */
    public function delete($fd, $roomNames = [])
    {
        $allRooms  = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
        $rooms     = count($roomNames) ? $roomNames : $allRooms;

        $removeRooms = [];
        foreach ($rooms as $room) {
            $fds = $this->getClients($room);

            if (!$this->hasMember($fds, $fd)) {
                continue;
            }

            $this->setClients($room, array_values(array_diff($fds, [$fd])));
            $removeRooms[] = $room;
        }

        $this->setRooms($fd, array_values(array_diff($allRooms, $removeRooms)));
    }

    /**
     * Get all sockets by a room key.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];
    }

    /**
     * Get all rooms by a fd.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd)
    {
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];
    }

    /**
     * @param string $room
     * @param array $fds
     *
     * @return $this
     */
    protected function setClients(string $room, array $fds)
    {
        return $this->setValue($room, $fds, RoomInterface::ROOMS_KEY);
    }

    /**
     * @param string $fd
     * @param array $rooms
     *
     * @return $this
     */
    protected function setRooms($fd, array $rooms)
    {
        return $this->setValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);
    }

    /**
     * Init rooms table
     */
    protected function initRoomsTable(): void
    {
        $this->rooms = new SwooleTable($this->config['room_rows']);
        $this->rooms->column('value', SwooleTable::TYPE_STRING, $this->config['room_size']);
        $this->rooms->create();
    }

    /**
     * Init descriptors table
     */
    protected function initFdsTable()
    {
        $this->fds = new SwooleTable($this->config['client_rows']);
        $this->fds->column('value', SwooleTable::TYPE_STRING, $this->config['client_size']);
        $this->fds->create();
    }

    /**
     * Set value to table
     *
     * An empty list deletes the row instead of storing an empty JSON array.
     *
     * @param        $key
     * @param array $value
     * @param string $table
     *
     * @return $this
     */
    public function setValue($key, array $value, string $table)
    {
        $this->checkTable($table);

        if (empty($value)) {
            $this->$table->del($key);
        } else {
            $this->$table->set($key, ['value' => json_encode($value)]);
        }

        return $this;
    }

    /**
     * Get value from table
     *
     * @param string $key
     * @param string $table
     *
     * @return array|mixed Decoded row value, or an empty array when the row does not exist.
     */
    public function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        $value = $this->$table->get($key);

        return $value ? json_decode($value['value'], true) : [];
    }

    /**
     * Check table for exists
     *
     * @param string $table
     *
     * @throws InvalidArgumentException When $table does not name an initialised Swoole table property.
     */
    protected function checkTable(string $table)
    {
        if (!property_exists($this, $table) || !$this->$table instanceof SwooleTable) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }

    /**
     * Tell whether a list contains a value, comparing string representations.
     *
     * A loose in_array() compares numeric strings numerically, so "1.10"
     * would be reported as present when only "1.1" is stored.
     *
     * @param array $members
     * @param mixed $value
     *
     * @return bool
     */
    protected function hasMember(array $members, $value): bool
    {
        return in_array((string) $value, array_map('strval', $members), true);
    }
}

Handler.php

<?php

namespace think\swoole\websocket;

use Swoole\WebSocket\Frame;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\websocket\Event as WsEvent;

class Handler implements HandlerInterface
{
    /** @var Event Dispatcher on which the websocket events are triggered. */
    protected $event;

    public function __construct(Event $event)
    {
        $this->event = $event;
    }

    /**
     * "onOpen" listener: triggers `swoole.websocket.Open` with the request.
     *
     * @param Request $request
     */
    public function onOpen(Request $request)
    {
        $this->event->trigger('swoole.websocket.Open', $request);
    }

    /**
     * "onMessage" listener: triggers `swoole.websocket.Message` with the raw
     * frame, then `swoole.websocket.Event` with the decoded frame data.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame)
    {
        $this->event->trigger('swoole.websocket.Message', $frame);

        $wsEvent = $this->decode($frame->data);
        $this->event->trigger('swoole.websocket.Event', $wsEvent);
    }

    /**
     * "onClose" listener: triggers `swoole.websocket.Close`.
     */
    public function onClose()
    {
        $this->event->trigger('swoole.websocket.Close');
    }

    /**
     * Turn a JSON payload into a websocket event.
     *
     * Missing `type` or `data` entries become null.
     *
     * @param string $payload
     *
     * @return WsEvent
     */
    protected function decode($payload)
    {
        $decoded = json_decode($payload, true);

        $type = $decoded['type'] ?? null;
        $body = $decoded['data'] ?? null;

        return new WsEvent($type, $body);
    }

    /**
     * Encode an outgoing message.
     *
     * Websocket events are serialised to a JSON object with `type` and
     * `data`; anything else is returned unchanged.
     *
     * @param mixed $message
     *
     * @return mixed
     */
    public function encodeMessage($message)
    {
        if (!$message instanceof WsEvent) {
            return $message;
        }

        return json_encode([
            'type' => $message->type,
            'data' => $message->data,
        ]);
    }
}

Room.php

<?php

namespace think\swoole\websocket;

use think\Manager;
use think\swoole\websocket\room\Table;

/**
 * Driver manager for websocket room storage.
 *
 * @package think\swoole\websocket
 * @mixin Table
 */
class Room extends Manager
{
    protected $namespace = "\\think\\swoole\\websocket\\room\\";

    /**
     * Read the configuration of the named room driver.
     *
     * @param string $name Driver name.
     *
     * @return mixed The `swoole.websocket.room.{name}` config entry, or an empty array when unset.
     */
    protected function resolveConfig(string $name)
    {
        $key = "swoole.websocket.room.{$name}";

        return $this->app->config->get($key, []);
    }

    /**
     * Default driver.
     *
     * @return string|null The `swoole.websocket.room.type` config entry, `table` when unset.
     */
    public function getDefaultDriver()
    {
        $config = $this->app->config;

        return $config->get('swoole.websocket.room.type', 'table');
    }
}

message

PushMessage.php
<?php

namespace think\swoole\websocket\message;

class PushMessage
{
    /** @var mixed Descriptor of the receiving connection. */
    public $fd;

    /** @var mixed Message data to deliver. */
    public $data;

    /**
     * @param mixed $fd   Descriptor of the receiving connection.
     * @param mixed $data Message data to deliver.
     */
    public function __construct($fd, $data)
    {
        [$this->fd, $this->data] = [$fd, $data];
    }
}

socketio

EnginePacket.php
<?php

namespace think\swoole\websocket\socketio;

class EnginePacket
{
    /**
     * Engine.io packet type `open`.
     */
    const OPEN = 0;

    /**
     * Engine.io packet type `close`.
     */
    const CLOSE = 1;

    /**
     * Engine.io packet type `ping`.
     */
    const PING = 2;

    /**
     * Engine.io packet type `pong`.
     */
    const PONG = 3;

    /**
     * Engine.io packet type `message`.
     */
    const MESSAGE = 4;

    /**
     * Engine.io packet type `upgrade`.
     */
    const UPGRADE = 5;

    /**
     * Engine.io packet type `noop`.
     */
    const NOOP = 6;

    /** @var int|string One of the type constants; a one-character string when parsed by fromString(). */
    public $type;

    /** @var string Packet payload. */
    public $data = '';

    /**
     * @param int|string $type
     * @param string     $data
     */
    public function __construct($type, $data = '')
    {
        [$this->type, $this->data] = [$type, $data];
    }

    /**
     * Build an `open` packet carrying the given payload.
     */
    public static function open($payload)
    {
        $type = self::OPEN;

        return new static($type, $payload);
    }

    /**
     * Build a `pong` packet, optionally echoing a payload.
     */
    public static function pong($payload = '')
    {
        $type = self::PONG;

        return new static($type, $payload);
    }

    /**
     * Build a `ping` packet without payload.
     */
    public static function ping()
    {
        $type = self::PING;

        return new static($type);
    }

    /**
     * Build a `message` packet carrying the given payload.
     */
    public static function message($payload)
    {
        $type = self::MESSAGE;

        return new static($type, $payload);
    }

    /**
     * Parse a raw packet: the first character is the type, the rest the payload.
     *
     * @param string $packet
     *
     * @return static
     */
    public static function fromString(string $packet)
    {
        $type    = substr($packet, 0, 1);
        $payload = substr($packet, 1);

        return new static($type, $payload ?? '');
    }

    /**
     * Serialise the packet as type followed by payload.
     *
     * @return string
     */
    public function toString()
    {
        return "{$this->type}{$this->data}";
    }
}
Handler.php
<?php

namespace think\swoole\websocket\socketio;

use Exception;
use Swoole\Timer;
use Swoole\Websocket\Frame;
use think\Config;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Websocket;
use think\swoole\websocket\Event as WsEvent;

/**
 * Websocket handler that wraps messages in Engine.IO / Socket.IO packets
 * and keeps the connection alive with ping/pong timers.
 */
class Handler implements HandlerInterface
{
    /** @var Config */
    protected $config;

    /** @var Event */
    protected $event;

    /** @var Websocket */
    protected $websocket;

    /** @var mixed Value of the `EIO` request parameter, read in onOpen(). */
    protected $eio;

    /** @var int Timer id of the pending timeout that closes the connection. */
    protected $pingTimeoutTimer  = 0;

    /** @var int Timer id of the pending server-side ping. */
    protected $pingIntervalTimer = 0;

    /** @var int `swoole.websocket.ping_interval` config entry, 25000 when unset. */
    protected $pingInterval;

    /** @var int `swoole.websocket.ping_timeout` config entry, 60000 when unset. */
    protected $pingTimeout;

    public function __construct(Event $event, Config $config, Websocket $websocket)
    {
        $this->event        = $event;
        $this->config       = $config;
        $this->websocket    = $websocket;
        $this->pingInterval = $this->config->get('swoole.websocket.ping_interval', 25000);
        $this->pingTimeout  = $this->config->get('swoole.websocket.ping_timeout', 60000);
    }

    /**
     * "onOpen" listener.
     *
     * Sends the Engine.IO `open` packet and triggers `swoole.websocket.Open`.
     * For EIO versions below 4 the timeout timer is armed and the connect
     * packet is sent right away; otherwise the first ping is scheduled.
     *
     * @param Request $request
     */
    public function onOpen(Request $request)
    {
        $this->eio = $request->param('EIO');

        $payload = json_encode(
            [
                'sid'          => base64_encode(uniqid()),
                'upgrades'     => [],
                'pingInterval' => $this->pingInterval,
                'pingTimeout'  => $this->pingTimeout,
            ]
        );

        $this->push(EnginePacket::open($payload));

        $this->event->trigger('swoole.websocket.Open', $request);

        if ($this->eio < 4) {
            $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
            $this->onConnect();
        } else {
            $this->schedulePing();
        }
    }

    /**
     * "onMessage" listener.
     *
     * Decodes the Engine.IO packet, re-arms the timeout timer and dispatches
     * on the packet type. Unknown packet types, and event packets without an
     * array payload, close the connection.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame)
    {
        $enginePacket = EnginePacket::fromString($frame->data);

        $this->event->trigger('swoole.websocket.Message', $enginePacket);

        $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);

        switch ($enginePacket->type) {
            case EnginePacket::MESSAGE:
                $packet = Packet::fromString($enginePacket->data);
                switch ($packet->type) {
                    case Packet::CONNECT:
                        $this->onConnect($packet->data);
                        break;
                    case Packet::EVENT:
                        // The payload comes from the client: without an array
                        // there is no event name to extract, so treat the
                        // packet as invalid instead of failing in array_shift().
                        if (!is_array($packet->data)) {
                            $this->websocket->close();
                            break;
                        }

                        $type   = array_shift($packet->data);
                        $data   = $packet->data;
                        $result = $this->event->trigger('swoole.websocket.Event', new WsEvent($type, $data));

                        // A packet id means the client expects an acknowledgement.
                        if ($packet->id !== null) {
                            $responsePacket = Packet::create(Packet::ACK, [
                                'id'   => $packet->id,
                                'nsp'  => $packet->nsp,
                                'data' => $result,
                            ]);

                            $this->push($responsePacket);
                        }
                        break;
                    case Packet::DISCONNECT:
                        $this->event->trigger('swoole.websocket.Disconnect');
                        $this->websocket->close();
                        break;
                    default:
                        $this->websocket->close();
                        break;
                }
                break;
            case EnginePacket::PING:
                $this->push(EnginePacket::pong($enginePacket->data));
                break;
            case EnginePacket::PONG:
                $this->schedulePing();
                break;
            default:
                $this->websocket->close();
                break;
        }
    }

    /**
     * "onClose" listener: cancels both timers and triggers `swoole.websocket.Close`.
     */
    public function onClose()
    {
        Timer::clear($this->pingTimeoutTimer);
        Timer::clear($this->pingIntervalTimer);
        $this->event->trigger('swoole.websocket.Close');
    }

    /**
     * Trigger `swoole.websocket.Connect` and answer with a connect packet,
     * or with a connect-error packet carrying the message of an exception
     * thrown by a listener.
     *
     * @param mixed $data Payload of the client's connect packet, if any.
     */
    protected function onConnect($data = null)
    {
        try {
            $this->event->trigger('swoole.websocket.Connect', $data);
            $packet = Packet::create(Packet::CONNECT);
            if ($this->eio >= 4) {
                $packet->data = ['sid' => base64_encode(uniqid())];
            }
        } catch (Exception $exception) {
            $packet = Packet::create(Packet::CONNECT_ERROR, [
                'data' => ['message' => $exception->getMessage()],
            ]);
        }

        $this->push($packet);
    }

    /**
     * Replace the pending timeout timer; when it fires the connection is closed.
     *
     * @param int $timeout Delay passed to Timer::after().
     */
    protected function resetPingTimeout($timeout)
    {
        Timer::clear($this->pingTimeoutTimer);
        $this->pingTimeoutTimer = Timer::after($timeout, function () {
            $this->websocket->close();
        });
    }

    /**
     * Schedule the next server-side ping after the ping interval; sending it
     * re-arms the timeout timer with the ping timeout.
     */
    protected function schedulePing()
    {
        Timer::clear($this->pingIntervalTimer);
        $this->pingIntervalTimer = Timer::after($this->pingInterval, function () {
            $this->push(EnginePacket::ping());
            $this->resetPingTimeout($this->pingTimeout);
        });
    }

    /**
     * Encode an outgoing message down to its wire string.
     *
     * Websocket events become Socket.IO event packets, Socket.IO packets are
     * wrapped in Engine.IO message packets, and Engine.IO packets are
     * serialised. Any other value is returned unchanged.
     *
     * @param mixed $message
     *
     * @return mixed
     */
    public function encodeMessage($message)
    {
        if ($message instanceof WsEvent) {
            // array_merge() needs an array: treat null as "no arguments" and
            // wrap any other non-array value as a single argument.
            $arguments = $message->data;
            if (null === $arguments) {
                $arguments = [];
            } elseif (!is_array($arguments)) {
                $arguments = [$arguments];
            }

            $message = Packet::create(Packet::EVENT, [
                'data' => array_merge([$message->type], $arguments),
            ]);
        }

        if ($message instanceof Packet) {
            $message = EnginePacket::message($message->toString());
        }

        if ($message instanceof EnginePacket) {
            $message = $message->toString();
        }

        return $message;
    }

    /**
     * Push data to the current connection through the websocket service.
     *
     * @param mixed $data
     */
    protected function push($data)
    {
        $this->websocket->push($data);
    }

}
Packet.php
<?php

namespace think\swoole\websocket\socketio;

/**
 * Socket.io packet: a type, a namespace, an optional numeric id and an
 * optional JSON payload.
 */
class Packet
{
    /**
     * Socket.io packet type `connect`.
     */
    const CONNECT = 0;

    /**
     * Socket.io packet type `disconnect`.
     */
    const DISCONNECT = 1;

    /**
     * Socket.io packet type `event`.
     */
    const EVENT = 2;

    /**
     * Socket.io packet type `ack`.
     */
    const ACK = 3;

    /**
     * Socket.io packet type `connect_error`.
     */
    const CONNECT_ERROR = 4;

    /**
     * Socket.io packet type `binary event`.
     */
    const BINARY_EVENT = 5;

    /**
     * Socket.io packet type `binary ack`. For acks with binary arguments.
     */
    const BINARY_ACK = 6;

    /** @var int One of the type constants. */
    public $type;

    /** @var string Namespace; `/` is the default namespace. */
    public $nsp  = '/';

    /** @var mixed Decoded payload, null when the packet has none. */
    public $data = null;

    /** @var int|null Packet id, null when the packet has none. */
    public $id   = null;

    public function __construct(int $type)
    {
        $this->type = $type;
    }

    /**
     * Build a packet from its parts.
     *
     * @param int   $type    One of the type constants.
     * @param array $decoded Optional `id`, `nsp` and `data` entries; an empty
     *                       or missing namespace falls back to `/`.
     *
     * @return static
     */
    public static function create($type, array $decoded = [])
    {
        $packet = new static($type);

        $packet->id   = $decoded['id'] ?? null;
        $packet->nsp  = empty($decoded['nsp']) ? '/' : $decoded['nsp'];
        $packet->data = $decoded['data'] ?? null;

        return $packet;
    }

    /**
     * Serialise the packet: type, then the namespace followed by a comma
     * (omitted for the default namespace), then the id, then the JSON payload.
     *
     * @return string
     */
    public function toString()
    {
        $pieces = ['' . $this->type];

        if ($this->nsp && '/' !== $this->nsp) {
            $pieces[] = $this->nsp . ',';
        }

        if (null !== $this->id) {
            $pieces[] = $this->id;
        }

        if (null !== $this->data) {
            $pieces[] = json_encode($this->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        }

        return implode('', $pieces);
    }

    /**
     * Parse a serialised packet.
     *
     * @param string $str
     *
     * @return Packet
     */
    public static function fromString(string $str)
    {
        $packet = new self((int) substr($str, 0, 1));

        // $cursor is the index of the last character consumed so far.
        $cursor = 0;

        // Namespace (if any): starts with "/" right after the type and runs
        // up to the next comma, or to the end of the string without one.
        if ('/' === substr($str, 1, 1)) {
            $comma = strpos($str, ',', 1);
            if (false === $comma) {
                $packet->nsp = substr($str, 1);
                $cursor      = strlen($str);
            } else {
                $packet->nsp = substr($str, 1, $comma - 1);
                $cursor      = $comma;
            }
        } else {
            $packet->nsp = '/';
        }

        // Id (if any): the run of digits that follows.
        $ahead = substr($str, $cursor + 1, 1);
        if ('' !== $ahead && is_numeric($ahead)) {
            $digits     = strspn($str, '0123456789', $cursor + 1);
            $packet->id = intval(substr($str, $cursor + 1, $digits));
            $cursor    += $digits;
        }

        // JSON payload (if any): everything that is left.
        $cursor++;
        if (substr($str, $cursor, 1)) {
            $packet->data = json_decode(substr($str, $cursor), true);
        }

        return $packet;
    }
}

Pusher.php

<?php

namespace think\swoole\websocket;

use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Manager;
use think\swoole\websocket\message\PushMessage;

/**
 * Collects push targets and delivers a message to every connection that
 * belongs to them.
 */
class Pusher
{

    /** @var Room */
    protected $room;

    /** @var Manager */
    protected $manager;

    /** @var HandlerInterface */
    protected $handler;

    /** @var array Targets collected by to(); each is looked up as a room when pushing. */
    protected $to = [];

    public function __construct(Manager $manager, Room $room, HandlerInterface $handler)
    {
        $this->manager = $manager;
        $this->room    = $room;
        $this->handler = $handler;
    }

    /**
     * Add push targets; nested arrays are flattened and duplicates ignored.
     *
     * @param mixed ...$values Targets, or arrays of targets.
     *
     * @return $this
     */
    public function to(...$values)
    {
        foreach ($values as $value) {
            if (is_array($value)) {
                $this->to(...$value);
            } elseif (!in_array($value, $this->to, true)) {
                // Strict comparison: a loose one treats numeric strings such
                // as "1.1" and "1.10" as the same target.
                $this->to[] = $value;
            }
        }

        return $this;
    }

    /**
     * Push message to related descriptors
     *
     * Room members are expected in "workerId.fd" form; the message is sent
     * to the owning worker of each member.
     *
     * @param $data
     * @return void
     */
    public function push($data): void
    {
        $fds = [];

        foreach ($this->to as $room) {
            $clients = $this->room->getClients($room);
            if (!empty($clients)) {
                $fds = array_merge($fds, $clients);
            }
        }

        $fds = array_unique($fds);
        if (empty($fds)) {
            return;
        }

        // Encode once: every recipient receives the same payload, and
        // encoding an already encoded message again must be avoided.
        $payload = $this->handler->encodeMessage($data);

        foreach ($fds as $member) {
            [$workerId, $fd] = explode('.', $member);
            $this->manager->sendMessage((int) $workerId, new PushMessage((int) $fd, $payload));
        }
    }

    /**
     * Push a named event with the given arguments to the collected targets.
     *
     * @param string $event   Event name.
     * @param mixed  ...$data Event arguments.
     * @return void
     */
    public function emit(string $event, ...$data): void
    {
        $this->push(new Event($event, $data));
    }
}

目录

   ├── Watcher.php
   ├── Websocket.php
   ├── App.php
   ├── Service.php
   ├── Pool.php
   ├── Http.php
   ├── Sandbox.php
   ├── Manager.php
   ├── Middleware.php
   ├── helpers.php
   ├── Table.php
   └── command
       ├── Server.php
       ├── RpcInterface.php
   └── concerns
       ├── WithApplication.php
       ├── InteractsWithRpcServer.php
       ├── InteractsWithSwooleTable.php
       ├── InteractsWithWebsocket.php
       ├── WithRpcClient.php
       ├── ModifyProperty.php
       ├── InteractsWithServer.php
       ├── WithMiddleware.php
       ├── InteractsWithHttp.php
       ├── InteractsWithQueue.php
       ├── InteractsWithRpcConnector.php
       ├── InteractsWithPools.php
       ├── InteractsWithRpcClient.php
       ├── WithContainer.php
       ├── InteractsWithTracing.php
   └── config
       ├── swoole.php
   └── contract
       └── websocket
               ├── HandlerInterface.php
               ├── RoomInterface.php
       ├── ResetterInterface.php
       └── rpc
               ├── ParserInterface.php
   └── coroutine
       ├── Context.php
   └── exception
       ├── RpcResponseException.php
       ├── RpcClientException.php
   └── middleware
       ├── TraceRpcServer.php
       ├── TraceRpcClient.php
       ├── InteractsWithVarDumper.php
   └── pool
       └── proxy
               ├── Connection.php
               ├── Store.php
       ├── Cache.php
       ├── Connector.php
       ├── Db.php
       ├── Proxy.php
       ├── Client.php
   └── resetters
       ├── ResetModel.php
       ├── ResetConfig.php
       ├── ClearInstances.php
       ├── ResetEvent.php
       ├── ResetService.php
       ├── ResetPaginator.php
   └── response
       ├── File.php
   └── rpc
       ├── Protocol.php
       ├── File.php
       └── server
               ├── Dispatcher.php
               ├── Channel.php
       ├── Packer.php
       ├── Sendfile.php
       ├── Error.php
       ├── JsonParser.php
       └── client
               ├── Service.php
               ├── Connector.php
               ├── Proxy.php
               ├── Gateway.php
       └── packer
               ├── File.php
               ├── Buffer.php
   └── watcher
       ├── Driver.php
       ├── Find.php
       ├── Scan.php
   └── websocket
       └── middleware
               ├── SessionInit.php
       ├── Event.php
       └── room
               ├── Redis.php
               ├── Table.php
       ├── Handler.php
       ├── Room.php
       └── message
               ├── PushMessage.php
       └── socketio
               ├── EnginePacket.php
               ├── Handler.php
               ├── Packet.php
       ├── Pusher.php