


namespace think\swoole;

use think\swoole\watcher\Driver;

 * @mixin Driver
class Watcher extends \think\Manager
    protected $namespace = '\\think\\swoole\\watcher\\';

    protected function getConfig(string $name, $default = null)
        return $this->app->config->get('swoole.hot_update.' . $name, $default);

    protected function resolveParams($name): array
        return [
            $this->getConfig('include', []),
            $this->getConfig('exclude', []),
            $this->getConfig('name', []),

    public function getDefaultDriver()
        return $this->getConfig('type', 'scan');



namespace think\swoole;

use Swoole\Http\Response;
use think\Event;
use think\swoole\websocket\Pusher;
use think\swoole\websocket\Room;

 * Class Websocket
class Websocket
     * @var \think\App
    protected $app;

     * @var Room
    protected $room;

     * Scoket sender's fd.
     * @var string
    protected $sender;

    /** @var Event */
    protected $event;

    /** @var Response */
    protected $client;

     * Websocket constructor.
     * @param \think\App $app
     * @param Room $room
     * @param Event $event
    public function __construct(\think\App $app, Room $room, Event $event)
        $this->app   = $app;
        $this->room  = $room;
        $this->event = $event;

     * @return Pusher
    protected function makePusher()
        return $this->app->invokeClass(Pusher::class);

    public function to(...$values)
        return $this->makePusher()->to(...$values);

    public function push($data)

    public function emit(string $event, ...$data)
        $this->makePusher()->to($this->getSender())->emit($event, ...$data);

     * Join sender to multiple rooms.
     * @param string|integer|array $rooms
     * @return $this
    public function join($rooms): self
        $rooms = is_string($rooms) || is_int($rooms) ? func_get_args() : $rooms;

        $this->room->add($this->getSender(), $rooms);

        return $this;

     * Make sender leave multiple rooms.
     * @param array|string|integer $rooms
     * @return $this
    public function leave($rooms = []): self
        $rooms = is_string($rooms) || is_int($rooms) ? func_get_args() : $rooms;

        $this->room->delete($this->getSender(), $rooms);

        return $this;

    public function isEstablished()
        return !!$this->client;

     * Close current connection.
    public function close()
        if ($this->client) {

     * @param Response $response
    public function setClient($response)
        $this->client = $response;

     * Set sender fd.
     * @param string
     * @return $this
    public function setSender(string $fd)
        $this->sender = $fd;
        return $this;

     * Get current sender fd.
    public function getSender()
        return $this->sender;



namespace think\swoole;

class App extends \think\App
    protected $inConsole = true;

    public function setInConsole($inConsole = true)
        $this->inConsole = $inConsole;

    public function runningInConsole(): bool
        return $this->inConsole;

    public function clearInstances()
        $this->instances = [];


// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use think\swoole\command\RpcInterface;
use think\swoole\command\Server as ServerCommand;

class Service extends \think\Service

    public function boot()




namespace think\swoole;

use Smf\ConnectionPool\ConnectionPool;
use think\helper\Arr;

class Pool
    protected $pools = [];

     * @param string         $name
     * @param ConnectionPool $pool
     * @return Pool
    public function add(string $name, ConnectionPool $pool)
        $this->pools[$name] = $pool;

        return $this;

     * @param string $name
     * @return ConnectionPool
    public function get(string $name)
        return $this->pools[$name] ?? null;

    public function close(string $key)
        return $this->pools[$key]->close();

     * @return array
    public function getAll()
        return $this->pools;

    public function closeAll()
        foreach ($this->pools as $pool) {

     * @param string $key
     * @return ConnectionPool
    public function __get($key)
        return $this->get($key);

    public static function pullPoolConfig(&$config)
        return [
            'minActive'         => Arr::pull($config, 'min_active', 0),
            'maxActive'         => Arr::pull($config, 'max_active', 10),
            'maxWaitTime'       => Arr::pull($config, 'max_wait_time', 5),
            'maxIdleTime'       => Arr::pull($config, 'max_idle_time', 20),
            'idleCheckInterval' => Arr::pull($config, 'idle_check_interval', 10),



namespace think\swoole;

 * Class Http
 * @package think\swoole
 * @property $request
class Http extends \think\Http

    protected function loadMiddleware(): void


    protected function loadRoutes(): void




namespace think\swoole;

use Closure;
use InvalidArgumentException;
use ReflectionObject;
use RuntimeException;
use Swoole\Coroutine;
use think\App;
use think\Config;
use think\Container;
use think\Event;
use think\swoole\App as SwooleApp;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\coroutine\Context;
use think\swoole\resetters\ClearInstances;
use think\swoole\resetters\ResetConfig;
use think\swoole\resetters\ResetEvent;
use think\swoole\resetters\ResetModel;
use think\swoole\resetters\ResetPaginator;
use think\swoole\resetters\ResetService;
use Throwable;

class Sandbox
    use ModifyProperty;

     * The app containers in different coroutine environment.
     * @var SwooleApp[]
    protected $snapshots = [];

    /** @var SwooleApp */
    protected $app;

    /** @var Config */
    protected $config;

    /** @var Event */
    protected $event;

    /** @var ResetterInterface[] */
    protected $resetters = [];
    protected $services  = [];

    public function __construct(Container $app)

    public function setBaseApp(Container $app)
        $this->app = $app;

        return $this;

    public function getBaseApp()
        return $this->app;

    protected function initialize()
        Container::setInstance(function () {
            return $this->getApplication();


        return $this;

    public function run(Closure $callable)

        try {
            $this->getApplication()->invoke($callable, [$this]);
        } catch (Throwable $e) {
            throw $e;
        } finally {

    public function init()
        $app = $this->getApplication(true);

    public function clear()
        if ($app = $this->getSnapshot()) {


    public function getApplication($init = false)
        $snapshot = $this->getSnapshot($init);
        if ($snapshot instanceof Container) {
            return $snapshot;

        if ($init) {
            $snapshot = clone $this->getBaseApp();

            return $snapshot;
        throw new InvalidArgumentException('The app object has not been initialized');

    protected function getSnapshotId($init = false)
        if ($init) {
            Coroutine::getContext()->offsetSet('#root', true);
            return Coroutine::getCid();
        } else {
            $cid = Coroutine::getCid();
            while (!Coroutine::getContext($cid)->offsetExists('#root')) {
                $cid = Coroutine::getPcid($cid);
                if ($cid < 1) {

            return $cid;

     * Get current snapshot.
     * @return App|null
    public function getSnapshot($init = false)
        return $this->snapshots[$this->getSnapshotId($init)] ?? null;

    public function setSnapshot(Container $snapshot)
        $this->snapshots[$this->getSnapshotId()] = $snapshot;

        return $this;

    public function setInstance(Container $app)
        $app->instance('app', $app);
        $app->instance(Container::class, $app);

        $reflectObject   = new ReflectionObject($app);
        $reflectProperty = $reflectObject->getProperty('services');
        $services = $reflectProperty->getValue($app);

        foreach ($services as $service) {
            $this->modifyProperty($service, $app);

     * Set initial config.
    protected function setInitialConfig()
        $this->config = clone $this->getBaseApp()->config;

    protected function setInitialEvent()
        $this->event = clone $this->getBaseApp()->event;

     * Get config snapshot.
    public function getConfig()
        return $this->config;

    public function getEvent()
        return $this->event;

    public function getServices()
        return $this->services;

    protected function setInitialServices()
        $app = $this->getBaseApp();

        $services = $this->config->get('swoole.services', []);

        foreach ($services as $service) {
            if (class_exists($service) && !in_array($service, $this->services)) {
                $serviceObj               = new $service($app);
                $this->services[$service] = $serviceObj;

     * Initialize resetters.
    protected function setInitialResetters()
        $app = $this->getBaseApp();

        $resetters = [

        $resetters = array_merge($resetters, $this->config->get('swoole.resetters', []));

        foreach ($resetters as $resetter) {
            $resetterClass = $app->make($resetter);
            if (!$resetterClass instanceof ResetterInterface) {
                throw new RuntimeException("{$resetter} must implement " . ResetterInterface::class);
            $this->resetters[$resetter] = $resetterClass;

     * Reset Application.
     * @param App $app
    protected function resetApp(App $app)
        foreach ($this->resetters as $resetter) {
            $resetter->handle($app, $this);



// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use think\swoole\concerns\InteractsWithHttp;
use think\swoole\concerns\InteractsWithPools;
use think\swoole\concerns\InteractsWithQueue;
use think\swoole\concerns\InteractsWithRpcClient;
use think\swoole\concerns\InteractsWithRpcServer;
use think\swoole\concerns\InteractsWithServer;
use think\swoole\concerns\InteractsWithSwooleTable;
use think\swoole\concerns\InteractsWithTracing;
use think\swoole\concerns\WithApplication;
use think\swoole\concerns\WithContainer;

 * Class Manager
class Manager
    use InteractsWithServer,

     * Initialize.
    protected function initialize(): void




namespace think\swoole;

use Closure;
use InvalidArgumentException;
use think\App;
use think\Pipeline;

class Middleware
     * 中间件执行队列
     * @var array
    protected $queue = [];

     * @var App
    protected $app;

    public function __construct(App $app, $middlewares = [])
        $this->app = $app;

        foreach ($middlewares as $middleware) {
            $this->queue[] = $this->buildMiddleware($middleware);

    public static function make(App $app, $middlewares = [])
        return new self($app, $middlewares);

     * 调度管道
     * @return Pipeline
    public function pipeline()
        return (new Pipeline())
            ->through(array_map(function ($middleware) {
                return function ($request, $next) use ($middleware) {
                    [$call, $params] = $middleware;

                    if (is_array($call) && is_string($call[0])) {
                        $call = [$this->app->make($call[0]), $call[1]];
                    return call_user_func($call, $request, $next, ...$params);
            }, $this->queue));

     * 解析中间件
     * @param mixed $middleware
     * @return array
    protected function buildMiddleware($middleware): array
        if (is_array($middleware)) {
            [$middleware, $params] = $middleware;

        if ($middleware instanceof Closure) {
            return [$middleware, $params ?? []];

        if (!is_string($middleware)) {
            throw new InvalidArgumentException('The middleware is invalid');

        return [[$middleware, 'handle'], $params ?? []];



namespace {

    if (!function_exists('swoole_cpu_num')) {
        function swoole_cpu_num(): int
            return 1;

    if (!defined('SWOOLE_SOCK_TCP')) {
        define('SWOOLE_SOCK_TCP', 1);

    if (!defined('SWOOLE_PROCESS')) {
        define('SWOOLE_PROCESS', 3);

    if (!defined('SWOOLE_HOOK_ALL')) {
        define('SWOOLE_HOOK_ALL', 1879048191);

namespace think\swoole\helper {

    use think\swoole\response\File;

    function download(string $filename, string $name = '', $disposition = File::DISPOSITION_ATTACHMENT): File
        $response = new File($filename, $disposition);

        if ($name) {
            $response->setContentDisposition($disposition, $name);

        return $response;

    function file(string $filename)
        return new File($filename);


// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole;

use Swoole\Table as SwooleTable;

class Table
    public const TYPE_INT = 1;

    public const TYPE_STRING = 3;

    public const TYPE_FLOAT = 2;

     * Registered swoole tables.
     * @var array
    protected $tables = [];

     * Add a swoole table to existing tables.
     * @param string $name
     * @param SwooleTable $table
     * @return Table
    public function add(string $name, SwooleTable $table)
        $this->tables[$name] = $table;

        return $this;

     * Get a swoole table by its name from existing tables.
     * @param string $name
     * @return SwooleTable $table
    public function get(string $name)
        return $this->tables[$name] ?? null;

     * Get all existing swoole tables.
     * @return array
    public function getAll()
        return $this->tables;

     * Dynamically access table.
     * @param string $key
     * @return SwooleTable
    public function __get(string $key)
        return $this->get($key);



// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------

namespace think\swoole\command;

use think\console\Command;
use think\console\input\Option;
use think\swoole\Manager;

class Server extends Command
    public function configure()
                'Environment name',
            ->setDescription('Swoole Server for ThinkPHP');

    public function handle(Manager $manager)

        $this->output->writeln('Starting swoole server...');

        $this->output->writeln('You can exit with <info>`CTRL-C`</info>');

        $envName = $this->input->getOption('env');

     * 检查环境
    protected function checkEnvironment()
        if (!extension_loaded('swoole')) {
            $this->output->error('Can\'t detect Swoole extension installed.');


        if (!version_compare(swoole_version(), '4.6.0', 'ge')) {
            $this->output->error('Your Swoole version must be higher than `4.6.0`.');




namespace think\swoole\command;

use Nette\PhpGenerator\ClassType;
use Nette\PhpGenerator\Dumper;
use Nette\PhpGenerator\Helpers;
use Nette\PhpGenerator\PhpFile;
use think\console\Command;
use think\helper\Arr;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Service;
use think\swoole\rpc\JsonParser;
use function Swoole\Coroutine\run;

class RpcInterface extends Command
    public function configure()
            ->setDescription('Generate Rpc Service Interfaces');

    public function handle()
        run(function () {
            $file = new PhpFile;
            $file->addComment('This file is auto-generated.');
            $services = [];

            $clients = $this->app->config->get('swoole.rpc.client', []);

            foreach ($clients as $name => $config) {

                $parserClass = Arr::get($config, 'parser', JsonParser::class);
                /** @var ParserInterface $parser */
                $parser = new $parserClass;

                $gateway = new Gateway($config, $parser, Arr::get($config, 'tries', 2));

                $result = $gateway->getServices();

                $namespace = $file->addNamespace("rpc\\contract\\{$name}");


                foreach ($result as $interface => $methods) {

                    $services[$name][] = $namespace->getName() . "\\{$interface}";

                    $class = $namespace->addInterface($interface);


                    foreach ($methods as $methodName => ['parameters' => $parameters, 'returnType' => $returnType, 'comment' => $comment]) {
                        $method = $class->addMethod($methodName)

                        foreach ($parameters as $parameter) {
                            if ($parameter['type'] && (class_exists($parameter['type']) || interface_exists($parameter['type']))) {
                            $param = $method->addParameter($parameter['name'])

                            if (array_key_exists('default', $parameter)) {

                            if (array_key_exists('nullable', $parameter)) {

            $dumper = new Dumper();

            $services = 'return ' . $dumper->dump($services) . ';';

            file_put_contents($this->app->getBasePath() . 'rpc.php', $file . $services);





namespace think\swoole\concerns;

use Closure;
use think\App;
use think\swoole\App as SwooleApp;
use think\swoole\Manager;
use think\swoole\pool\Cache;
use think\swoole\pool\Db;
use think\swoole\Sandbox;
use Throwable;

 * Trait WithApplication
 * @package think\swoole\concerns
 * @property App $container
trait WithApplication
     * @var SwooleApp
    protected $app;

    protected function prepareApplication(string $envName)
        if (!$this->app instanceof SwooleApp) {
            $this->app = new SwooleApp($this->container->getRootPath());
            $this->app->bind(SwooleApp::class, App::class);
            $this->app->bind(Manager::class, $this);
            if ($this->getConfig('pool.db.enable', true)) {
                $this->app->bind('db', Db::class);
                $this->app->resolving(Db::class, function (Db $db) {
            if ($this->getConfig('pool.cache.enable', true)) {
                $this->app->bind('cache', Cache::class);
            $this->app->instance('request', $this->container->request);

     * 预加载
    protected function prepareConcretes()
        $defaultConcretes = ['db', 'cache', 'event'];

        $concretes = array_merge($defaultConcretes, $this->getConfig('concretes', []));

        foreach ($concretes as $concrete) {
            if ($this->app->has($concrete)) {

    public function getApplication()
        return $this->app;

     * 获取沙箱
     * @return Sandbox
    protected function getSandbox()
        return $this->app->make(Sandbox::class);

     * 在沙箱中执行
     * @param Closure $callable
    public function runInSandbox(Closure $callable)
        try {
        } catch (Throwable $e) {



namespace think\swoole\concerns;

use Swoole\Coroutine\Server;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Pool;
use think\swoole\rpc\Error;
use think\swoole\rpc\File;
use think\swoole\rpc\JsonParser;
use think\swoole\rpc\Packer;
use think\swoole\rpc\server\Dispatcher;
use Throwable;

 * Trait InteractsWithRpc
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 * @method Pool getPools()
trait InteractsWithRpcServer
    protected function createRpcServer()

        $host = $this->getConfig('rpc.server.host', '');
        $port = $this->getConfig('rpc.server.port', 9000);

        $server = new Server($host, $port, false, true);

        $server->handle(function (Connection $conn) {

            $this->runInSandbox(function (App $app, Dispatcher $dispatcher) use ($conn) {
                $files = [];
                while (true) {
                    $data = $conn->recv();

                    if ($data === '' || $data === false) {
                    if (!isset($handler)) {
                        try {
                            [$handler, $data] = Packer::unpack($data);
                        } catch (Throwable $e) {
                            $result = Error::make(Dispatcher::INVALID_REQUEST, $e->getMessage());
                            $this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result) {
                                $dispatcher->dispatch($app, $conn, $result);

                    $result = $handler->write($data);

                    if (!empty($result)) {
                        $handler = null;
                        if ($result instanceof File) {
                            $files[] = $result;
                        } else {
                            $this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result, $files) {
                                $dispatcher->dispatch($app, $conn, $result, $files);
                            $files = [];

                    if (!empty($data)) {
                        goto begin;



    protected function prepareRpcServer()
        if ($this->getConfig('rpc.server.enable', false)) {

            $workerNum = $this->getConfig('rpc.server.worker_num', swoole_cpu_num());

            $this->addBatchWorker($workerNum, [$this, 'createRpcServer'], 'rpc server');

    protected function bindRpcDispatcher()
        $services   = $this->getConfig('rpc.server.services', []);
        $middleware = $this->getConfig('rpc.server.middleware', []);

        $this->app->make(Dispatcher::class, [$services, $middleware]);

    protected function bindRpcParser()
        $parserClass = $this->getConfig('rpc.server.parser', JsonParser::class);

        $this->app->bind(ParserInterface::class, $parserClass);



// +----------------------------------------------------------------------
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\swoole\concerns;

use Swoole\Table as SwooleTable;
use think\App;
use think\swoole\Table;

 * Trait InteractsWithSwooleTable
 * @property App $container
 * @property App $app
trait InteractsWithSwooleTable
     * @var Table
    protected $currentTable;

     * Register customized swoole tables.
    protected function prepareTables()
        $this->currentTable = new Table();
        $this->onEvent('workerStart', function () {
            $this->app->instance(Table::class, $this->currentTable);
            foreach ($this->currentTable->getAll() as $name => $table) {
                $this->app->instance("swoole.table.{$name}", $table);

     * Register user-defined swoole tables.
    protected function registerTables()
        $tables = $this->container->make('config')->get('swoole.tables', []);

        foreach ($tables as $key => $value) {
            $table   = new SwooleTable($value['size']);
            $columns = $value['columns'] ?? [];
            foreach ($columns as $column) {
                if (isset($column['size'])) {
                    $table->column($column['name'], $column['type'], $column['size']);
                } else {
                    $table->column($column['name'], $column['type']);

            $this->currentTable->add($key, $table);



namespace think\swoole\concerns;

use Swoole\Atomic;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use think\App;
use think\helper\Str;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Middleware;
use think\swoole\Websocket;
use think\swoole\websocket\message\PushMessage;
use think\swoole\websocket\Room;

 * Trait InteractsWithWebsocket
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
trait InteractsWithWebsocket

     * @var RoomInterface
    protected $wsRoom;

     * @var Channel[]
    protected $wsMessageChannel = [];

    protected $wsEnable = false;

    /** @var Atomic */
    protected $wsIdAtomic;

     * "onHandShake" listener.
     * @param Request $req
     * @param Response $res
    public function onHandShake($req, $res)
        $this->runInSandbox(function (App $app, Websocket $websocket, HandlerInterface $handler) use ($req, $res) {


            $request = $this->prepareRequest($req);
            $request = $this->setRequestThroughMiddleware($app, $request);

            $fd = $this->wsIdAtomic->add();

            $this->wsMessageChannel[$fd] = new Channel(1);

            Coroutine::create(function () use ($res, $fd) {
                while ($message = $this->wsMessageChannel[$fd]->pop()) {

            try {
                $id = "{$this->workerId}.{$fd}";


                $this->runWithBarrier(function () use ($request, $handler) {

                $this->runWithBarrier(function () use ($handler, $res) {

                    $cid      = Coroutine::getCid();
                    $messages = 0;
                    $wait     = false;

                    $frame = null;
                    while (true) {
                        /** @var Frame|false|string $recv */
                        $recv = $res->recv();
                        if ($recv === '' || $recv === false || $recv instanceof CloseFrame) {

                        if (empty($frame)) {
                            $frame         = new Frame();
                            $frame->opcode = $recv->opcode;
                            $frame->flags  = $recv->flags;
                            $frame->fd     = $recv->fd;
                            $frame->finish = false;

                        $frame->data .= $recv->data;

                        $frame->finish = $recv->finish;

                        if ($frame->finish) {
                            Coroutine::create(function () use (&$wait, &$messages, $cid, $frame, $handler) {
                                Coroutine::defer(function () use (&$wait, &$messages, $cid) {
                                    if ($wait) {
                            $frame = null;

                    while ($messages > 0) {
                        $wait = true;

                $this->runWithBarrier(function () use ($handler) {
            } finally {
                // leave all rooms
                if (isset($this->wsMessageChannel[$fd])) {

     * @param App $app
     * @param \think\Request $request
     * @return \think\Request
    protected function setRequestThroughMiddleware(App $app, \think\Request $request)
        $app->instance('request', $request);
        return Middleware::make($app, $this->getConfig('websocket.middleware', []))
            ->then(function ($request) {
                return $request;

     * Prepare settings if websocket is enabled.
    protected function prepareWebsocket()

        $this->onEvent('message', function ($message) {
            if ($message instanceof PushMessage) {
                if (isset($this->wsMessageChannel[$message->fd])) {

        $this->onEvent('workerStart', function () {

    protected function prepareWebsocketIdAtomic()
        $this->wsIdAtomic = new Atomic();

     * Prepare websocket room.
    protected function prepareWebsocketRoom()
        $this->wsRoom = $this->container->make(Room::class);

    protected function prepareWebsocketListener()
        $listeners = $this->getConfig('websocket.listen', []);

        foreach ($listeners as $event => $listener) {
            $this->app->event->listen('swoole.websocket.' . Str::studly($event), $listener);

        $subscribers = $this->getConfig('websocket.subscribe', []);

        foreach ($subscribers as $subscriber) {
            $this->app->event->observe($subscriber, 'swoole.websocket.');

     * Prepare websocket handler for onOpen and onClose callback
    protected function bindWebsocketHandler()
        $handlerClass = $this->getConfig('websocket.handler');
        $this->app->bind(HandlerInterface::class, $handlerClass);

     * Bind room instance to app container.
    protected function bindWebsocketRoom(): void
        $this->app->instance(Room::class, $this->wsRoom);




namespace think\swoole\concerns;

use think\App;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\helper\Arr;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;
use function Swoole\Coroutine\run;

 * 在命令行里使用RPC
 * @property App $app
trait WithRpcClient
    public function __construct()
        if (!($this instanceof Command)) {
            throw new \RuntimeException('Trait `WithRpcClient` only can be used in Command');

    protected function execute(Input $input, Output $output)
        run(function () {
            $this->app->invoke([$this, 'handle']);

    protected function bindRpcInterface()
        if (file_exists($rpc = $this->app->getBasePath() . 'rpc.php')) {
            $rpcServices = (array) include $rpc;

            try {
                foreach ($rpcServices as $name => $abstracts) {

                    $config = $this->app->config->get("swoole.rpc.client.{$name}", []);

                    $parserClass = Arr::pull($config, 'parser', JsonParser::class);
                    $tries       = Arr::pull($config, 'tries', 2);
                    $middleware  = Arr::pull($config, 'middleware', []);

                    $parser  = $this->app->make($parserClass);
                    $gateway = new Gateway($config, $parser, $tries);

                    foreach ($abstracts as $abstract) {
                        $this->app->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
                            return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
            } catch (Throwable $e) {



namespace think\swoole\concerns;

use ReflectionObject;

trait ModifyProperty
    protected function modifyProperty($object, $value, $property = 'app')
        $reflectObject = new ReflectionObject($object);
        if ($reflectObject->hasProperty($property)) {
            $reflectProperty = $reflectObject->getProperty($property);
            $reflectProperty->setValue($object, $value);



namespace think\swoole\concerns;

use Swoole\Constant;
use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Runtime;
use think\App;
use think\swoole\Watcher;

 * Trait InteractsWithServer
 * @package think\swoole\concerns
 * @property App $container
trait InteractsWithServer

     * @var array
    protected $startFuncMap = [];

    protected $workerId;

    /** @var Pool */
    protected $pool;

    public function addBatchWorker(int $workerNum, callable $func, $name = null)
        for ($i = 0; $i < $workerNum; $i++) {
            $this->addWorker($func, $name ? "{$name} #{$i}" : null);
        return $this;

    public function addWorker(callable $func, $name = null): self
        $this->startFuncMap[] = [$func, $name];
        return $this;

     * 启动服务
     * @param string $envName 环境变量标识
    public function start(string $envName): void
        $this->setProcessName('manager process');


        if ($this->getConfig('hot_update.enable', false)) {

        $pool = new Pool(count($this->startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, true);

        $pool->on(Constant::EVENT_WORKER_START, function ($pool, $workerId) use ($envName) {


            $this->pool     = $pool;
            $this->workerId = $workerId;

            [$func, $name] = $this->startFuncMap[$workerId];

            if ($name) {

            Process::signal(SIGTERM, function () {

            /** @var Coroutine\Socket $socket */
            $socket = $this->pool->getProcess()->exportSocket();

            Event::add($socket, function (Coroutine\Socket $socket) {
                $recv    = $socket->recv();
                $message = unserialize($recv);
                $this->triggerEvent('message', $message);



            $func($pool, $workerId);


    public function getWorkerId()
        return $this->workerId;
     * 获取当前工作进程池对象
     * @return Pool
    public function getPool()
        return $this->pool;

    public function sendMessage($workerId, $message)
        if ($workerId === $this->workerId) {
            $this->triggerEvent('message', $message);
        } else {
            /** @var Process $process */
            $process = $this->pool->getProcess($workerId);
            $socket  = $process->exportSocket();

    public function runWithBarrier(callable $func, ...$params)
        $channel = new Coroutine\Channel(1);

        Coroutine::create(function (...$params) use ($channel, $func) {
            Coroutine::defer(function () use ($channel) {

            call_user_func_array($func, $params);
        }, ...$params);


     * 热更新
    protected function addHotUpdateProcess()
        $this->addWorker(function (Process\Pool $pool) {

            $watcher = $this->container->make(Watcher::class);

            $watcher->watch(function () use ($pool) {
                Process::kill($pool->master_pid, SIGUSR1);
        }, 'hot update');

     * 清除apc、op缓存
    protected function clearCache()
        if (extension_loaded('apc')) {

        if (extension_loaded('Zend OPcache')) {

     * Set process name.
     * @param $process
    protected function setProcessName($process)
        $appName = $this->container->config->get('app.name', 'ThinkPHP');

        $name = sprintf('swoole: %s process for %s', $process, $appName);




namespace think\swoole\concerns;

trait WithMiddleware
     * 中间件
     * @var array
    protected $middleware = [];

    protected function middleware($middleware, ...$params)
        $options = [];

        $this->middleware[] = [
            'middleware' => [$middleware, $params],
            'options'    => &$options,

        return new class($options) {
            protected $options;

            public function __construct(array &$options)
                $this->options = &$options;

            public function only($methods)
                $this->options['only'] = is_array($methods) ? $methods : func_get_args();
                return $this;

            public function except($methods)
                $this->options['except'] = is_array($methods) ? $methods : func_get_args();

                return $this;



namespace think\swoole\concerns;

use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Status;
use think\App;
use think\Cookie;
use think\Event;
use think\exception\Handle;
use think\helper\Arr;
use think\helper\Str;
use think\Http;
use think\swoole\App as SwooleApp;
use think\swoole\Http as SwooleHttp;
use think\swoole\response\File as FileResponse;
use Throwable;
use function substr;

 * Trait InteractsWithHttp
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
trait InteractsWithHttp
    use InteractsWithWebsocket, ModifyProperty;

    public function createHttpServer()

        $host    = $this->getConfig('http.host');
        $port    = $this->getConfig('http.port');
        $options = $this->getConfig('http.options', []);

        $server = new Server($host, $port, false, true);

        $server->handle('/', function (Request $req, Response $res) {
            if ($this->wsEnable && $this->isWebsocketRequest($req)) {
                $this->onHandShake($req, $res);
            } else {
                $this->onRequest($req, $res);


    protected function preloadHttp()
        $http = $this->app->http;
        $this->app->invokeMethod([$http, 'loadMiddleware'], [], true);

        if ($this->app->config->get('app.with_route', true)) {
            $this->app->invokeMethod([$http, 'loadRoutes'], [], true);
            $route = clone $this->app->route;
            $this->modifyProperty($route, null);

            $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($route) {
                $newRoute = clone $route;
                $this->modifyProperty($newRoute, $app);
                $app->instance('route', $newRoute);

        $middleware = clone $this->app->middleware;
        $this->modifyProperty($middleware, null);

        $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($middleware) {
            $newMiddleware = clone $middleware;
            $this->modifyProperty($newMiddleware, $app);
            $app->instance('middleware', $newMiddleware);

        $this->app->bind(Http::class, SwooleHttp::class);

    protected function isWebsocketRequest(Request $req)
        $header = $req->header;
        return strcasecmp(Arr::get($header, 'connection', ''), 'upgrade') === 0 &&
            strcasecmp(Arr::get($header, 'upgrade', ''), 'websocket') === 0;

    protected function prepareHttp()
        if ($this->getConfig('http.enable', true)) {

            $this->wsEnable = $this->getConfig('websocket.enable', false);

            if ($this->wsEnable) {

            $workerNum = $this->getConfig('http.worker_num', swoole_cpu_num());

            $this->addBatchWorker($workerNum, [$this, 'createHttpServer'], 'http server');

     * "onRequest" listener.
     * @param Request $req
     * @param Response $res
    public function onRequest($req, $res)
        $this->runWithBarrier([$this, 'runInSandbox'], function (Http $http, Event $event, SwooleApp $app) use ($req, $res) {

            $request = $this->prepareRequest($req);

            try {
                $response = $this->handleRequest($http, $request);
            } catch (Throwable $e) {
                $response = $this->app
                    ->render($request, $e);

            $this->setCookie($res, $app->cookie);
            $this->sendResponse($res, $request, $response);

    protected function handleRequest(Http $http, $request)
        $level = ob_get_level();

        $response = $http->run($request);

        $content = $response->getContent();

        if (ob_get_level() == 0) {


        if (ob_get_length() > 0) {
            $response->content(ob_get_contents() . $content);

        while (ob_get_level() > $level) {

        return $response;

    protected function prepareRequest(Request $req)
        $header = $req->header ?: [];
        $server = $req->server ?: [];

        foreach ($header as $key => $value) {
            $server['http_' . str_replace('-', '_', $key)] = $value;

        // 重新实例化请求对象 处理swoole请求数据
        /** @var \think\Request $request */
        $request = $this->app->make('request', [], true);

        return $request
            ->withGet($req->get ?: [])
            ->withPost($req->post ?: [])
            ->withCookie($req->cookie ?: [])
            ->setUrl($req->server['request_uri'] . (!empty($req->server['query_string']) ? '?' . $req->server['query_string'] : ''))
            ->setPathinfo(ltrim($req->server['path_info'], '/'));

    protected function getFiles(Request $req)
        if (empty($req->files)) {
            return [];

        return array_map(function ($file) {
            if (!Arr::isAssoc($file)) {
                $files = [];
                foreach ($file as $f) {
                    $files['name'][]     = $f['name'];
                    $files['type'][]     = $f['type'];
                    $files['tmp_name'][] = $f['tmp_name'];
                    $files['error'][]    = $f['error'];
                    $files['size'][]     = $f['size'];
                return $files;
            return $file;
        }, $req->files);

    protected function setCookie(Response $res, Cookie $cookie)
        foreach ($cookie->getCookie() as $name => $val) {
            [$value, $expire, $option] = $val;

            $res->cookie($name, $value, $expire, $option['path'], $option['domain'], (bool) $option['secure'], (bool) $option['httponly'], $option['samesite']);

    protected function setHeader(Response $res, array $headers)
        foreach ($headers as $key => $val) {
            $res->header($key, $val);

    protected function setStatus(Response $res, $code)
        $res->status($code, Status::getReasonPhrase($code));

    protected function sendResponse(Response $res, \think\Request $request, \think\Response $response)
        switch (true) {
            case $response instanceof FileResponse:
                $this->sendFile($res, $request, $response);
                $this->sendContent($res, $response);

    protected function sendFile(Response $res, \think\Request $request, FileResponse $response)
        $ifNoneMatch = $request->header('If-None-Match');
        $ifRange     = $request->header('If-Range');

        $code         = $response->getCode();
        $file         = $response->getFile();
        $eTag         = $response->getHeader('ETag');
        $lastModified = $response->getHeader('Last-Modified');

        $fileSize = $file->getSize();
        $offset   = 0;
        $length   = -1;

        if ($ifNoneMatch == $eTag) {
            $code = 304;
        } elseif (!$ifRange || $ifRange === $eTag || $ifRange === $lastModified) {
            $range = $request->header('Range', '');
            if (Str::startsWith($range, 'bytes=')) {
                [$start, $end] = explode('-', substr($range, 6), 2) + [0];

                $end = ('' === $end) ? $fileSize - 1 : (int) $end;

                if ('' === $start) {
                    $start = $fileSize - $end;
                    $end   = $fileSize - 1;
                } else {
                    $start = (int) $start;

                if ($start <= $end) {
                    $end = min($end, $fileSize - 1);
                    if ($start < 0 || $start > $end) {
                        $code = 416;
                            'Content-Range' => sprintf('bytes */%s', $fileSize),
                    } elseif ($end - $start < $fileSize - 1) {
                        $length = $end < $fileSize ? $end - $start + 1 : -1;
                        $offset = $start;
                        $code   = 206;
                            'Content-Range'  => sprintf('bytes %s-%s/%s', $start, $end, $fileSize),
                            'Content-Length' => $end - $start + 1,

        $this->setStatus($res, $code);
        $this->setHeader($res, $response->getHeader());

        if ($code >= 200 && $code < 300 && $length !== 0) {
            $res->sendfile($file->getPathname(), $offset, $length);
        } else {

    protected function sendContent(Response $res, \think\Response $response)
        // 由于开启了 Transfer-Encoding: chunked,根据 HTTP 规范,不再需要设置 Content-Length
        $response->header(['Content-Length' => null]);

        $this->setStatus($res, $response->getCode());
        $this->setHeader($res, $response->getHeader());

        $content = $response->getContent();
        if ($content) {
            $contentSize = strlen($content);
            $chunkSize   = 8192;

            if ($contentSize > $chunkSize) {
                $sendSize = 0;
                do {
                    if (!$res->write(substr($content, $sendSize, $chunkSize))) {
                } while (($sendSize += $chunkSize) < $contentSize);
            } else {



namespace think\swoole\concerns;

use Swoole\Process;
use Swoole\Timer;
use think\helper\Arr;
use think\queue\event\JobFailed;
use think\queue\Worker;

trait InteractsWithQueue
    protected function createQueueWorkers()
        $workers = $this->getConfig('queue.workers', []);

        foreach ($workers as $queue => $options) {

            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            } else {
                $connection = null;

            $workerNum = Arr::get($options, 'worker_num', 1);

            $this->addBatchWorker($workerNum, function (Process\Pool $pool) use ($options, $connection, $queue) {
                $delay   = Arr::get($options, 'delay', 0);
                $sleep   = Arr::get($options, 'sleep', 3);
                $tries   = Arr::get($options, 'tries', 0);
                $timeout = Arr::get($options, 'timeout', 60);

                /** @var Worker $worker */
                $worker = $this->app->make(Worker::class);

                while (true) {
                    $timer = Timer::after($timeout * 1000, function () use ($pool) {

                    $this->runWithBarrier([$this, 'runInSandbox'], function () use ($connection, $queue, $delay, $sleep, $tries, $worker) {
                        $worker->runNextJob($connection, $queue, $delay, $sleep, $tries);

            }, "queue [$queue]");

    public function prepareQueue()
        if ($this->getConfig('queue.enable', false)) {

     * 注册事件
    protected function listenForEvents()
        $this->container->event->listen(JobFailed::class, function (JobFailed $event) {

     * 记录失败任务
     * @param JobFailed $event
    protected function logFailedJob(JobFailed $event)



namespace think\swoole\concerns;

use Generator;
use Swoole\Coroutine\Client;
use think\swoole\exception\RpcClientException;
use think\swoole\rpc\File;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;

trait InteractsWithRpcConnector
    abstract protected function runWithClient($callback);

    protected function recv(Client $client, callable $decoder)
        $handler = null;
        $file    = null;

        while ($data = $client->recv()) {
            if (empty($handler)) {
                [$handler, $data] = Packer::unpack($data);

            $response = $handler->write($data);

            if (!empty($response)) {
                $handler = null;

                if ($response instanceof File) {
                    $file = $response;
                } else {
                    $result = $decoder($response);
                    if ($result === Protocol::FILE) {
                        $result = $file;
                    return $result;

            if (!empty($data)) {
                goto begin;

        if ($data === '') {
            throw new RpcClientException('Connection is closed. ' . $client->errMsg, $client->errCode);
        if ($data === false) {
            throw new RpcClientException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . swoole_strerror($client->errCode), $client->errCode);

    public function sendAndRecv($data, callable $decoder)
        if (!$data instanceof Generator) {
            $data = [$data];

        return $this->runWithClient(function (Client $client) use ($decoder, $data) {
            try {
                foreach ($data as $string) {
                    if (!empty($string)) {
                        if ($client->send($string) === false) {
                            throw new RpcClientException('Send data failed. ' . $client->errMsg, $client->errCode);
                return $this->recv($client, $decoder);
            } catch (RpcClientException $e) {
                throw $e;




namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\App;
use think\helper\Arr;
use think\swoole\Pool;

 * Trait InteractsWithRpc
 * @package think\swoole\concerns
 * @property App $app
trait InteractsWithPools
     * @return Pool
    public function getPools()
        return $this->app->make(Pool::class);

    protected function preparePools()
        $createPools = function () {
            /** @var Pool $pool */
            $pools = $this->getPools();

            foreach ($this->getConfig('pool', []) as $name => $config) {
                $type = Arr::pull($config, 'type');
                if ($type && is_subclass_of($type, ConnectorInterface::class)) {
                    $pool = new ConnectionPool(
                    $pools->add($name, $pool);
                    $this->app->instance("swoole.pool.{$name}", $pool);

        $this->onEvent('workerStart', $createPools);



namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use think\App;
use think\swoole\Pool;
use think\swoole\pool\Client;
use think\swoole\rpc\client\Connector;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;

 * Trait InteractsWithRpcClient
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 * @method Pool getPools()
trait InteractsWithRpcClient
    protected function prepareRpcClient()
        $this->onEvent('workerStart', function () {

    protected function bindRpcInterface()
        if (file_exists($rpc = $this->container->getBasePath() . 'rpc.php')) {

            $rpcServices = (array) include $rpc;

            try {
                foreach ($rpcServices as $name => $abstracts) {
                    $parserClass = $this->getConfig("rpc.client.{$name}.parser", JsonParser::class);
                    $tries       = $this->getConfig("rpc.client.{$name}.tries", 2);
                    $middleware  = $this->getConfig("rpc.client.{$name}.middleware", []);

                    $parser  = $this->getApplication()->make($parserClass);
                    $gateway = new Gateway($this->createRpcConnector($name), $parser, $tries);

                    foreach ($abstracts as $abstract) {
                            ->bind($abstract, function (App $app) use ($middleware, $gateway, $name, $abstract) {
                                return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
            } catch (Throwable $e) {

    protected function bindRpcClientPool()
        if (!empty($clients = $this->getConfig('rpc.client'))) {
            foreach ($clients as $name => $config) {
                $pool = new ConnectionPool(
                    new Client(),
                $this->getPools()->add("rpc.client.{$name}", $pool);

    protected function createRpcConnector($name)
        $pool = $this->getPools()->get("rpc.client.{$name}");

        return new class($pool) implements Connector {

            use InteractsWithRpcConnector;

            protected $pool;

            public function __construct(ConnectionPool $pool)
                $this->pool = $pool;

            protected function runWithClient($callback)
                /** @var \Swoole\Coroutine\Client $client */
                $client = $this->pool->borrow();
                try {
                    return $callback($client);
                } finally {



namespace think\swoole\concerns;

use think\App;
use think\console\Output;
use think\exception\Handle;
use Throwable;

trait WithContainer

     * @var App
    protected $container;

     * Manager constructor.
     * @param App $container
    public function __construct(App $container)
        $this->container = $container;

    protected function getContainer()
        return $this->container;

     * 获取配置
     * @param string $name
     * @param null $default
     * @return mixed
    public function getConfig(string $name, $default = null)
        return $this->container->config->get("swoole.{$name}", $default);

     * 触发事件
     * @param string $event
     * @param null $params
    public function triggerEvent(string $event, $params = null): void
        $this->container->event->trigger("swoole.{$event}", $params);

     * 监听事件
     * @param string $event
     * @param        $listener
     * @param bool $first
    public function onEvent(string $event, $listener, bool $first = false): void
        $this->container->event->listen("swoole.{$event}", $listener, $first);

     * Log server error.
     * @param Throwable $e
    public function logServerError(Throwable $e)
        /** @var Handle $handle */
        $handle = $this->container->make(Handle::class);

        $handle->renderForConsole(new Output(), $e);




namespace think\swoole\concerns;

use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use Swoole\Coroutine;
use think\helper\Arr;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use think\tracing\reporter\RedisReporter;
use think\tracing\Tracer;

 * 链路追踪上报进程
trait InteractsWithTracing
    protected function prepareTracing()
        if (class_exists(Tracer::class)) {
            $tracers  = $this->container->config->get('tracing.tracers');
            $hasAsync = false;
            foreach ($tracers as $name => $tracer) {
                if (Arr::get($tracer, 'async', false)) {
                    $this->addWorker(function () use ($name) {
                        $tracer = $this->app->make(Tracer::class)->tracer($name);

                    }, "tracing [{$name}]");
                    $hasAsync = true;

            if ($hasAsync) {
                $this->onEvent('workerStart', function () {

    protected function bindTracingRedisReporter()
        $this->getApplication()->bind(RedisReporter::class, function ($name) {

            $pool = $this->getPools()->get("tracing.redis");

            $redis = Context::rememberData('tracing.redis', function () use ($pool) {
                $redis = $pool->borrow();
                Coroutine::defer(function () use ($pool, $redis) {

                return $redis;

            return new RedisReporter($name, $redis);

    protected function bindTracingRedisPool()
        $config = $this->container->config->get('tracing.redis');

        $pool = new ConnectionPool(
            new PhpRedisConnector(),
        $this->getPools()->add("tracing.redis", $pool);




use think\swoole\websocket\socketio\Handler;

return [
    'http'       => [
        'enable'     => true,
        'host'       => '',
        'port'       => 80,
        'worker_num' => swoole_cpu_num(),
        'options'    => [],
    'websocket'  => [
        'enable'        => false,
        'handler'       => Handler::class,
        'ping_interval' => 25000,
        'ping_timeout'  => 60000,
        'room'          => [
            'type'  => 'table',
            'table' => [
                'room_rows'   => 8192,
                'room_size'   => 2048,
                'client_rows' => 4096,
                'client_size' => 2048,
            'redis' => [
                'host'          => '',
                'port'          => 6379,
                'max_active'    => 3,
                'max_wait_time' => 5,
        'listen'        => [],
        'subscribe'     => [],
    'rpc'        => [
        'server' => [
            'enable'     => false,
            'host'       => '',
            'port'       => 9000,
            'worker_num' => swoole_cpu_num(),
            'services'   => [],
        'client' => [],
    'queue'      => [
        'enable'  => false,
        'workers' => [],
    'hot_update' => [
        'enable'  => env('APP_DEBUG', false),
        'name'    => ['*.php'],
        'include' => [app_path()],
        'exclude' => [],
    'pool'       => [
        'db'    => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
        'cache' => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
    'tables'     => [],
    'concretes'  => [],
    'resetters'  => [],
    'instances'  => [],
    'services'   => [],




namespace think\swoole\contract\websocket;

use Swoole\WebSocket\Frame;
use think\Request;

interface HandlerInterface
     * "onOpen" listener.
     * @param Request $request
    public function onOpen(Request $request);

     * "onMessage" listener.
     * @param Frame $frame
    public function onMessage(Frame $frame);

     * "onClose" listener.
    public function onClose();

    public function encodeMessage($message);


namespace think\swoole\contract\websocket;

interface RoomInterface
     * Rooms key
     * @const string
    public const ROOMS_KEY = 'rooms';

     * Descriptors key
     * @const string
    public const DESCRIPTORS_KEY = 'fds';

     * Do some init stuffs before workers started.
     * @return RoomInterface
    public function prepare(): RoomInterface;

     * Add multiple socket fds to a room.
     * @param string fd
     * @param array|string $roomNames
    public function add($fd, $roomNames);

     * Delete multiple socket fds from a room.
     * @param string fd
     * @param array|string $roomNames
    public function delete($fd, $roomNames);

     * Get all sockets by a room key.
     * @param string room
     * @return array
    public function getClients(string $room);

     * Get all rooms by a fd.
     * @param string fd
     * @return array
    public function getRooms($fd);



namespace think\swoole\contract;

use think\App;
use think\swoole\Sandbox;

interface ResetterInterface
     * "handle" function for resetting app.
     * @param \think\App $app
     * @param Sandbox $sandbox
    public function handle(App $app, Sandbox $sandbox);



namespace think\swoole\contract\rpc;

use think\swoole\rpc\Protocol;

interface ParserInterface

    const EOF = "\r\n\r\n";

     * @param Protocol $protocol
     * @return string
    public function encode(Protocol $protocol): string;

     * @param string $string
     * @return Protocol
    public function decode(string $string): Protocol;

     * @param string $string
     * @return mixed
    public function decodeResponse(string $string);

     * @param mixed $result
     * @return string
    public function encodeResponse($result): string;




namespace think\swoole\coroutine;

use ArrayObject;
use Closure;
use Swoole\Coroutine;

class Context

     * 获取协程上下文
     * @param int $cid
     * @return Coroutine\Context
    public static function get($cid = 0)
        return Coroutine::getContext($cid);

    public static function getDataObject()
        $context = self::get();
        if (!isset($context['#data'])) {
            $context['#data'] = new ArrayObject();
        return $context['#data'];

     * 获取当前协程临时数据
     * @param string $key
     * @param null $default
     * @return mixed|null
    public static function getData(string $key, $default = null)
        if (self::hasData($key)) {
            return self::getDataObject()->offsetGet($key);
        return $default;

     * 判断是否存在临时数据
     * @param string $key
     * @return bool
    public static function hasData(string $key)
        return self::getDataObject()->offsetExists($key);

     * 写入临时数据
     * @param string $key
     * @param $value
    public static function setData(string $key, $value)
        self::getDataObject()->offsetSet($key, $value);

     * 删除数据
     * @param string $key
    public static function removeData(string $key)
        if (self::hasData($key)) {

     * 如果不存在则写入数据
     * @param string $key
     * @param $value
     * @return mixed|null
    public static function rememberData(string $key, $value)
        if (self::hasData($key)) {
            return self::getData($key);

        if ($value instanceof Closure) {
            // 获取缓存数据
            $value = $value();

        self::setData($key, $value);

        return $value;

     * @internal
     * 清空数据
    public static function clear()

     * 获取当前协程ID
     * @return mixed
    public static function getCoroutineId()
        return Coroutine::getuid();




namespace think\swoole\exception;

use Exception;
use think\swoole\rpc\Error;

class RpcResponseException extends Exception
    protected $error;

    public function __construct(Error $error)
        parent::__construct($error->getMessage(), $error->getCode());
        $this->error = $error;

    public function getError()
        return $this->error;



namespace think\swoole\exception;

use Exception;

class RpcClientException extends Exception





namespace think\swoole\middleware;

use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_SERVER;

class TraceRpcServer
    protected $tracer;

    public function __construct(Tracer $tracer)
        $this->tracer = $tracer;

    public function handle(Protocol $protocol, $next)
        $context = $this->tracer->extract(TEXT_MAP, $protocol->getContext());
        $scope   = $this->tracer->startActiveSpan(
            'rpc.server:' . $protocol->getInterface() . '@' . $protocol->getMethod(),
                'child_of' => $context,
                'tags'     => [
                    SPAN_KIND => SPAN_KIND_RPC_SERVER,
        $span    = $scope->getSpan();

        try {
            return $next($protocol);
        } catch (Throwable $e) {
            $span->setTag(ERROR, $e);
            throw $e;
        } finally {



namespace think\swoole\middleware;

use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_CLIENT;

class TraceRpcClient
    protected $tracer;

    public function __construct(Tracer $tracer)
        $this->tracer = $tracer;

    public function handle(Protocol $protocol, $next)
        $scope   = $this->tracer->startActiveSpan(
            'rpc.client:' . $protocol->getInterface() . '@' . $protocol->getMethod(),
                'tags' => [
                    SPAN_KIND => SPAN_KIND_RPC_CLIENT,
        $span    = $scope->getSpan();
        $context = $protocol->getContext();
        $this->tracer->inject($span->getContext(), TEXT_MAP, $context);

        try {
            return $next($protocol);
        } catch (Throwable $e) {
            if (!$e instanceof RpcResponseException) {
                $span->setTag(ERROR, $e);
            throw $e;
        } finally {



namespace think\swoole\middleware;

use Closure;
use Symfony\Component\VarDumper\Cloner\VarCloner;
use Symfony\Component\VarDumper\Dumper\HtmlDumper;
use Symfony\Component\VarDumper\VarDumper;
use think\Request;
use think\Response;

class InteractsWithVarDumper
     * @param Request $request
     * @param Closure $next
     * @return Response
    public function handle($request, Closure $next)
        if (class_exists(VarDumper::class)) {
            $cloner = new VarCloner();
            $dumper = new HtmlDumper();

            $prevHandler = VarDumper::setHandler(function ($var) use ($dumper, $cloner) {

            /** @var Response $response */
            $response = $next($request);

            return $response;
        return $next($request);




namespace think\swoole\pool\proxy;

use Psr\SimpleCache\CacheInterface;
use think\db\BaseQuery;
use think\db\ConnectionInterface;
use think\DbManager;
use think\swoole\pool\Proxy;

 * Class Connection
 * @package think\swoole\pool\db
 * @property ConnectionInterface $handler
class Connection extends Proxy implements ConnectionInterface

     * 获取当前连接器类对应的Query类
     * @access public
     * @return string
    public function getQueryClass(): string
        return $this->__call(__FUNCTION__, func_get_args());

     * 连接数据库方法
     * @access public
     * @param array $config 接参数
     * @param integer $linkNum 连接序号
     * @return mixed
    public function connect(array $config = [], $linkNum = 0)
        return $this->__call(__FUNCTION__, func_get_args());

     * 设置当前的数据库Db对象
     * @access public
     * @param DbManager $db
     * @return void
    public function setDb(DbManager $db)
        return $this->__call(__FUNCTION__, func_get_args());

     * 设置当前的缓存对象
     * @access public
     * @param CacheInterface $cache
     * @return void
    public function setCache(CacheInterface $cache)
        return $this->__call(__FUNCTION__, func_get_args());

     * 获取数据库的配置参数
     * @access public
     * @param string $config 配置名称
     * @return mixed
    public function getConfig(string $config = '')
        return $this->__call(__FUNCTION__, func_get_args());

     * 关闭数据库(或者重新连接)
     * @access public
    public function close()
        return $this->__call(__FUNCTION__, func_get_args());

     * 查找单条记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @return array
    public function find(BaseQuery $query): array
        return $this->__call(__FUNCTION__, func_get_args());

     * 查找记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @return array
    public function select(BaseQuery $query): array
        return $this->__call(__FUNCTION__, func_get_args());

     * 插入记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @param boolean $getLastInsID 返回自增主键
     * @return mixed
    public function insert(BaseQuery $query, bool $getLastInsID = false)
        return $this->__call(__FUNCTION__, func_get_args());

     * 批量插入记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @param mixed $dataSet 数据集
     * @return integer
     * @throws \Exception
     * @throws \Throwable
    public function insertAll(BaseQuery $query, array $dataSet = []): int
        return $this->__call(__FUNCTION__, func_get_args());

     * 更新记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @return integer
    public function update(BaseQuery $query): int
        return $this->__call(__FUNCTION__, func_get_args());

     * 删除记录
     * @access public
     * @param BaseQuery $query 查询对象
     * @return int
    public function delete(BaseQuery $query): int
        return $this->__call(__FUNCTION__, func_get_args());

     * 得到某个字段的值
     * @access public
     * @param BaseQuery $query 查询对象
     * @param string $field 字段名
     * @param mixed $default 默认值
     * @return mixed
    public function value(BaseQuery $query, string $field, $default = null)
        return $this->__call(__FUNCTION__, func_get_args());

     * 得到某个列的数组
     * @access public
     * @param BaseQuery $query 查询对象
     * @param string|array $column 字段名 多个字段用逗号分隔
     * @param string $key 索引
     * @return array
    public function column(BaseQuery $query, $column, string $key = ''): array
        return $this->__call(__FUNCTION__, func_get_args());

     * 执行数据库事务
     * @access public
     * @param callable $callback 数据操作方法回调
     * @return mixed
     * @throws \Throwable
    public function transaction(callable $callback)
        return $this->__call(__FUNCTION__, func_get_args());

     * 启动事务
     * @access public
     * @return void
     * @throws \Exception
    public function startTrans()
        return $this->__call(__FUNCTION__, func_get_args());

     * 用于非自动提交状态下面的查询提交
     * @access public
     * @return void
    public function commit()
        return $this->__call(__FUNCTION__, func_get_args());

     * 事务回滚
     * @access public
     * @return void
    public function rollback()
        return $this->__call(__FUNCTION__, func_get_args());

     * 获取最近一次查询的sql语句
     * @access public
     * @return string
    public function getLastSql(): string
        return $this->__call(__FUNCTION__, func_get_args());

    public function table($table)
        return $this->__call(__FUNCTION__, func_get_args());

    public function name($name)
        return $this->__call(__FUNCTION__, func_get_args());

namespace think\swoole\pool\proxy;

use Psr\SimpleCache\CacheInterface;
use think\contract\CacheHandlerInterface;
use think\swoole\pool\Proxy;

class Store extends Proxy implements CacheHandlerInterface, CacheInterface
     * @inheritDoc
    public function has($name)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function get($name, $default = null)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function set($name, $value, $expire = null)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function inc(string $name, int $step = 1)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function dec(string $name, int $step = 1)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function delete($name)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function clear()
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function clearTag(array $keys)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function getMultiple($keys, $default = null)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function setMultiple($values, $ttl = null)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function deleteMultiple($keys)
        return $this->__call(__FUNCTION__, func_get_args());

     * @inheritDoc
    public function tag($name)
        return $this->__call(__FUNCTION__, func_get_args());



namespace think\swoole\pool;

use think\swoole\pool\proxy\Store;

class Cache extends \think\Cache
    protected function createDriver(string $name)
        return new Store(function () use ($name) {
            return parent::createDriver($name);
        }, $this->app->config->get('swoole.pool.cache', []));




namespace think\swoole\pool;

use Smf\ConnectionPool\Connectors\ConnectorInterface;

class Connector implements ConnectorInterface
    protected $connector;

    public function __construct($connector)
        $this->connector = $connector;

    public function connect(array $config)
        return call_user_func($this->connector, $config);

    public function disconnect($connection)


    public function isConnected($connection): bool
        return !property_exists($connection, Proxy::KEY_DISCONNECTED);

    public function reset($connection, array $config)


    public function validate($connection): bool
        return true;



namespace think\swoole\pool;

use think\Config;
use think\db\ConnectionInterface;
use think\swoole\pool\proxy\Connection;

 * Class Db
 * @package think\swoole\pool
 * @property Config $config
class Db extends \think\Db

    protected function createConnection(string $name): ConnectionInterface
        return new Connection(new class(function () use ($name) {
            return parent::createConnection($name);
        }) extends Connector {
            public function disconnect($connection)
                if ($connection instanceof ConnectionInterface) {
        }, $this->config->get('swoole.pool.db', []));




namespace think\swoole\pool;

use Closure;
use Exception;
use RuntimeException;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use Swoole\Coroutine;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use Throwable;

abstract class Proxy
    const KEY_RELEASED = '__released';

    const KEY_DISCONNECTED = '__disconnected';

    protected $pool;

     * Proxy constructor.
     * @param Closure|ConnectorInterface $connector
     * @param array                      $config
    public function __construct($connector, $config, array $connectionConfig = [])
        if ($connector instanceof Closure) {
            $connector = new Connector($connector);

        $this->pool = new ConnectionPool(


    protected function getPoolConnection()
        return Context::rememberData('connection.' . spl_object_id($this), function () {
            $connection = $this->pool->borrow();

            $connection->{static::KEY_RELEASED} = false;

            Coroutine::defer(function () use ($connection) {

            return $connection;

    protected function releaseConnection($connection)
        if ($connection->{static::KEY_RELEASED}) {
        $connection->{static::KEY_RELEASED} = true;

    public function release()
        $connection = $this->getPoolConnection();

    public function __call($method, $arguments)
        $connection = $this->getPoolConnection();
        if ($connection->{static::KEY_RELEASED}) {
            throw new RuntimeException('Connection already has been released!');

        try {
            return $connection->{$method}(...$arguments);
        } catch (Exception|Throwable $e) {
            $connection->{static::KEY_DISCONNECTED} = true;
            throw $e;




namespace think\swoole\pool;

use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\helper\Arr;

class Client implements ConnectorInterface

     * Connect to the specified Server and returns the connection resource
     * @param array $config
     * @return \Swoole\Coroutine\Client
    public function connect(array $config)
        $client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);

        $host    = Arr::pull($config, 'host');
        $port    = Arr::pull($config, 'port');
        $timeout = Arr::pull($config, 'timeout', 5);


        $client->connect($host, $port, $timeout);

        return $client;

     * Disconnect and free resources
     * @param \Swoole\Coroutine\Client $connection
     * @return mixed
    public function disconnect($connection)

     * Whether the connection is established
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
    public function isConnected($connection): bool
        return $connection->isConnected() && $connection->peek() !== '';

     * Reset the connection
     * @param \Swoole\Coroutine\Client $connection
     * @param array $config
     * @return mixed
    public function reset($connection, array $config)

     * Validate the connection
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
    public function validate($connection): bool
        return $connection instanceof \Swoole\Coroutine\Client;




namespace think\swoole\resetters;

use think\App;
use think\Model;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

class ResetModel implements ResetterInterface

    public function handle(App $app, Sandbox $sandbox)
        if (class_exists(Model::class)) {
            Model::setInvoker(function (...$args) use ($sandbox) {
                return $sandbox->getApplication()->invoke(...$args);



namespace think\swoole\resetters;

use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

class ResetConfig implements ResetterInterface

    public function handle(App $app, Sandbox $sandbox)
        $app->instance('config', clone $sandbox->getConfig());

        return $app;



namespace think\swoole\resetters;

use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

class ClearInstances implements ResetterInterface
    public function handle(App $app, Sandbox $sandbox)
        $instances = ['log', 'session', 'view', 'response', 'cookie'];

        $instances = array_merge($instances, $sandbox->getConfig()->get('swoole.instances', []));

        foreach ($instances as $instance) {

        return $app;



namespace think\swoole\resetters;

use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

 * Class ResetEvent
 * @package think\swoole\resetters
class ResetEvent implements ResetterInterface
    use ModifyProperty;

    public function handle(App $app, Sandbox $sandbox)
        $event = clone $sandbox->getEvent();
        $this->modifyProperty($event, $app);
        $app->instance('event', $event);

        return $app;



namespace think\swoole\resetters;

use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

 * Class ResetService
 * @package think\swoole\resetters
class ResetService implements ResetterInterface
    use ModifyProperty;

     * "handle" function for resetting app.
     * @param App $app
     * @param Sandbox $sandbox
    public function handle(App $app, Sandbox $sandbox)
        foreach ($sandbox->getServices() as $service) {
            $this->modifyProperty($service, $app);
            if (method_exists($service, 'register')) {
            if (method_exists($service, 'boot')) {
                $app->invoke([$service, 'boot']);




namespace think\swoole\resetters;

use think\App;
use think\Paginator;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;

class ResetPaginator implements ResetterInterface

    public function handle(App $app, Sandbox $sandbox)
        Paginator::currentPathResolver(function () use ($sandbox) {
            return $sandbox->getApplication()->request->baseUrl();

        Paginator::currentPageResolver(function ($varPage = 'page') use ($sandbox) {

            $page = $sandbox->getApplication()->request->param($varPage);

            if (filter_var($page, FILTER_VALIDATE_INT) !== false && (int) $page >= 1) {
                return (int) $page;

            return 1;




namespace think\swoole\response;

use DateTime;
use RuntimeException;
use SplFileInfo;
use think\Response;

class File extends Response
    public const DISPOSITION_ATTACHMENT = 'attachment';
    public const DISPOSITION_INLINE     = 'inline';

    protected $header = [
        'Content-Type'  => 'application/octet-stream',
        'Accept-Ranges' => 'bytes',

     * @var SplFileInfo
    protected $file;

    public function __construct($file, string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
        $this->setFile($file, $contentDisposition, $autoEtag, $autoLastModified, $autoContentType);

    public function getFile()
        return $this->file;

    public function setFile($file, string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
        if (!$file instanceof SplFileInfo) {
            $file = new SplFileInfo((string) $file);

        if (!$file->isReadable()) {
            throw new RuntimeException('File must be readable.');

        $this->header['Content-Length'] = $file->getSize();

        $this->file = $file;

        if ($autoEtag) {

        if ($autoLastModified) {

        if ($contentDisposition) {

        if ($autoContentType) {

        return $this;

    public function setAutoContentType()
        $finfo = finfo_open(FILEINFO_MIME_TYPE);

        $mimeType = finfo_file($finfo, $this->file->getPathname());
        if ($mimeType) {
            $this->header['Content-Type'] = $mimeType;

    public function setContentDisposition(string $disposition, string $filename = '')
        if ('' === $filename) {
            $filename = $this->file->getFilename();

        $this->header['Content-Disposition'] = "{$disposition}; filename=\"{$filename}\"";

        return $this;

    public function setAutoLastModified()
        $date = DateTime::createFromFormat('U', $this->file->getMTime());
        return $this->lastModified($date->format('D, d M Y H:i:s') . ' GMT');

    public function setAutoEtag()
        $eTag = "W/\"" . sha1_file($this->file->getPathname()) . "\"";

        return $this->eTag($eTag);

    protected function sendData(string $data): void




namespace think\swoole\rpc;

class Protocol
    const ACTION_INTERFACE = '@action_interface';
    const FILE             = '@file';

     * @var string
    private $interface = '';

     * @var string
    private $method = '';

     * @var array
    private $params = [];

     * @var array
    private $context = [];

     * Replace constructor
     * @param string $interface
     * @param string $method
     * @param array $params
     * @param array $context
     * @return Protocol
    public static function make(string $interface, string $method, array $params, array $context = [])
        $instance = new static();

        $instance->interface = $interface;
        $instance->method    = $method;
        $instance->params    = $params;
        $instance->context   = $context;

        return $instance;

     * @return string
    public function getInterface(): string
        return $this->interface;

     * @return string
    public function getMethod(): string
        return $this->method;

     * @return array
    public function getParams(): array
        return $this->params;

     * @return array
    public function getContext(): array
        return $this->context;

     * @param string $interface
    public function setInterface(string $interface): void
        $this->interface = $interface;

     * @param string $method
    public function setMethod(string $method): void
        $this->method = $method;

     * @param array $params
    public function setParams(array $params): void
        $this->params = $params;

     * @param array $context
    public function setContext(array $context): void
        $this->context = $context;




namespace think\swoole\rpc;

use Throwable;

class File extends \think\File
    public function __destruct()
        try {
            if (file_exists($this->getPathname())) {
        } catch (Throwable $e) {




namespace think\swoole\rpc\server;

use ReflectionClass;
use ReflectionException;
use ReflectionMethod;
use ReflectionNamedType;
use RuntimeException;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Middleware;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;
use Throwable;

class Dispatcher
    use Sendfile;

     * Parser error
    const PARSER_ERROR = -32700;

     * Invalid Request
    const INVALID_REQUEST = -32600;

     * Method not found
    const METHOD_NOT_FOUND = -32601;

     * Invalid params
    const INVALID_PARAMS = -32602;

     * Internal error
    const INTERNAL_ERROR = -32603;

    protected $parser;

    protected $services = [];

    protected $middleware = [];

    public function __construct(ParserInterface $parser, $services, $middleware = [])
        $this->parser = $parser;
        $this->middleware = $middleware;

     * 获取服务接口
     * @param $services
     * @throws ReflectionException
    protected function prepareServices($services)
        foreach ($services as $className) {
            $reflectionClass = new ReflectionClass($className);
            $interfaces      = $reflectionClass->getInterfaceNames();
            if (!empty($interfaces)) {
                foreach ($interfaces as $interface) {
                    $this->services[class_basename($interface)] = [
                        'interface' => $interface,
                        'class'     => $className,
            } else {
                $this->services[class_basename($className)] = [
                    'interface' => $className,
                    'class'     => $className,

     * 获取接口信息
     * @return array
    protected function getInterfaces()
        $interfaces = [];
        foreach ($this->services as $key => ['interface' => $interface]) {
            $interfaces[$key] = $this->getMethods($interface);
        return $interfaces;

    protected function getMethods($interface)
        $methods = [];

        $reflection = new ReflectionClass($interface);
        foreach ($reflection->getMethods(ReflectionMethod::IS_PUBLIC) as $method) {
            if ($method->isConstructor() || $method->isDestructor()) {
            $returnType = $method->getReturnType();
            if ($returnType instanceof ReflectionNamedType) {
                $returnType = $returnType->getName();
            $methods[$method->getName()] = [
                'parameters' => $this->getParameters($method),
                'returnType' => $returnType,
                'comment'    => $method->getDocComment(),
        return $methods;

    protected function getParameters(ReflectionMethod $method)
        $parameters = [];
        foreach ($method->getParameters() as $parameter) {
            $type = $parameter->getType();
            if ($type instanceof ReflectionNamedType) {
                $type = $type->getName();
            $param = [
                'name' => $parameter->getName(),
                'type' => $type,

            if ($parameter->isOptional()) {
                $param['default'] = $parameter->getDefaultValue();

            if ($parameter->allowsNull()) {
                $param['nullable'] = true;

            $parameters[] = $param;
        return $parameters;

     * 调度
     * @param App          $app
     * @param Connection   $conn
     * @param string|Error $data
     * @param array        $files
    public function dispatch(App $app, Connection $conn, $data, $files = [])
        try {
            switch (true) {
                case $data instanceof Error:
                    $result = $data;
                case $data === Protocol::ACTION_INTERFACE:
                    $result = $this->getInterfaces();
                    $protocol = $this->parser->decode($data);
                    $result   = $this->dispatchWithMiddleware($app, $protocol, $files);
        } catch (Throwable $e) {
            $result = Error::make($e->getCode(), $e->getMessage());

        if ($result instanceof \think\File) {
            foreach ($this->fread($result) as $string) {
                if (!empty($string)) {
            $result = Protocol::FILE;

        $data = $this->parser->encodeResponse($result);


    protected function dispatchWithMiddleware(App $app, Protocol $protocol, $files)
        $interface = $protocol->getInterface();
        $method    = $protocol->getMethod();
        $params    = $protocol->getParams();

        foreach ($params as $index => $param) {
            if ($param === Protocol::FILE) {
                $params[$index] = array_shift($files);

        $service = $this->services[$interface] ?? null;
        if (empty($service)) {
            throw new RuntimeException(
                sprintf('Service %s is not founded!', $interface),

        $instance    = $app->make($service['class']);
        $middlewares = array_merge($this->middleware, $this->getServiceMiddlewares($instance, $method));

        return Middleware::make($app, $middlewares)
                         ->then(function () use ($instance, $method, $params) {
                             return call_user_func_array([$instance, $method], $params);

    protected function getServiceMiddlewares($service, $method)
        $middlewares = [];

        $class = new ReflectionClass($service);

        if ($class->hasProperty('middleware')) {
            $reflectionProperty = $class->getProperty('middleware');

            foreach ($reflectionProperty->getValue($service) as $key => $val) {
                if (!is_int($key)) {
                    $middleware = $key;
                    $options    = $val;
                } elseif (isset($val['middleware'])) {
                    $middleware = $val['middleware'];
                    $options    = $val['options'] ?? [];
                } else {
                    $middleware = $val;
                    $options    = [];

                if ((isset($options['only']) && !in_array($method, (array) $options['only'])) ||
                    (!empty($options['except']) && in_array($method, (array) $options['except']))) {

                if (is_string($middleware) && strpos($middleware, ':')) {
                    $middleware = explode(':', $middleware);
                    if (count($middleware) > 1) {
                        $middleware = [$middleware[0], array_slice($middleware, 1)];

                $middlewares[] = $middleware;

        return $middlewares;

namespace think\swoole\rpc\server;

use Swoole\Coroutine;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;

class Channel
    protected $header;

    protected $queue;

    public function __construct($handler)
        $this->queue = new Coroutine\Channel(1);
        Coroutine::create(function () use ($handler) {

     * @return File|Buffer
    public function pop()
        return $this->queue->pop();

    public function push($handle)
        return $this->queue->push($handle);

    public function close()
        return $this->queue->close();




namespace think\swoole\rpc;

use RuntimeException;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;

class Packer
    public const HEADER_SIZE   = 8;
    public const HEADER_STRUCT = 'Nlength/Ntype';
    public const HEADER_PACK   = 'NN';

    public const TYPE_BUFFER = 0;
    public const TYPE_FILE   = 1;

    public static function pack($data, $type = self::TYPE_BUFFER)
        return pack(self::HEADER_PACK, strlen($data), $type) . $data;

     * @param $data
     * @return array<Buffer|File|string>
    public static function unpack($data)
        $header = unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE));
        if ($header === false) {
            throw new RuntimeException('Invalid Header');

        switch ($header['type']) {
            case Packer::TYPE_BUFFER:
                $handler = new Buffer($header['length']);
            case Packer::TYPE_FILE:
                $handler = new File($header['length']);
                throw new RuntimeException("unsupported data type: [{$header['type']}");

        $data = substr($data, self::HEADER_SIZE);

        return [$handler, $data];



namespace think\swoole\rpc;

trait Sendfile
    protected function fread(\think\File $file)
        try {
            $handle = fopen($file->getPathname(), 'rb');
            yield pack(Packer::HEADER_PACK, $file->getSize(), Packer::TYPE_FILE);
            while (!feof($handle)) {
                yield fread($handle, 8192);
        } finally {



namespace think\swoole\rpc;

class Error implements \JsonSerializable
     * @var int
    protected $code = 0;

     * @var string
    protected $message = '';

     * @var mixed
    protected $data;

     * @param int    $code
     * @param string $message
     * @param mixed  $data
     * @return Error
    public static function make(int $code, string $message, $data = null): self
        $instance = new static();

        $instance->code    = $code;
        $instance->message = $message;
        $instance->data    = $data;

        return $instance;

     * @return int
    public function getCode(): int
        return $this->code;

     * @return string
    public function getMessage(): string
        return $this->message;

     * @return mixed
    public function getData()
        return $this->data;

    public function jsonSerialize()
        return [
            'code'    => $this->code,
            'message' => $this->message,
            'data'    => $this->data,



namespace think\swoole\rpc;

use Exception;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\server\Dispatcher;

class JsonParser implements ParserInterface
     * Json-rpc version
    const VERSION = '2.0';

    const DELIMITER = "@";

     * @param Protocol $protocol
     * @return string
    public function encode(Protocol $protocol): string
        $interface  = $protocol->getInterface();
        $methodName = $protocol->getMethod();

        $method = $interface . self::DELIMITER . $methodName;
        $data   = [
            'jsonrpc' => self::VERSION,
            'method'  => $method,
            'params'  => $protocol->getParams(),
            'context' => $protocol->getContext(),
            'id'      => '',

        return json_encode($data, JSON_UNESCAPED_UNICODE);

     * @param string $string
     * @return Protocol
    public function decode(string $string): Protocol
        $data = json_decode($string, true);

        $error = json_last_error();
        if ($error != JSON_ERROR_NONE) {
            throw new Exception(
                sprintf('Data(%s) is not json format!', $string),

        $method  = $data['method'] ?? '';
        $params  = $data['params'] ?? [];
        $context = $data['context'] ?? [];

        if (empty($method)) {
            throw new Exception(
                sprintf('Method(%s) cant not be empty!', $string),

        $methodAry = explode(self::DELIMITER, $method);
        if (count($methodAry) < 2) {
            throw new Exception(
                sprintf('Method(%s) is bad format!', $method),

        [$interfaceClass, $methodName] = $methodAry;

        if (empty($interfaceClass) || empty($methodName)) {
            throw new Exception(
                sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method),

        return Protocol::make($interfaceClass, $methodName, $params, $context);

     * @param string $string
     * @return mixed
    public function decodeResponse(string $string)
        $data = json_decode($string, true);

        if (array_key_exists('result', $data)) {
            return $data['result'];

        $code    = $data['error']['code'] ?? 0;
        $message = $data['error']['message'] ?? '';
        $data    = $data['error']['data'] ?? null;

        return Error::make($code, $message, $data);

     * @param mixed $result
     * @return string
    public function encodeResponse($result): string
        $data = [
            'jsonrpc' => self::VERSION,
            'id'      => '',

        if ($result instanceof Error) {
            $data['error'] = $result;
        } else {
            $data['result'] = $result;

        return json_encode($data);



namespace think\swoole\rpc\client;

interface Service
    public function withContext($context): self;

namespace think\swoole\rpc\client;

use Generator;

interface Connector
     * @param Generator|string $data
     * @param callable $decoder
     * @return string
    public function sendAndRecv($data, callable $decoder);

namespace think\swoole\rpc\client;

use InvalidArgumentException;
use Nette\PhpGenerator\Factory;
use Nette\PhpGenerator\PhpNamespace;
use ReflectionClass;
use think\App;
use think\swoole\Middleware;
use think\swoole\rpc\Protocol;

abstract class Proxy implements Service
    protected $interface;

    /** @var Gateway */
    protected $gateway;

    /** @var App */
    protected $app;

    protected $middleware = [];

    protected $context = [];

    final public function __construct(App $app, Gateway $gateway, $middleware)
        $this->app        = $app;
        $this->gateway    = $gateway;
        $this->middleware = $middleware;

    final protected function proxyCall($method, $params)
        $protocol = Protocol::make($this->interface, $method, $params, $this->context);

        return Middleware::make($this->app, $this->middleware)
                         ->then(function (Protocol $protocol) {
                             return $this->gateway->call($protocol);

    final public function withContext($context): self
        $this->context = $context;
        return $this;

    final public static function getClassName($client, $interface)
        if (!interface_exists($interface)) {
            throw new InvalidArgumentException(
                sprintf('%s must be exist interface!', $interface)

        $proxyName = class_basename($interface) . "Service";
        $className = "rpc\\service\\{$client}\\{$proxyName}";

        if (!class_exists($className, false)) {

            $namespace = new PhpNamespace("rpc\\service\\{$client}");

            $class = $namespace->addClass($proxyName);

            $class->addProperty('interface', class_basename($interface));

            $reflection = new ReflectionClass($interface);

            foreach ($reflection->getMethods() as $methodRef) {
                if ($methodRef->getDeclaringClass()->name == Service::class) {
                $method = (new Factory)->fromMethodReflection($methodRef);
                $body   = "\$this->proxyCall('{$methodRef->getName()}', func_get_args());";
                if ($method->getReturnType() != 'void') {
                    $body = "return {$body}";

        return $className;

namespace think\swoole\rpc\client;

use Closure;
use Swoole\Coroutine\Client;
use think\File;
use think\helper\Arr;
use think\swoole\concerns\InteractsWithRpcConnector;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\exception\RpcClientException;
use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;

class Gateway
    use Sendfile;

    /** @var Connector */
    protected $connector;

    /** @var ParserInterface */
    protected $parser;

    protected $tries;

     * Gateway constructor.
     * @param Connector|array $connector
     * @param ParserInterface $parser
    public function __construct($connector, ParserInterface $parser, $tries = 2)
        if (is_array($connector)) {
            $connector = $this->createDefaultConnector($connector);
        $this->connector = $connector;
        $this->parser    = $parser;
        $this->tries     = $tries;

    protected function encodeData(Protocol $protocol)
        $params = $protocol->getParams();

        foreach ($params as $index => $param) {
            if ($param instanceof File) {
                yield from $this->fread($param);
                $params[$index] = Protocol::FILE;


        $data = $this->parser->encode($protocol);

        yield Packer::pack($data);

    protected function decodeResponse($response)
        $result = $this->parser->decodeResponse($response);

        if ($result instanceof Error) {
            throw new RpcResponseException($result);

        return $result;

    protected function sendAndRecv($data)
        return $this->connector->sendAndRecv($data, Closure::fromCallable([$this, 'decodeResponse']));

    public function call(Protocol $protocol)
        if ($this->tries > 1) {
            $result = backoff(function () use ($protocol) {
                try {
                    return $this->sendAndRecv($this->encodeData($protocol));
                } catch (RpcResponseException $e) {
                    return $e;
            }, $this->tries);

            if ($result instanceof RpcResponseException) {
                throw $result;

            return $result;
        } else {
            return $this->sendAndRecv($this->encodeData($protocol));

    public function getServices()
        return $this->sendAndRecv(Packer::pack(Protocol::ACTION_INTERFACE));

    protected function createDefaultConnector($config)
        return new class($config) implements Connector {

            use InteractsWithRpcConnector;

            /** @var Client */
            protected $client;
            protected $config;

             *  constructor.
             * @param [] $config
            public function __construct($config)
                $this->config = $config;

            protected function isConnected(): bool
                return $this->client && $this->client->isConnected() && $this->client->peek() !== '';

            protected function getClient()
                if (!$this->isConnected()) {
                    $client = new Client(SWOOLE_SOCK_TCP);

                    $config = $this->config;

                    $host    = Arr::pull($config, 'host');
                    $port    = Arr::pull($config, 'port');
                    $timeout = Arr::pull($config, 'timeout', 5);


                    if (!$client->connect($host, $port, $timeout)) {
                        throw new RpcClientException(
                            sprintf('Connect failed host=%s port=%d', $host, $port)

                    $this->client = $client;
                return $this->client;

            protected function runWithClient($callback)
                return $callback($this->getClient());



namespace think\swoole\rpc\packer;

class File
    protected $name;
    protected $handle;
    protected $length;

    public function __construct($length)
        $this->length = $length;

    public function write(&$data)
        if (!$this->handle) {
            $this->name   = tempnam(sys_get_temp_dir(), 'swoole_rpc_');
            $this->handle = fopen($this->name, 'ab');

        $size   = fstat($this->handle)['size'];
        $string = substr($data, 0, $this->length - $size);

        fwrite($this->handle, $string);

        if (strlen($data) >= $this->length - $size) {
            $data = substr($data, $this->length - $size);

            return new \think\swoole\rpc\File($this->name);
        } else {
            $data = '';

namespace think\swoole\rpc\packer;

class Buffer
    protected $data = '';
    protected $length;

    public function __construct($length)
        $this->length = $length;

    public function write(&$data)
        $size   = strlen($this->data);
        $string = substr($data, 0, $this->length - $size);

        $this->data .= $string;

        if (strlen($data) >= $this->length - $size) {
            $data = substr($data, $this->length - $size);

            return $this->data;
        } else {
            $data = '';




namespace think\swoole\watcher;

interface Driver
    public function watch(callable $callback);



namespace think\swoole\watcher;

use InvalidArgumentException;
use Swoole\Coroutine\System;
use Swoole\Timer;
use think\helper\Str;

class Find implements Driver
    protected $name;
    protected $directory;
    protected $exclude;

    public function __construct($directory, $exclude, $name)
        $ret = System::exec('which find');
        if (empty($ret['output'])) {
            throw new InvalidArgumentException('find not exists.');
        $ret = System::exec('find --help', true);
        if (Str::contains($ret['output'] ?? '', 'BusyBox')) {
            throw new InvalidArgumentException('find version not support.');

        $this->directory = $directory;
        $this->exclude   = $exclude;
        $this->name      = $name;

    public function watch(callable $callback)
        $ms      = 2000;
        $seconds = ceil(($ms + 1000) / 1000);
        $minutes = sprintf('-%.2f', $seconds / 60);

        $dest = implode(' ', $this->directory);

        $name    = empty($this->name) ? '' : ' \( ' . join(' -o ', array_map(fn ($v) => "-name \"{$v}\"", $this->name)) . ' \)';
        $notName = '';
        $notPath = '';
        if (!empty($this->exclude)) {
            $excludeDirs = $excludeFiles = [];
            foreach ($this->exclude as $directory) {
                $directory = rtrim($directory, '/');
                if (is_dir($directory)) {
                    $excludeDirs[] = $directory;
                } else {
                    $excludeFiles[] = $directory;

            if (!empty($excludeFiles)) {
                $notPath = ' -not \( ' . join(' -and ', array_map(fn ($v) => "-name \"{$v}\"", $excludeFiles)) . ' \)';

            if (!empty($excludeDirs)) {
                $notPath = ' -not \( ' . join(' -and ', array_map(fn ($v) => "-path \"{$v}/*\"", $excludeDirs)) . ' \)';

        $command = "find {$dest}{$name}{$notName}{$notPath} -mmin {$minutes} -type f -print";

        Timer::tick($ms, function () use ($callback, $command) {
            $ret = System::exec($command);
            if ($ret['code'] === 0 && strlen($ret['output'])) {
                $stdout = trim($ret['output']);
                if (!empty($stdout)) {




namespace think\swoole\watcher;

use Swoole\Timer;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Finder\SplFileInfo;

class Scan implements Driver
    protected $finder;

    protected $files = [];

    public function __construct($directory, $exclude, $name)
        $this->finder = new Finder();
            ->in(array_filter((array) $directory, function ($dir) {
                return is_dir($dir);

    protected function findFiles()
        $files = [];
        /** @var SplFileInfo $f */
        foreach ($this->finder as $f) {
            $files[$f->getRealpath()] = $f->getMTime();
        return $files;

    public function watch(callable $callback)
        $this->files = $this->findFiles();

        Timer::tick(2000, function () use ($callback) {

            $files = $this->findFiles();

            foreach ($files as $path => $time) {
                if (empty($this->files[$path]) || $this->files[$path] != $time) {

            $this->files = $files;




namespace think\swoole\websocket\middleware;

use Closure;
use think\App;
use think\Request;
use think\Response;
use think\Session;

class SessionInit
    /** @var App */
    protected $app;

    /** @var Session */
    protected $session;

    public function __construct(App $app, Session $session)
        $this->app     = $app;
        $this->session = $session;

     * Session初始化
     * @access public
     * @param Request $request
     * @param Closure $next
     * @return Response
    public function handle($request, Closure $next)
        // Session初始化
        $varSessionId = $this->app->config->get('session.var_session_id');
        $cookieName   = $this->session->getName();

        if ($varSessionId && $request->request($varSessionId)) {
            $sessionId = $request->request($varSessionId);
        } else {
            $sessionId = $request->cookie($cookieName);

        if ($sessionId) {



        return $next($request);



namespace think\swoole\websocket;

class Event
    public $type;
    public $data;

    public function __construct($type, $data)
        $this->type = $type;
        $this->data = $data;



namespace think\swoole\websocket\room;

use InvalidArgumentException;
use Redis as PHPRedis;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use think\helper\Arr;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Manager;
use think\swoole\Pool;

 * Class RedisRoom
class Redis implements RoomInterface
     * @var array
    protected $config;

     * @var string
    protected $prefix = 'swoole:';

    /** @var Manager */
    protected $manager;

    /** @var ConnectionPool */
    protected $pool;

     * RedisRoom constructor.
     * @param Manager $manager
     * @param array $config
    public function __construct(Manager $manager, array $config)
        $this->manager = $manager;
        $this->config  = $config;

        if ($prefix = Arr::get($this->config, 'prefix')) {
            $this->prefix = $prefix;

     * @return RoomInterface
    public function prepare(): RoomInterface
        return $this;

    protected function prepareRedis()
        $this->manager->onEvent('workerStart', function () {
            $config     = $this->config;
            $this->pool = new ConnectionPool(
                new PhpRedisConnector(),
            $this->manager->getPools()->add('websocket.room', $this->pool);

    protected function initData()
        $connector = new PhpRedisConnector();

        $connection = $connector->connect($this->config);

        if (count($keys = $connection->keys("{$this->prefix}*"))) {


     * Add multiple socket fds to a room.
     * @param string $fd
     * @param array|string $roomNames
    public function add($fd, $roomNames)
        $rooms = is_array($roomNames) ? $roomNames : [$roomNames];

        $this->addValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->addValue($room, [$fd], RoomInterface::ROOMS_KEY);

     * Delete multiple socket fds from a room.
     * @param string fd
     * @param array|string rooms
    public function delete($fd, $rooms)
        $rooms = is_array($rooms) ? $rooms : [$rooms];
        $rooms = count($rooms) ? $rooms : $this->getRooms($fd);

        $this->removeValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->removeValue($room, [$fd], RoomInterface::ROOMS_KEY);

    protected function runWithRedis(\Closure $callable)
        $redis = $this->pool->borrow();
        try {
            return $callable($redis);
        } finally {

     * Add value to redis.
     * @param        $key
     * @param array $values
     * @param string $table
     * @return $this
    protected function addValue($key, array $values, string $table)
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);

            foreach ($values as $value) {
                $pipe->sadd($redisKey, $value);


        return $this;

     * Remove value from redis.
     * @param        $key
     * @param array $values
     * @param string $table
     * @return $this
    protected function removeValue($key, array $values, string $table)
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);
            foreach ($values as $value) {
                $pipe->srem($redisKey, $value);

        return $this;

     * Get all sockets by a room key.
     * @param string room
     * @return array
    public function getClients(string $room)
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];

     * Get all rooms by a fd.
     * @param string fd
     * @return array
    public function getRooms($fd)
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];

     * Check table for rooms and descriptors.
     * @param string $table
    protected function checkTable(string $table)
        if (!in_array($table, [RoomInterface::ROOMS_KEY, RoomInterface::DESCRIPTORS_KEY])) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");

     * Get value.
     * @param string $key
     * @param string $table
     * @return array
    protected function getValue(string $key, string $table)

        return $this->runWithRedis(function (PHPRedis $redis) use ($table, $key) {
            return $redis->smembers($this->getKey($key, $table));

     * Get key.
     * @param string $key
     * @param string $table
     * @return string
    protected function getKey(string $key, string $table)
        return "{$this->prefix}{$table}:{$key}";


namespace think\swoole\websocket\room;

use InvalidArgumentException;
use Swoole\Table as SwooleTable;
use think\swoole\contract\websocket\RoomInterface;

class Table implements RoomInterface
     * @var array
    protected $config = [
        'room_rows'   => 8192,
        'room_size'   => 2048,
        'client_rows' => 4096,
        'client_size' => 2048,

     * @var SwooleTable
    protected $rooms;

     * @var SwooleTable
    protected $fds;

     * TableRoom constructor.
     * @param array $config
    public function __construct(array $config)
        $this->config = array_merge($this->config, $config);

     * Do some init stuffs before workers started.
     * @return RoomInterface
    public function prepare(): RoomInterface

        return $this;

     * Add multiple socket fds to a room.
     * @param string fd
     * @param array|string $roomNames
    public function add($fd, $roomNames)
        $rooms     = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];

        foreach ($roomNames as $room) {
            $fds = $this->getClients($room);

            if (in_array($fd, $fds)) {

            $fds[]   = $fd;
            $rooms[] = $room;

            $this->setClients($room, $fds);

        $this->setRooms($fd, $rooms);

     * Delete multiple socket fds from a room.
     * @param string fd
     * @param array|string $roomNames
    public function delete($fd, $roomNames = [])
        $allRooms  = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
        $rooms     = count($roomNames) ? $roomNames : $allRooms;

        $removeRooms = [];
        foreach ($rooms as $room) {
            $fds = $this->getClients($room);

            if (!in_array($fd, $fds)) {

            $this->setClients($room, array_values(array_diff($fds, [$fd])));
            $removeRooms[] = $room;

        $this->setRooms($fd, collect($allRooms)->diff($removeRooms)->values()->toArray());

     * Get all sockets by a room key.
     * @param string room
     * @return array
    public function getClients(string $room)
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];

     * Get all rooms by a fd.
     * @param string fd
     * @return array
    public function getRooms($fd)
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];

     * @param string $room
     * @param array $fds
     * @return $this
    protected function setClients(string $room, array $fds)
        return $this->setValue($room, $fds, RoomInterface::ROOMS_KEY);

     * @param string $fd
     * @param array $rooms
     * @return $this
    protected function setRooms($fd, array $rooms)
        return $this->setValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

     * Init rooms table
    protected function initRoomsTable(): void
        $this->rooms = new SwooleTable($this->config['room_rows']);
        $this->rooms->column('value', SwooleTable::TYPE_STRING, $this->config['room_size']);

     * Init descriptors table
    protected function initFdsTable()
        $this->fds = new SwooleTable($this->config['client_rows']);
        $this->fds->column('value', SwooleTable::TYPE_STRING, $this->config['client_size']);

     * Set value to table
     * @param        $key
     * @param array $value
     * @param string $table
     * @return $this
    public function setValue($key, array $value, string $table)

        if (empty($value)) {
        } else {
            $this->$table->set($key, ['value' => json_encode($value)]);

        return $this;

     * Get value from table
     * @param string $key
     * @param string $table
     * @return array|mixed
    public function getValue(string $key, string $table)

        $value = $this->$table->get($key);

        return $value ? json_decode($value['value'], true) : [];

     * Check table for exists
     * @param string $table
    protected function checkTable(string $table)
        if (!property_exists($this, $table) || !$this->$table instanceof SwooleTable) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");



namespace think\swoole\websocket;

use Swoole\WebSocket\Frame;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\websocket\Event as WsEvent;

class Handler implements HandlerInterface
    protected $event;

    public function __construct(Event $event)
        $this->event = $event;

     * "onOpen" listener.
     * @param Request $request
    public function onOpen(Request $request)
        $this->event->trigger('swoole.websocket.Open', $request);

     * "onMessage" listener.
     * @param Frame $frame
    public function onMessage(Frame $frame)
        $this->event->trigger('swoole.websocket.Message', $frame);

        $this->event->trigger('swoole.websocket.Event', $this->decode($frame->data));

     * "onClose" listener.
    public function onClose()

    protected function decode($payload)
        $data = json_decode($payload, true);

        return new WsEvent($data['type'] ?? null, $data['data'] ?? null);

    public function encodeMessage($message)
        if ($message instanceof WsEvent) {
            return json_encode([
                'type' => $message->type,
                'data' => $message->data,
        return $message;



namespace think\swoole\websocket;

use think\Manager;
use think\swoole\websocket\room\Table;

 * Class Room
 * @package think\swoole\websocket
 * @mixin Table
class Room extends Manager
    protected $namespace = "\\think\\swoole\\websocket\\room\\";

    protected function resolveConfig(string $name)
        return $this->app->config->get("swoole.websocket.room.{$name}", []);

     * 默认驱动
     * @return string|null
    public function getDefaultDriver()
        return $this->app->config->get('swoole.websocket.room.type', 'table');



namespace think\swoole\websocket\message;

class PushMessage
    public $fd;
    public $data;

    public function __construct($fd, $data)
        $this->fd   = $fd;
        $this->data = $data;



namespace think\swoole\websocket\socketio;

class EnginePacket
     * Engine.io packet type `open`.
    const OPEN = 0;

     * Engine.io packet type `close`.
    const CLOSE = 1;

     * Engine.io packet type `ping`.
    const PING = 2;

     * Engine.io packet type `pong`.
    const PONG = 3;

     * Engine.io packet type `message`.
    const MESSAGE = 4;

     * Engine.io packet type 'upgrade'
    const UPGRADE = 5;

     * Engine.io packet type `noop`.
    const NOOP = 6;

    public $type;

    public $data = '';

    public function __construct($type, $data = '')
        $this->type = $type;
        $this->data = $data;

    public static function open($payload)
        return new static(self::OPEN, $payload);

    public static function pong($payload = '')
        return new static(self::PONG, $payload);

    public static function ping()
        return new static(self::PING);

    public static function message($payload)
        return new static(self::MESSAGE, $payload);

    public static function fromString(string $packet)
        return new static(substr($packet, 0, 1), substr($packet, 1) ?? '');

    public function toString()
        return $this->type . $this->data;

namespace think\swoole\websocket\socketio;

use Exception;
use Swoole\Timer;
use Swoole\Websocket\Frame;
use think\Config;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Websocket;
use think\swoole\websocket\Event as WsEvent;

class Handler implements HandlerInterface
    /** @var Config */
    protected $config;

    protected $event;

    protected $websocket;

    protected $eio;

    protected $pingTimeoutTimer  = 0;
    protected $pingIntervalTimer = 0;

    protected $pingInterval;
    protected $pingTimeout;

    public function __construct(Event $event, Config $config, Websocket $websocket)
        $this->event        = $event;
        $this->config       = $config;
        $this->websocket    = $websocket;
        $this->pingInterval = $this->config->get('swoole.websocket.ping_interval', 25000);
        $this->pingTimeout  = $this->config->get('swoole.websocket.ping_timeout', 60000);

     * "onOpen" listener.
     * @param Request $request
    public function onOpen(Request $request)
        $this->eio = $request->param('EIO');

        $payload = json_encode(
                'sid'          => base64_encode(uniqid()),
                'upgrades'     => [],
                'pingInterval' => $this->pingInterval,
                'pingTimeout'  => $this->pingTimeout,


        $this->event->trigger('swoole.websocket.Open', $request);

        if ($this->eio < 4) {
            $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
        } else {

     * "onMessage" listener.
     * @param Frame $frame
    public function onMessage(Frame $frame)
        $enginePacket = EnginePacket::fromString($frame->data);

        $this->event->trigger('swoole.websocket.Message', $enginePacket);

        $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);

        switch ($enginePacket->type) {
            case EnginePacket::MESSAGE:
                $packet = Packet::fromString($enginePacket->data);
                switch ($packet->type) {
                    case Packet::CONNECT:
                    case Packet::EVENT:
                        $type   = array_shift($packet->data);
                        $data   = $packet->data;
                        $result = $this->event->trigger('swoole.websocket.Event', new WsEvent($type, $data));

                        if ($packet->id !== null) {
                            $responsePacket = Packet::create(Packet::ACK, [
                                'id'   => $packet->id,
                                'nsp'  => $packet->nsp,
                                'data' => $result,

                    case Packet::DISCONNECT:
            case EnginePacket::PING:
            case EnginePacket::PONG:

     * "onClose" listener.
    public function onClose()

    protected function onConnect($data = null)
        try {
            $this->event->trigger('swoole.websocket.Connect', $data);
            $packet = Packet::create(Packet::CONNECT);
            if ($this->eio >= 4) {
                $packet->data = ['sid' => base64_encode(uniqid())];
        } catch (Exception $exception) {
            $packet = Packet::create(Packet::CONNECT_ERROR, [
                'data' => ['message' => $exception->getMessage()],


    protected function resetPingTimeout($timeout)
        $this->pingTimeoutTimer = Timer::after($timeout, function () {

    protected function schedulePing()
        $this->pingIntervalTimer = Timer::after($this->pingInterval, function () {

    public function encodeMessage($message)
        if ($message instanceof WsEvent) {
            $message = Packet::create(Packet::EVENT, [
                'data' => array_merge([$message->type], $message->data),

        if ($message instanceof Packet) {
            $message = EnginePacket::message($message->toString());

        if ($message instanceof EnginePacket) {
            $message = $message->toString();

        return $message;

    protected function push($data)


namespace think\swoole\websocket\socketio;

 * Class Packet
class Packet
     * Socket.io packet type `connect`.
    const CONNECT = 0;

     * Socket.io packet type `disconnect`.
    const DISCONNECT = 1;

     * Socket.io packet type `event`.
    const EVENT = 2;

     * Socket.io packet type `ack`.
    const ACK = 3;

     * Socket.io packet type `connect_error`.
    const CONNECT_ERROR = 4;

     * Socket.io packet type 'binary event'
    const BINARY_EVENT = 5;

     * Socket.io packet type `binary ack`. For acks with binary arguments.
    const BINARY_ACK = 6;

    public $type;
    public $nsp  = '/';
    public $data = null;
    public $id   = null;

    public function __construct(int $type)
        $this->type = $type;

    public static function create($type, array $decoded = [])
        $new     = new static($type);
        $new->id = $decoded['id'] ?? null;
        if (isset($decoded['nsp'])) {
            $new->nsp = $decoded['nsp'] ?: '/';
        } else {
            $new->nsp = '/';
        $new->data = $decoded['data'] ?? null;
        return $new;

    public function toString()
        $str = '' . $this->type;
        if ($this->nsp && '/' !== $this->nsp) {
            $str .= $this->nsp . ',';

        if ($this->id !== null) {
            $str .= $this->id;

        if (null !== $this->data) {
            $str .= json_encode($this->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        return $str;

    public static function fromString(string $str)
        $i = 0;

        $packet = new Packet((int) substr($str, 0, 1));

        // look up namespace (if any)
        if ('/' === substr($str, $i + 1, 1)) {
            $nsp = '';
            while (++$i) {
                $c = substr($str, $i, 1);
                if (',' === $c) {
                $nsp .= $c;
                if ($i === strlen($str)) {
            $packet->nsp = $nsp;
        } else {
            $packet->nsp = '/';

        // look up id
        $next = substr($str, $i + 1, 1);
        if ('' !== $next && is_numeric($next)) {
            $id = '';
            while (++$i) {
                $c = substr($str, $i, 1);
                if (null == $c || !is_numeric($c)) {
                $id .= substr($str, $i, 1);
                if ($i === strlen($str)) {
            $packet->id = intval($id);

        // look up json data
        if (substr($str, ++$i, 1)) {
            $packet->data = json_decode(substr($str, $i), true);

        return $packet;



namespace think\swoole\websocket;

use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Manager;
use think\swoole\websocket\message\PushMessage;

 * Class Pusher
class Pusher

    /** @var Room */
    protected $room;

    /** @var Manager */
    protected $manager;

    /** @var HandlerInterface */
    protected $handler;

    protected $to = [];

    public function __construct(Manager $manager, Room $room, HandlerInterface $handler)
        $this->manager = $manager;
        $this->room    = $room;
        $this->handler = $handler;

    public function to(...$values)
        foreach ($values as $value) {
            if (is_array($value)) {
            } elseif (!in_array($value, $this->to)) {
                $this->to[] = $value;

        return $this;

     * Push message to related descriptors
     * @param $data
     * @return void
    public function push($data): void
        $fds = [];

        foreach ($this->to as $room) {
            $clients = $this->room->getClients($room);
            if (!empty($clients)) {
                $fds = array_merge($fds, $clients);

        foreach (array_unique($fds) as $fd) {
            [$workerId, $fd] = explode('.', $fd);
            $data = $this->handler->encodeMessage($data);
            $this->manager->sendMessage((int) $workerId, new PushMessage((int) $fd, $data));

    public function emit(string $event, ...$data): void
        $this->push(new Event($event, $data));


   ├── Watcher.php
   ├── Websocket.php
   ├── App.php
   ├── Service.php
   ├── Pool.php
   ├── Http.php
   ├── Sandbox.php
   ├── Manager.php
   ├── Middleware.php
   ├── helpers.php
   ├── Table.php
   └── command
       ├── Server.php
       ├── RpcInterface.php
   └── concerns
       ├── WithApplication.php
       ├── InteractsWithRpcServer.php
       ├── InteractsWithSwooleTable.php
       ├── InteractsWithWebsocket.php
       ├── WithRpcClient.php
       ├── ModifyProperty.php
       ├── InteractsWithServer.php
       ├── WithMiddleware.php
       ├── InteractsWithHttp.php
       ├── InteractsWithQueue.php
       ├── InteractsWithRpcConnector.php
       ├── InteractsWithPools.php
       ├── InteractsWithRpcClient.php
       ├── WithContainer.php
       ├── InteractsWithTracing.php
   └── config
       ├── swoole.php
   └── contract
       └── websocket
               ├── HandlerInterface.php
               ├── RoomInterface.php
       ├── ResetterInterface.php
       └── rpc
               ├── ParserInterface.php
   └── coroutine
       ├── Context.php
   └── exception
       ├── RpcResponseException.php
       ├── RpcClientException.php
   └── middleware
       ├── TraceRpcServer.php
       ├── TraceRpcClient.php
       ├── InteractsWithVarDumper.php
   └── pool
       └── proxy
               ├── Connection.php
               ├── Store.php
       ├── Cache.php
       ├── Connector.php
       ├── Db.php
       ├── Proxy.php
       ├── Client.php
   └── resetters
       ├── ResetModel.php
       ├── ResetConfig.php
       ├── ClearInstances.php
       ├── ResetEvent.php
       ├── ResetService.php
       ├── ResetPaginator.php
   └── response
       ├── File.php
   └── rpc
       ├── Protocol.php
       ├── File.php
       └── server
               ├── Dispatcher.php
               ├── Channel.php
       ├── Packer.php
       ├── Sendfile.php
       ├── Error.php
       ├── JsonParser.php
       └── client
               ├── Service.php
               ├── Connector.php
               ├── Proxy.php
               ├── Gateway.php
       └── packer
               ├── File.php
               ├── Buffer.php
   └── watcher
       ├── Driver.php
       ├── Find.php
       ├── Scan.php
   └── websocket
       └── middleware
               ├── SessionInit.php
       ├── Event.php
       └── room
               ├── Redis.php
               ├── Table.php
       ├── Handler.php
       ├── Room.php
       └── message
               ├── PushMessage.php
       └── socketio
               ├── EnginePacket.php
               ├── Handler.php
               ├── Packet.php
       ├── Pusher.php