连续复制
一键复制
一键打包
Watcher.php
<?php
namespace think\swoole;
use think\swoole\watcher\Driver;
/**
 * Manager for hot-update file watcher drivers.
 *
 * Settings are read from the `swoole.hot_update` configuration section.
 *
 * @mixin Driver
 */
class Watcher extends \think\Manager
{
    /** @var string Namespace prefix of the watcher driver classes (consumed by the parent manager). */
    protected $namespace = '\\think\\swoole\\watcher\\';

    /**
     * Read one entry of the `swoole.hot_update` configuration section.
     *
     * @param string $name    Entry name, relative to `swoole.hot_update.`
     * @param mixed  $default Value returned when the entry is not configured
     *
     * @return mixed
     */
    protected function getConfig(string $name, $default = null)
    {
        $key = 'swoole.hot_update.' . $name;

        return $this->app->config->get($key, $default);
    }

    /**
     * Build the argument list for a driver: the configured `include`,
     * `exclude` and `name` entries, in that order (each defaults to []).
     *
     * @param string $name Driver name; not used by this implementation
     *
     * @return array
     */
    protected function resolveParams($name): array
    {
        $include = $this->getConfig('include', []);
        $exclude = $this->getConfig('exclude', []);
        $names   = $this->getConfig('name', []);

        return [$include, $exclude, $names];
    }

    /**
     * Name of the driver used by default: the configured `type`, or `scan`.
     *
     * @return mixed
     */
    public function getDefaultDriver()
    {
        return $this->getConfig('type', 'scan');
    }
}
Websocket.php
<?php
namespace think\swoole;
use Swoole\Http\Response;
use think\Event;
use think\swoole\websocket\Pusher;
use think\swoole\websocket\Room;
/**
 * Class Websocket
 *
 * Helper bound to a single websocket connection: it remembers the sender id
 * and the connection's response object, manages room membership through Room
 * and creates Pusher instances to deliver data.
 */
class Websocket
{
    /**
     * @var \think\App
     */
    protected $app;

    /**
     * @var Room
     */
    protected $room;

    /**
     * Socket sender's fd.
     *
     * @var string
     */
    protected $sender;

    /** @var Event */
    protected $event;

    /** @var Response|null Response of the current connection, or null when none is set. */
    protected $client;

    /**
     * Websocket constructor.
     *
     * @param \think\App $app
     * @param Room       $room
     * @param Event      $event
     */
    public function __construct(\think\App $app, Room $room, Event $event)
    {
        $this->app   = $app;
        $this->room  = $room;
        $this->event = $event;
    }

    /**
     * Create a new Pusher through the application container.
     *
     * @return Pusher
     */
    protected function makePusher()
    {
        $pusher = $this->app->invokeClass(Pusher::class);

        return $pusher;
    }

    /**
     * Start a push addressed to the given targets.
     *
     * @param mixed ...$values Targets forwarded unchanged to Pusher::to()
     *
     * @return mixed Whatever Pusher::to() returns
     */
    public function to(...$values)
    {
        $pusher = $this->makePusher();

        return $pusher->to(...$values);
    }

    /**
     * Push raw data to the current sender.
     *
     * @param mixed $data
     */
    public function push($data)
    {
        $target = $this->makePusher()->to($this->getSender());
        $target->push($data);
    }

    /**
     * Emit a named event with its payload to the current sender.
     *
     * @param string $event
     * @param mixed  ...$data
     */
    public function emit(string $event, ...$data)
    {
        $target = $this->makePusher()->to($this->getSender());
        $target->emit($event, ...$data);
    }

    /**
     * Join sender to multiple rooms.
     *
     * A string or integer first argument switches to variadic mode: every
     * argument passed to the call is treated as a room.
     *
     * @param string|integer|array $rooms
     *
     * @return $this
     */
    public function join($rooms): self
    {
        if (is_string($rooms) || is_int($rooms)) {
            $rooms = func_get_args();
        }

        $this->room->add($this->getSender(), $rooms);

        return $this;
    }

    /**
     * Make sender leave multiple rooms.
     *
     * A string or integer first argument switches to variadic mode: every
     * argument passed to the call is treated as a room. Without arguments an
     * empty list is handed to the room.
     *
     * @param array|string|integer $rooms
     *
     * @return $this
     */
    public function leave($rooms = []): self
    {
        if (is_string($rooms) || is_int($rooms)) {
            $rooms = func_get_args();
        }

        $this->room->delete($this->getSender(), $rooms);

        return $this;
    }

    /**
     * Whether a client response is currently attached.
     *
     * @return bool
     */
    public function isEstablished()
    {
        return (bool) $this->client;
    }

    /**
     * Close current connection.
     */
    public function close()
    {
        if (!$this->client) {
            return;
        }

        $this->client->close();
    }

    /**
     * Attach (or, with null, detach) the response of the current connection.
     *
     * @param Response|null $response
     */
    public function setClient($response)
    {
        $this->client = $response;
    }

    /**
     * Set sender fd.
     *
     * @param string $fd
     *
     * @return $this
     */
    public function setSender(string $fd)
    {
        $this->sender = $fd;

        return $this;
    }

    /**
     * Get current sender fd.
     *
     * @return string|null Null until setSender() has been called
     */
    public function getSender()
    {
        return $this->sender;
    }
}
App.php
<?php
namespace think\swoole;
/**
 * Application container variant whose "running in console" answer is a
 * settable flag and whose resolved instances can be dropped in one call.
 */
class App extends \think\App
{
    /** @var bool Value reported by runningInConsole(); true until changed. */
    protected $inConsole = true;

    /**
     * Change the value reported by runningInConsole().
     *
     * @param bool $inConsole
     */
    public function setInConsole($inConsole = true)
    {
        $this->inConsole = $inConsole;
    }

    /**
     * Report the flag stored by setInConsole().
     *
     * @return bool
     */
    public function runningInConsole(): bool
    {
        $flag = $this->inConsole;

        return $flag;
    }

    /**
     * Replace the instance registry with an empty array.
     */
    public function clearInstances()
    {
        $this->instances = [];
    }
}
Service.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\swoole;
use think\swoole\command\RpcInterface;
use think\swoole\command\Server as ServerCommand;
/**
 * Service provider of the package: registers its console commands.
 */
class Service extends \think\Service
{
    /**
     * Register the server command and the RPC interface generator command.
     */
    public function boot()
    {
        $commands = [
            ServerCommand::class,
            RpcInterface::class,
        ];

        // Spread so each class name is still passed as its own argument.
        $this->commands(...$commands);
    }
}
Pool.php
<?php
namespace think\swoole;
use Smf\ConnectionPool\ConnectionPool;
use think\helper\Arr;
/**
 * Registry of named connection pools.
 */
class Pool
{
    /** @var ConnectionPool[] Registered pools, keyed by name. */
    protected $pools = [];

    /**
     * Initialise a pool and register it under the given name.
     *
     * @param string         $name
     * @param ConnectionPool $pool
     *
     * @return Pool
     */
    public function add(string $name, ConnectionPool $pool)
    {
        $pool->init();
        $this->pools[$name] = $pool;

        return $this;
    }

    /**
     * Look up a pool by name.
     *
     * @param string $name
     *
     * @return ConnectionPool|null Null when no pool is registered under that name
     */
    public function get(string $name)
    {
        return $this->pools[$name] ?? null;
    }

    /**
     * Close the pool registered under the given name.
     *
     * An unknown name is not an error: the method returns false instead of
     * calling close() on null. The pool stays registered after being closed.
     *
     * @param string $key
     *
     * @return mixed The result of ConnectionPool::close(), or false for an unknown name
     */
    public function close(string $key)
    {
        $pool = $this->pools[$key] ?? null;
        if ($pool === null) {
            return false;
        }

        return $pool->close();
    }

    /**
     * @return array All registered pools, keyed by name
     */
    public function getAll()
    {
        return $this->pools;
    }

    /**
     * Close every registered pool.
     */
    public function closeAll()
    {
        foreach ($this->pools as $pool) {
            $pool->close();
        }
    }

    /**
     * Dynamic access: `$pools->name` is the same as `$pools->get('name')`.
     *
     * @param string $key
     *
     * @return ConnectionPool|null
     */
    public function __get($key)
    {
        return $this->get($key);
    }

    /**
     * Extract the pool options from a configuration array.
     *
     * The snake_case option keys are pulled through Arr::pull() and returned
     * under camelCase names, each with the default shown below when absent.
     *
     * @param array $config Configuration array, passed by reference to Arr::pull()
     *
     * @return array
     */
    public static function pullPoolConfig(&$config)
    {
        return [
            'minActive'         => Arr::pull($config, 'min_active', 0),
            'maxActive'         => Arr::pull($config, 'max_active', 10),
            'maxWaitTime'       => Arr::pull($config, 'max_wait_time', 5),
            'maxIdleTime'       => Arr::pull($config, 'max_idle_time', 20),
            'idleCheckInterval' => Arr::pull($config, 'idle_check_interval', 10),
        ];
    }
}
Http.php
<?php
namespace think\swoole;
/**
 * Class Http
 *
 * Http subclass whose middleware and route loading steps do nothing.
 *
 * @package think\swoole
 * @property $request
 */
class Http extends \think\Http
{
    /**
     * Deliberately empty: no middleware is loaded by this class.
     */
    protected function loadMiddleware(): void
    {
        // no-op
    }

    /**
     * Deliberately empty: no routes are loaded by this class.
     */
    protected function loadRoutes(): void
    {
        // no-op
    }
}
Sandbox.php
<?php
namespace think\swoole;
use Closure;
use InvalidArgumentException;
use ReflectionObject;
use RuntimeException;
use Swoole\Coroutine;
use think\App;
use think\Config;
use think\Container;
use think\Event;
use think\swoole\App as SwooleApp;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\coroutine\Context;
use think\swoole\resetters\ClearInstances;
use think\swoole\resetters\ResetConfig;
use think\swoole\resetters\ResetEvent;
use think\swoole\resetters\ResetModel;
use think\swoole\resetters\ResetPaginator;
use think\swoole\resetters\ResetService;
use Throwable;
/**
 * Keeps one cloned application container ("snapshot") per root coroutine and
 * resets it through a list of resetters before user code runs in it.
 */
class Sandbox
{
    use ModifyProperty;

    /**
     * The app containers in different coroutine environment.
     *
     * @var SwooleApp[]
     */
    protected $snapshots = [];

    /** @var SwooleApp Base application that snapshots are cloned from. */
    protected $app;

    /** @var Config Clone of the base application's config, taken at construction. */
    protected $config;

    /** @var Event Clone of the base application's event object, taken at construction. */
    protected $event;

    /** @var ResetterInterface[] Resetter instances keyed by class name. */
    protected $resetters = [];

    /** @var array Service instances keyed by class name. */
    protected $services = [];

    /**
     * @param Container $app Base application
     */
    public function __construct(Container $app)
    {
        $this->setBaseApp($app);
        $this->initialize();
    }

    /**
     * @param Container $app
     *
     * @return $this
     */
    public function setBaseApp(Container $app)
    {
        $this->app = $app;

        return $this;
    }

    /**
     * @return SwooleApp
     */
    public function getBaseApp()
    {
        return $this->app;
    }

    /**
     * Route Container::getInstance() to the current snapshot and capture the
     * initial config, services, event object and resetters.
     *
     * @return $this
     */
    protected function initialize()
    {
        Container::setInstance(function () {
            return $this->getApplication();
        });

        $this->setInitialConfig();
        $this->setInitialServices();
        $this->setInitialEvent();
        $this->setInitialResetters();

        return $this;
    }

    /**
     * Run a callable inside a fresh snapshot.
     *
     * The snapshot is always cleared afterwards; an exception thrown by the
     * callable propagates to the caller unchanged.
     *
     * @param Closure $callable Invoked through the snapshot container with this sandbox as argument
     */
    public function run(Closure $callable)
    {
        $this->init();

        try {
            $this->getApplication()->invoke($callable, [$this]);
        } finally {
            $this->clear();
        }
    }

    /**
     * Create the snapshot for the current coroutine, bind it and reset it.
     */
    public function init()
    {
        $app = $this->getApplication(true);
        $this->setInstance($app);
        $this->resetApp($app);
    }

    /**
     * Drop the current snapshot, clear the coroutine context and rebind the
     * base application.
     */
    public function clear()
    {
        if ($app = $this->getSnapshot()) {
            $app->clearInstances();
            unset($this->snapshots[$this->getSnapshotId()]);
        }

        Context::clear();

        $this->setInstance($this->getBaseApp());
    }

    /**
     * Return the snapshot of the current coroutine tree.
     *
     * @param bool $init When true and no snapshot exists, clone the base application and store it
     *
     * @return Container
     *
     * @throws InvalidArgumentException When no snapshot exists and $init is false
     */
    public function getApplication($init = false)
    {
        $snapshot = $this->getSnapshot($init);
        if ($snapshot instanceof Container) {
            return $snapshot;
        }

        if ($init) {
            $snapshot = clone $this->getBaseApp();
            $this->setSnapshot($snapshot);

            return $snapshot;
        }

        throw new InvalidArgumentException('The app object has not been initialized');
    }

    /**
     * Resolve the coroutine id under which the snapshot is stored.
     *
     * With $init the current coroutine is marked as root (`#root` in its
     * context) and its id is returned. Otherwise the parent chain is walked
     * upwards until a coroutine marked `#root` is found or the id drops below 1.
     *
     * @param bool $init
     *
     * @return int
     */
    protected function getSnapshotId($init = false)
    {
        if ($init) {
            Coroutine::getContext()->offsetSet('#root', true);

            return Coroutine::getCid();
        }

        $cid = Coroutine::getCid();
        while (!Coroutine::getContext($cid)->offsetExists('#root')) {
            $cid = Coroutine::getPcid($cid);

            if ($cid < 1) {
                break;
            }
        }

        return $cid;
    }

    /**
     * Get current snapshot.
     *
     * @param bool $init Forwarded to getSnapshotId()
     *
     * @return App|null
     */
    public function getSnapshot($init = false)
    {
        return $this->snapshots[$this->getSnapshotId($init)] ?? null;
    }

    /**
     * Store a snapshot for the current coroutine tree.
     *
     * @param Container $snapshot
     *
     * @return $this
     */
    public function setSnapshot(Container $snapshot)
    {
        $this->snapshots[$this->getSnapshotId()] = $snapshot;

        return $this;
    }

    /**
     * Bind the container to itself under `app` and Container::class, then
     * point the `app` property of every service registered on it at it.
     *
     * @param Container $app
     */
    public function setInstance(Container $app)
    {
        $app->instance('app', $app);
        $app->instance(Container::class, $app);

        // The container's service list is not publicly readable, so reflect it.
        $reflectObject   = new ReflectionObject($app);
        $reflectProperty = $reflectObject->getProperty('services');
        $reflectProperty->setAccessible(true);
        $services = $reflectProperty->getValue($app);

        foreach ($services as $service) {
            $this->modifyProperty($service, $app);
        }
    }

    /**
     * Set initial config.
     */
    protected function setInitialConfig()
    {
        $this->config = clone $this->getBaseApp()->config;
    }

    /**
     * Set initial event object.
     */
    protected function setInitialEvent()
    {
        $this->event = clone $this->getBaseApp()->event;
    }

    /**
     * Get config snapshot.
     *
     * @return Config
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Get event snapshot.
     *
     * @return Event
     */
    public function getEvent()
    {
        return $this->event;
    }

    /**
     * @return array Service instances keyed by class name
     */
    public function getServices()
    {
        return $this->services;
    }

    /**
     * Instantiate every class listed in `swoole.services` once, against the
     * base application. Unknown classes are skipped.
     */
    protected function setInitialServices()
    {
        $app      = $this->getBaseApp();
        $services = $this->config->get('swoole.services', []);

        foreach ($services as $service) {
            // $this->services is keyed by class name, so duplicates must be
            // detected by key; comparing the name against the stored objects
            // never matches.
            if (!class_exists($service) || isset($this->services[$service])) {
                continue;
            }

            $this->services[$service] = new $service($app);
        }
    }

    /**
     * Initialize resetters.
     *
     * The built-in resetters come first, followed by those configured in
     * `swoole.resetters`.
     *
     * @throws RuntimeException When a resetter does not implement ResetterInterface
     */
    protected function setInitialResetters()
    {
        $app = $this->getBaseApp();

        $resetters = [
            ClearInstances::class,
            ResetConfig::class,
            ResetEvent::class,
            ResetService::class,
            ResetModel::class,
            ResetPaginator::class,
        ];

        $resetters = array_merge($resetters, $this->config->get('swoole.resetters', []));

        foreach ($resetters as $resetter) {
            $resetterClass = $app->make($resetter);
            if (!$resetterClass instanceof ResetterInterface) {
                throw new RuntimeException("{$resetter} must implement " . ResetterInterface::class);
            }
            $this->resetters[$resetter] = $resetterClass;
        }
    }

    /**
     * Reset Application.
     *
     * @param App $app
     */
    protected function resetApp(App $app)
    {
        foreach ($this->resetters as $resetter) {
            $resetter->handle($app, $this);
        }
    }
}
Manager.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\swoole;
use think\swoole\concerns\InteractsWithHttp;
use think\swoole\concerns\InteractsWithPools;
use think\swoole\concerns\InteractsWithQueue;
use think\swoole\concerns\InteractsWithRpcClient;
use think\swoole\concerns\InteractsWithRpcServer;
use think\swoole\concerns\InteractsWithServer;
use think\swoole\concerns\InteractsWithSwooleTable;
use think\swoole\concerns\InteractsWithTracing;
use think\swoole\concerns\WithApplication;
use think\swoole\concerns\WithContainer;
/**
 * Class Manager
 *
 * Composes the server behaviour from the concern traits below and runs their
 * prepare steps during initialization.
 */
class Manager
{
    use InteractsWithServer;
    use InteractsWithSwooleTable;
    use InteractsWithHttp;
    use InteractsWithPools;
    use InteractsWithRpcClient;
    use InteractsWithRpcServer;
    use InteractsWithQueue;
    use InteractsWithTracing;
    use WithContainer;
    use WithApplication;

    /**
     * Initialize.
     *
     * Runs the prepare step of each feature; the order below is kept exactly
     * as it was, since later steps may rely on earlier ones.
     */
    protected function initialize(): void
    {
        // Shared memory tables and connection pools.
        $this->prepareTables();
        $this->preparePools();

        // Servers and workers.
        $this->prepareHttp();
        $this->prepareRpcServer();
        $this->prepareQueue();

        // Client-side RPC and tracing.
        $this->prepareRpcClient();
        $this->prepareTracing();
    }
}
Middleware.php
<?php
namespace think\swoole;
use Closure;
use InvalidArgumentException;
use think\App;
use think\Pipeline;
/**
 * Small middleware queue that can be turned into a Pipeline.
 */
class Middleware
{
    /**
     * Middleware execution queue; each entry is `[callable, params]`.
     * @var array
     */
    protected $queue = [];

    /**
     * @var App
     */
    protected $app;

    /**
     * @param App   $app
     * @param array $middlewares Middleware definitions, see buildMiddleware()
     */
    public function __construct(App $app, $middlewares = [])
    {
        $this->app = $app;

        foreach ($middlewares as $middleware) {
            $this->queue[] = $this->buildMiddleware($middleware);
        }
    }

    /**
     * Named constructor.
     *
     * @param App   $app
     * @param array $middlewares
     *
     * @return self
     */
    public static function make(App $app, $middlewares = [])
    {
        return new self($app, $middlewares);
    }

    /**
     * Build the dispatch pipeline.
     *
     * Class-name middleware is instantiated through the container only when
     * its pipe is actually executed.
     *
     * @return Pipeline
     */
    public function pipeline()
    {
        return (new Pipeline())
            ->through(array_map(function ($middleware) {
                return function ($request, $next) use ($middleware) {
                    [$call, $params] = $middleware;

                    if (is_array($call) && is_string($call[0])) {
                        $call = [$this->app->make($call[0]), $call[1]];
                    }

                    return call_user_func($call, $request, $next, ...$params);
                };
            }, $this->queue));
    }

    /**
     * Normalise one middleware definition.
     *
     * Accepted forms are a Closure, a class name, or an array of
     * `[Closure|class name, params]`. The params element is optional; when it
     * is missing or null an empty parameter list is used.
     *
     * @param mixed $middleware
     *
     * @return array `[callable, params]`; class names become `[class, 'handle']`
     *
     * @throws InvalidArgumentException When the definition is neither a Closure nor a string
     */
    protected function buildMiddleware($middleware): array
    {
        $params = [];

        if (is_array($middleware)) {
            // Read both offsets defensively: `[Foo::class]` without a params
            // element must not raise an undefined-offset notice/warning.
            $params     = $middleware[1] ?? [];
            $middleware = $middleware[0] ?? null;
        }

        if ($middleware instanceof Closure) {
            return [$middleware, $params];
        }

        if (!is_string($middleware)) {
            throw new InvalidArgumentException('The middleware is invalid');
        }

        return [[$middleware, 'handle'], $params];
    }
}
helpers.php
<?php
namespace {
    // Fallbacks for when the swoole function and constants below are not
    // already defined; existing definitions are never overridden.

    if (!function_exists('swoole_cpu_num')) {
        /**
         * Fallback implementation: always reports a single CPU.
         */
        function swoole_cpu_num(): int
        {
            return 1;
        }
    }

    defined('SWOOLE_SOCK_TCP') || define('SWOOLE_SOCK_TCP', 1);

    defined('SWOOLE_PROCESS') || define('SWOOLE_PROCESS', 3);

    defined('SWOOLE_HOOK_ALL') || define('SWOOLE_HOOK_ALL', 1879048191);
}
namespace think\swoole\helper {

    use think\swoole\response\File;

    /**
     * Create a file response for a download.
     *
     * @param string $filename    Path of the file to send
     * @param string $name        File name for the content disposition; when empty
     *                            (or otherwise falsy) the disposition is left as
     *                            the File constructor set it
     * @param string $disposition Disposition type, attachment by default
     *
     * @return File
     */
    function download(string $filename, string $name = '', $disposition = File::DISPOSITION_ATTACHMENT): File
    {
        $file = new File($filename, $disposition);

        if (!$name) {
            return $file;
        }

        $file->setContentDisposition($disposition, $name);

        return $file;
    }

    /**
     * Create a file response with the File class defaults.
     *
     * @param string $filename Path of the file to send
     *
     * @return File
     */
    function file(string $filename)
    {
        $response = new File($filename);

        return $response;
    }
}
Table.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\swoole;
use Swoole\Table as SwooleTable;
/**
 * Registry of named swoole tables, plus the column type identifiers.
 */
class Table
{
    public const TYPE_INT    = 1;
    public const TYPE_FLOAT  = 2;
    public const TYPE_STRING = 3;

    /**
     * Registered swoole tables.
     *
     * @var array
     */
    protected $tables = [];

    /**
     * Add a swoole table to existing tables.
     *
     * A table already registered under the same name is replaced.
     *
     * @param string      $name
     * @param SwooleTable $table
     *
     * @return Table
     */
    public function add(string $name, SwooleTable $table)
    {
        $this->tables[$name] = $table;

        return $this;
    }

    /**
     * Get a swoole table by its name from existing tables.
     *
     * @param string $name
     *
     * @return SwooleTable|null Null when no table is registered under that name
     */
    public function get(string $name)
    {
        if (isset($this->tables[$name])) {
            return $this->tables[$name];
        }

        return null;
    }

    /**
     * Get all existing swoole tables.
     *
     * @return array
     */
    public function getAll()
    {
        return $this->tables;
    }

    /**
     * Dynamically access table.
     *
     * @param string $key
     *
     * @return SwooleTable|null
     */
    public function __get(string $key)
    {
        return $this->get($key);
    }
}
command
Server.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: liu21st <liu21st@gmail.com>
// +----------------------------------------------------------------------
namespace think\swoole\command;
use think\console\Command;
use think\console\input\Option;
use think\swoole\Manager;
/**
 * Console command `swoole`: checks the environment and starts the server.
 */
class Server extends Command
{
    /**
     * Declare the command name, its `--env` / `-E` option and description.
     */
    public function configure()
    {
        $this->setName('swoole')
            ->addOption(
                'env',
                'E',
                Option::VALUE_REQUIRED,
                'Environment name',
                ''
            )
            ->setDescription('Swoole Server for ThinkPHP');
    }

    /**
     * Verify the environment, then start the manager with the chosen
     * environment name.
     *
     * @param Manager $manager
     */
    public function handle(Manager $manager)
    {
        $this->checkEnvironment();

        $this->output->writeln('Starting swoole server...');
        $this->output->writeln('You can exit with <info>`CTRL-C`</info>');

        $envName = $this->input->getOption('env');
        $manager->start($envName);
    }

    /**
     * Check the environment.
     *
     * Exits with status 1 when the swoole extension is not loaded or its
     * version is below 4.6.0.
     */
    protected function checkEnvironment()
    {
        if (!extension_loaded('swoole')) {
            $this->output->error('Can\'t detect Swoole extension installed.');

            exit(1);
        }

        // 4.6.0 itself is accepted, so the message says "or higher".
        if (version_compare(swoole_version(), '4.6.0', '<')) {
            $this->output->error('Your Swoole version must be `4.6.0` or higher.');

            exit(1);
        }
    }
}
RpcInterface.php
<?php
namespace think\swoole\command;
use Nette\PhpGenerator\ClassType;
use Nette\PhpGenerator\Dumper;
use Nette\PhpGenerator\Helpers;
use Nette\PhpGenerator\PhpFile;
use think\console\Command;
use think\helper\Arr;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Service;
use think\swoole\rpc\JsonParser;
use function Swoole\Coroutine\run;
/**
 * Console command `rpc:interface`: asks every configured RPC client gateway
 * for its services and writes matching PHP interfaces to `rpc.php`.
 */
class RpcInterface extends Command
{
    /**
     * Declare the command name and description.
     */
    public function configure()
    {
        $this->setName('rpc:interface')
            ->setDescription('Generate Rpc Service Interfaces');
    }

    /**
     * Generate the interface file inside a coroutine.
     *
     * For each client in `swoole.rpc.client` a namespace `rpc\contract\<name>`
     * is added, holding one interface per remote service. The generated file
     * ends with a `return` statement mapping each client name to the list of
     * its interface names.
     */
    public function handle()
    {
        run(function () {
            $file = new PhpFile;
            $file->addComment('This file is auto-generated.');
            $file->setStrictTypes();

            $services = [];

            $clients = $this->app->config->get('swoole.rpc.client', []);

            foreach ($clients as $name => $config) {
                $parserClass = Arr::get($config, 'parser', JsonParser::class);
                /** @var ParserInterface $parser */
                $parser  = new $parserClass;
                $gateway = new Gateway($config, $parser, Arr::get($config, 'tries', 2));

                $result = $gateway->getServices();

                $namespace = $file->addNamespace("rpc\\contract\\{$name}");
                $namespace->addUse(Service::class);

                foreach ($result as $interface => $methods) {
                    $services[$name][] = $namespace->getName() . "\\{$interface}";

                    $class = $namespace->addInterface($interface);
                    $class->addExtend(Service::class);

                    foreach ($methods as $methodName => $definition) {
                        $parameters = $definition['parameters'];
                        $returnType = $definition['returnType'];
                        $comment    = $definition['comment'];

                        $method = $class->addMethod($methodName)
                            ->setVisibility(ClassType::VISIBILITY_PUBLIC)
                            ->setComment(Helpers::unformatDocComment($comment))
                            ->setReturnType($returnType);

                        foreach ($parameters as $parameter) {
                            $type = $parameter['type'];

                            // Import the type only when it names a class or
                            // interface that exists on this side.
                            if ($type && (class_exists($type) || interface_exists($type))) {
                                $namespace->addUse($type);
                            }

                            $param = $method->addParameter($parameter['name'])
                                ->setType($type);

                            if (array_key_exists('default', $parameter)) {
                                $param->setDefaultValue($parameter['default']);
                            }

                            if (array_key_exists('nullable', $parameter)) {
                                $param->setNullable();
                            }
                        }
                    }
                }
            }

            $dumper   = new Dumper();
            $services = 'return ' . $dumper->dump($services) . ';';

            $target = $this->app->getBasePath() . 'rpc.php';
            file_put_contents($target, $file . $services);

            $this->output->writeln('<info>Succeed!</info>');
        });
    }
}
concerns
WithApplication.php
<?php
namespace think\swoole\concerns;
use Closure;
use think\App;
use think\swoole\App as SwooleApp;
use think\swoole\Manager;
use think\swoole\pool\Cache;
use think\swoole\pool\Db;
use think\swoole\Sandbox;
use Throwable;
/**
 * Trait WithApplication
 *
 * Creates the worker-side application and runs callables inside its sandbox.
 *
 * @package think\swoole\concerns
 * @property App $container
 */
trait WithApplication
{
    /**
     * @var SwooleApp
     */
    protected $app;

    /**
     * Build and initialise the worker application once.
     *
     * Does nothing when the application already exists.
     *
     * @param string $envName Environment name handed to the new application
     */
    protected function prepareApplication(string $envName)
    {
        if ($this->app instanceof SwooleApp) {
            return;
        }

        $app = new SwooleApp($this->container->getRootPath());
        $this->app = $app;

        $app->setEnvName($envName);
        $app->bind(SwooleApp::class, App::class);
        $app->bind(Manager::class, $this);

        // Bind connection pools
        if ($this->getConfig('pool.db.enable', true)) {
            $app->bind('db', Db::class);
            $app->resolving(Db::class, function (Db $db) {
                $db->setLog($this->container->log);
            });
        }

        if ($this->getConfig('pool.cache.enable', true)) {
            $app->bind('cache', Cache::class);
        }

        $app->initialize();
        $app->instance('request', $this->container->request);

        $this->prepareConcretes();
    }

    /**
     * Preload: resolve `db`, `cache`, `event` and every configured
     * `concretes` entry that the application knows about.
     */
    protected function prepareConcretes()
    {
        $configured = $this->getConfig('concretes', []);
        $concretes  = array_merge(['db', 'cache', 'event'], $configured);

        foreach ($concretes as $concrete) {
            if (!$this->app->has($concrete)) {
                continue;
            }

            $this->app->make($concrete);
        }
    }

    /**
     * @return SwooleApp
     */
    public function getApplication()
    {
        return $this->app;
    }

    /**
     * Get the sandbox.
     * @return Sandbox
     */
    protected function getSandbox()
    {
        return $this->app->make(Sandbox::class);
    }

    /**
     * Run a callable inside the sandbox.
     *
     * Anything thrown is passed to logServerError() instead of propagating.
     *
     * @param Closure $callable
     */
    public function runInSandbox(Closure $callable)
    {
        try {
            $sandbox = $this->getSandbox();
            $sandbox->run($callable);
        } catch (Throwable $e) {
            $this->logServerError($e);
        }
    }
}
InteractsWithRpcServer.php
<?php
namespace think\swoole\concerns;
use Swoole\Coroutine\Server;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Pool;
use think\swoole\rpc\Error;
use think\swoole\rpc\File;
use think\swoole\rpc\JsonParser;
use think\swoole\rpc\Packer;
use think\swoole\rpc\server\Dispatcher;
use Throwable;
/**
* Trait InteractsWithRpcServer
*
* Creates the coroutine RPC server and registers it as a batch of workers.
*
* @package think\swoole\concerns
* @property App $app
* @property App $container
* @method Pool getPools()
*/
trait InteractsWithRpcServer
{
/**
* Create the RPC server and start it.
*
* Every connection is served inside the sandbox. Received data is unpacked
* by Packer into a handler plus remaining data; the data is written to the
* handler until it yields a result. File results are collected, any other
* result is dispatched together with the files collected so far. A packet
* that cannot be unpacked is answered with an INVALID_REQUEST error and ends
* the connection.
*/
protected function createRpcServer()
{
$this->bindRpcParser();
$this->bindRpcDispatcher();
$host = $this->getConfig('rpc.server.host', '0.0.0.0');
$port = $this->getConfig('rpc.server.port', 9000);
// NOTE(review): third/fourth arguments are presumably ssl=false and reuse_port=true — confirm against the Swoole\Coroutine\Server constructor.
$server = new Server($host, $port, false, true);
$server->handle(function (Connection $conn) {
$this->runInSandbox(function (App $app, Dispatcher $dispatcher) use ($conn) {
// Files received so far that have not yet been dispatched with a request.
$files = [];
while (true) {
// Receive data
$data = $conn->recv();
// An empty string or false ends the receive loop.
if ($data === '' || $data === false) {
break;
}
// Re-entered (via goto) while $data still holds unprocessed bytes.
begin:
// No handler in progress: this data starts a new packet.
if (!isset($handler)) {
try {
[$handler, $data] = Packer::unpack($data);
} catch (Throwable $e) {
// Invalid packet header
$result = Error::make(Dispatcher::INVALID_REQUEST, $e->getMessage());
$this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result) {
$dispatcher->dispatch($app, $conn, $result);
});
break;
}
}
// $data is passed to write(); the loop below relies on it being emptied once consumed
// (presumably write() takes it by reference — confirm in the handler).
$result = $handler->write($data);
if (!empty($result)) {
// The packet is complete; the next data starts a new one.
$handler = null;
if ($result instanceof File) {
$files[] = $result;
} else {
$this->runWithBarrier(function () use ($dispatcher, $app, $conn, $result, $files) {
$dispatcher->dispatch($app, $conn, $result, $files);
});
$files = [];
}
}
// Leftover bytes belong to the next packet: process them before receiving more.
if (!empty($data)) {
goto begin;
}
}
$conn->close();
});
});
$server->start();
}
/**
* Register the RPC server workers when `rpc.server.enable` is set.
*
* The worker count comes from `rpc.server.worker_num` and defaults to
* swoole_cpu_num().
*/
protected function prepareRpcServer()
{
if ($this->getConfig('rpc.server.enable', false)) {
$workerNum = $this->getConfig('rpc.server.worker_num', swoole_cpu_num());
$this->addBatchWorker($workerNum, [$this, 'createRpcServer'], 'rpc server');
}
}
/**
* Resolve the Dispatcher through the application container with the configured
* `rpc.server.services` and `rpc.server.middleware`.
*/
protected function bindRpcDispatcher()
{
$services = $this->getConfig('rpc.server.services', []);
$middleware = $this->getConfig('rpc.server.middleware', []);
$this->app->make(Dispatcher::class, [$services, $middleware]);
}
/**
* Bind ParserInterface to the configured `rpc.server.parser` class
* (JsonParser by default) and resolve it once.
*/
protected function bindRpcParser()
{
$parserClass = $this->getConfig('rpc.server.parser', JsonParser::class);
$this->app->bind(ParserInterface::class, $parserClass);
$this->app->make(ParserInterface::class);
}
}
InteractsWithSwooleTable.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2018 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------
namespace think\swoole\concerns;
use Swoole\Table as SwooleTable;
use think\App;
use think\swoole\Table;
/**
 * Trait InteractsWithSwooleTable
 *
 * Creates the swoole tables described in the `swoole.tables` configuration
 * and exposes them to the worker application.
 *
 * @property App $container
 * @property App $app
 */
trait InteractsWithSwooleTable
{
    /**
     * @var Table
     */
    protected $currentTable;

    /**
     * Register customized swoole tables.
     *
     * The tables are created immediately; on `workerStart` the registry is
     * bound as Table::class and each table as `swoole.table.<name>`.
     */
    protected function prepareTables()
    {
        $this->currentTable = new Table();
        $this->registerTables();

        $this->onEvent('workerStart', function () {
            $this->app->instance(Table::class, $this->currentTable);

            $tables = $this->currentTable->getAll();
            foreach ($tables as $name => $table) {
                $this->app->instance("swoole.table.{$name}", $table);
            }
        });
    }

    /**
     * Register user-defined swoole tables.
     *
     * Each definition supplies `size` and optionally `columns`; a column
     * supplies `name`, `type` and optionally `size`.
     */
    protected function registerTables()
    {
        $definitions = $this->container->make('config')->get('swoole.tables', []);

        foreach ($definitions as $name => $definition) {
            $table = new SwooleTable($definition['size']);

            foreach ($definition['columns'] ?? [] as $column) {
                $arguments = [$column['name'], $column['type']];

                // Pass a size only when the column defines one.
                if (isset($column['size'])) {
                    $arguments[] = $column['size'];
                }

                $table->column(...$arguments);
            }

            $table->create();

            $this->currentTable->add($name, $table);
        }
    }
}
InteractsWithWebsocket.php
<?php
namespace think\swoole\concerns;
use Swoole\Atomic;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\WebSocket\CloseFrame;
use Swoole\WebSocket\Frame;
use think\App;
use think\helper\Str;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Middleware;
use think\swoole\Websocket;
use think\swoole\websocket\message\PushMessage;
use think\swoole\websocket\Room;
/**
* Trait InteractsWithWebsocket
*
* Handles websocket handshakes, the per-connection receive loop, message
* pushing through per-connection channels, and the websocket related bindings.
*
* @package think\swoole\concerns
*
* @property App $app
* @property App $container
*/
trait InteractsWithWebsocket
{
/**
* @var RoomInterface
*/
protected $wsRoom;
/**
* Outgoing message channels, keyed by the connection id taken from $wsIdAtomic.
*
* @var Channel[]
*/
protected $wsMessageChannel = [];
protected $wsEnable = false;
/** @var Atomic Counter used to hand out connection ids. */
protected $wsIdAtomic;
/**
* "onHandShake" listener.
*
* Upgrades the response, passes the request through the websocket middleware,
* then serves the connection inside the sandbox until it is closed: onOpen,
* one onMessage call per completed frame (each in its own coroutine), onClose.
*
* @param Request $req
* @param Response $res
*/
public function onHandShake($req, $res)
{
$this->runInSandbox(function (App $app, Websocket $websocket, HandlerInterface $handler) use ($req, $res) {
$res->upgrade();
$websocket->setClient($res);
$request = $this->prepareRequest($req);
$request = $this->setRequestThroughMiddleware($app, $request);
// Allocate a connection id and its outgoing message channel.
$fd = $this->wsIdAtomic->add();
$this->wsMessageChannel[$fd] = new Channel(1);
Coroutine::create(function () use ($res, $fd) {
// Push messages; the loop ends as soon as pop() returns a falsy value.
while ($message = $this->wsMessageChannel[$fd]->pop()) {
$res->push($message);
}
});
try {
// The sender id combines the worker id and the connection id; the sender also joins a room named after it.
$id = "{$this->workerId}.{$fd}";
$websocket->setSender($id);
$websocket->join($id);
$this->runWithBarrier(function () use ($request, $handler) {
$handler->onOpen($request);
});
$this->runWithBarrier(function () use ($handler, $res) {
$cid = Coroutine::getCid();
// Number of onMessage coroutines still running.
$messages = 0;
// True once this coroutine has yielded to wait for them.
$wait = false;
// Frame being assembled from the received pieces.
$frame = null;
while (true) {
/** @var Frame|false|string $recv */
$recv = $res->recv();
if ($recv === '' || $recv === false || $recv instanceof CloseFrame) {
break;
}
// First piece of a new frame: copy its header fields.
if (empty($frame)) {
$frame = new Frame();
$frame->opcode = $recv->opcode;
$frame->flags = $recv->flags;
$frame->fd = $recv->fd;
$frame->finish = false;
}
$frame->data .= $recv->data;
$frame->finish = $recv->finish;
if ($frame->finish) {
Coroutine::create(function () use (&$wait, &$messages, $cid, $frame, $handler) {
++$messages;
Coroutine::defer(function () use (&$wait, &$messages, $cid) {
--$messages;
// Wake the receive coroutine if it is waiting for handlers to finish.
if ($wait) {
Coroutine::resume($cid);
}
});
$handler->onMessage($frame);
});
$frame = null;
}
}
// Wait for the running message handlers to finish
while ($messages > 0) {
$wait = true;
Coroutine::yield();
}
});
// Close the connection
$res->close();
$this->runWithBarrier(function () use ($handler) {
$handler->onClose();
});
} finally {
// leave all rooms
$websocket->leave();
// Closing the channel also ends the push coroutine started above.
if (isset($this->wsMessageChannel[$fd])) {
$this->wsMessageChannel[$fd]->close();
unset($this->wsMessageChannel[$fd]);
}
$websocket->setClient(null);
}
});
}
/**
* Bind the request in the application and send it through the middleware
* configured in `websocket.middleware`.
*
* @param App $app
* @param \think\Request $request
* @return \think\Request
*/
protected function setRequestThroughMiddleware(App $app, \think\Request $request)
{
$app->instance('request', $request);
return Middleware::make($app, $this->getConfig('websocket.middleware', []))
->pipeline()
->send($request)
->then(function ($request) {
return $request;
});
}
/**
* Prepare settings if websocket is enabled.
*
* Registers a `message` listener that forwards PushMessage data to the channel
* of the addressed connection (if it exists), and a `workerStart` listener that
* performs the websocket bindings.
*/
protected function prepareWebsocket()
{
$this->prepareWebsocketIdAtomic();
$this->prepareWebsocketRoom();
$this->onEvent('message', function ($message) {
if ($message instanceof PushMessage) {
if (isset($this->wsMessageChannel[$message->fd])) {
$this->wsMessageChannel[$message->fd]->push($message->data);
}
}
});
$this->onEvent('workerStart', function () {
$this->bindWebsocketRoom();
$this->bindWebsocketHandler();
$this->prepareWebsocketListener();
});
}
/**
* Create the counter that hands out connection ids.
*/
protected function prepareWebsocketIdAtomic()
{
$this->wsIdAtomic = new Atomic();
}
/**
* Prepare websocket room.
*/
protected function prepareWebsocketRoom()
{
$this->wsRoom = $this->container->make(Room::class);
$this->wsRoom->prepare();
}
/**
* Register the configured websocket listeners and subscribers.
*
* Listeners from `websocket.listen` are attached to the event
* `swoole.websocket.<StudlyCaseEvent>`; subscribers from `websocket.subscribe`
* are observed with the prefix `swoole.websocket.`.
*/
protected function prepareWebsocketListener()
{
$listeners = $this->getConfig('websocket.listen', []);
foreach ($listeners as $event => $listener) {
$this->app->event->listen('swoole.websocket.' . Str::studly($event), $listener);
}
$subscribers = $this->getConfig('websocket.subscribe', []);
foreach ($subscribers as $subscriber) {
$this->app->event->observe($subscriber, 'swoole.websocket.');
}
}
/**
* Bind HandlerInterface to the class configured in `websocket.handler`.
*/
protected function bindWebsocketHandler()
{
$handlerClass = $this->getConfig('websocket.handler');
$this->app->bind(HandlerInterface::class, $handlerClass);
}
/**
* Bind room instance to app container.
*/
protected function bindWebsocketRoom(): void
{
$this->app->instance(Room::class, $this->wsRoom);
}
}
WithRpcClient.php
<?php
namespace think\swoole\concerns;
use think\App;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\helper\Arr;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;
use function Swoole\Coroutine\run;
/**
 * Use RPC from console commands.
 *
 * Binds the generated RPC interfaces before the command runs and executes
 * the command's `handle` method inside a coroutine.
 *
 * @property App $app
 */
trait WithRpcClient
{
    /**
     * @throws \RuntimeException When the trait is used outside a Command
     */
    public function __construct()
    {
        $isCommand = $this instanceof Command;
        if (!$isCommand) {
            throw new \RuntimeException('Trait `WithRpcClient` only can be used in Command');
        }

        parent::__construct();
    }

    /**
     * Bind the RPC interfaces, then invoke `handle` through the container
     * inside a coroutine.
     *
     * @param Input  $input
     * @param Output $output
     */
    protected function execute(Input $input, Output $output)
    {
        $this->bindRpcInterface();

        run(function () {
            $this->app->invoke([$this, 'handle']);
        });
    }

    /**
     * Bind every interface listed in the application's `rpc.php` to a proxy.
     *
     * Nothing happens when the file does not exist. Any error raised while
     * binding is swallowed, which also skips the bindings not yet made.
     */
    protected function bindRpcInterface()
    {
        // Include the rpc interface file
        $rpc = $this->app->getBasePath() . 'rpc.php';
        if (!file_exists($rpc)) {
            return;
        }

        $rpcServices = (array) include $rpc;

        // Bind the rpc interfaces
        try {
            foreach ($rpcServices as $name => $abstracts) {
                $config = $this->app->config->get("swoole.rpc.client.{$name}", []);

                // The remaining $config (without these three keys) goes to the gateway.
                $parserClass = Arr::pull($config, 'parser', JsonParser::class);
                $tries       = Arr::pull($config, 'tries', 2);
                $middleware  = Arr::pull($config, 'middleware', []);

                $parser  = $this->app->make($parserClass);
                $gateway = new Gateway($config, $parser, $tries);

                foreach ($abstracts as $abstract) {
                    $factory = function (App $app) use ($middleware, $gateway, $name, $abstract) {
                        return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
                    };

                    $this->app->bind($abstract, $factory);
                }
            }
        } catch (Throwable $e) {
            // Intentionally ignored.
        }
    }
}
ModifyProperty.php
<?php
namespace think\swoole\concerns;
use ReflectionObject;
/**
 * Helper for overwriting a (possibly non-public) property of an object.
 */
trait ModifyProperty
{
    /**
     * Set a property of an object through reflection.
     *
     * Nothing happens when reflection does not report the property on the object.
     *
     * @param object $object   Object to modify
     * @param mixed  $value    New property value
     * @param string $property Property name, `app` by default
     */
    protected function modifyProperty($object, $value, $property = 'app')
    {
        $reflection = new ReflectionObject($object);

        if (!$reflection->hasProperty($property)) {
            return;
        }

        $target = $reflection->getProperty($property);
        $target->setAccessible(true);
        $target->setValue($object, $value);
    }
}
InteractsWithServer.php
<?php
namespace think\swoole\concerns;
use Swoole\Constant;
use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Runtime;
use think\App;
use think\swoole\Watcher;
/**
 * Trait InteractsWithServer
 *
 * Collects worker start functions and runs them inside a Swoole process pool,
 * wiring up per-worker signal handling and inter-worker messaging.
 *
 * @package think\swoole\concerns
 * @property App $container
 */
trait InteractsWithServer
{
    /**
     * Registered workers. Each entry is `[callable $func, string|null $name]`
     * and its index is the worker id it is started as.
     *
     * @var array
     */
    protected $startFuncMap = [];

    /** @var int|null Id of the current worker; assigned in the worker-start callback */
    protected $workerId;

    /** @var Pool */
    protected $pool;

    /**
     * Register the same start function for several workers.
     *
     * @param int         $workerNum Number of workers to register
     * @param callable    $func      Start function, called with ($pool, $workerId)
     * @param string|null $name      Base process name; each worker gets " #{index}" appended
     * @return $this
     */
    public function addBatchWorker(int $workerNum, callable $func, $name = null)
    {
        for ($i = 0; $i < $workerNum; $i++) {
            $this->addWorker($func, $name ? "{$name} #{$i}" : null);
        }
        return $this;
    }

    /**
     * Register one worker start function.
     *
     * @param callable    $func Start function, called with ($pool, $workerId)
     * @param string|null $name Process name for the worker, if any
     * @return $this
     */
    public function addWorker(callable $func, $name = null): self
    {
        $this->startFuncMap[] = [$func, $name];
        return $this;
    }

    /**
     * Start the service.
     *
     * Creates a process pool with one process per registered worker and, in
     * each worker, installs the SIGTERM handler and the message listener before
     * preparing the application and calling the worker's start function.
     *
     * @param string $envName Environment identifier, forwarded to prepareApplication()
     */
    public function start(string $envName): void
    {
        $this->setProcessName('manager process');
        $this->initialize();
        $this->triggerEvent('init');

        // Hot update
        if ($this->getConfig('hot_update.enable', false)) {
            $this->addHotUpdateProcess();
        }

        $pool = new Pool(count($this->startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, true);

        $pool->on(Constant::EVENT_WORKER_START, function ($pool, $workerId) use ($envName) {
            Runtime::enableCoroutine();

            $this->pool     = $pool;
            $this->workerId = $workerId;

            [$func, $name] = $this->startFuncMap[$workerId];

            if ($name) {
                $this->setProcessName($name);
            }

            Process::signal(SIGTERM, function () {
                $this->pool->getProcess()->exit();
            });

            /** @var Coroutine\Socket $socket */
            $socket = $this->pool->getProcess()->exportSocket();

            // Start listening for messages sent by sendMessage() in other workers
            Event::add($socket, function (Coroutine\Socket $socket) {
                $recv = $socket->recv();
                // A failed or closed read yields false / '' rather than a
                // serialized payload; feeding that to unserialize() would raise
                // a notice and dispatch a bogus `false` message.
                if (!is_string($recv) || $recv === '') {
                    return;
                }
                $message = unserialize($recv);
                $this->triggerEvent('message', $message);
            });

            $this->clearCache();
            $this->prepareApplication($envName);

            $this->triggerEvent(Constant::EVENT_WORKER_START);

            $func($pool, $workerId);
        });

        $pool->start();
    }

    /**
     * @return int|null Id of the current worker, null before a worker has started
     */
    public function getWorkerId()
    {
        return $this->workerId;
    }

    /**
     * Get the process pool of the current worker.
     * @return Pool
     */
    public function getPool()
    {
        return $this->pool;
    }

    /**
     * Deliver a message to a worker.
     *
     * A message for the current worker is dispatched directly as the `message`
     * event; otherwise it is serialized and written to the target worker's socket.
     *
     * @param int   $workerId Target worker id
     * @param mixed $message  Payload; must be serializable when sent to another worker
     */
    public function sendMessage($workerId, $message)
    {
        if ($workerId === $this->workerId) {
            $this->triggerEvent('message', $message);
        } else {
            /** @var Process $process */
            $process = $this->pool->getProcess($workerId);
            $socket  = $process->exportSocket();
            $socket->send(serialize($message));
        }
    }

    /**
     * Run $func in a new coroutine and wait for it to finish.
     *
     * The channel is closed from a deferred callback inside the coroutine, which
     * releases the pop() below once the coroutine exits.
     *
     * @param callable $func
     * @param mixed    ...$params Arguments passed to $func
     */
    public function runWithBarrier(callable $func, ...$params)
    {
        $channel = new Coroutine\Channel(1);

        Coroutine::create(function (...$params) use ($channel, $func) {
            Coroutine::defer(function () use ($channel) {
                $channel->close();
            });

            call_user_func_array($func, $params);
        }, ...$params);

        $channel->pop();
    }

    /**
     * Hot update: register a worker that watches for changes and signals the
     * pool's master process with SIGUSR1 when the watcher fires.
     */
    protected function addHotUpdateProcess()
    {
        $this->addWorker(function (Process\Pool $pool) {
            $watcher = $this->container->make(Watcher::class);

            $watcher->watch(function () use ($pool) {
                Process::kill($pool->master_pid, SIGUSR1);
            });
        }, 'hot update');
    }

    /**
     * Clear the APC and OPcache caches when those extensions are loaded.
     */
    protected function clearCache()
    {
        if (extension_loaded('apc')) {
            apc_clear_cache();
        }

        if (extension_loaded('Zend OPcache')) {
            opcache_reset();
        }
    }

    /**
     * Set process name.
     *
     * @param string $process Label inserted into the process title
     */
    protected function setProcessName($process)
    {
        $appName = $this->container->config->get('app.name', 'ThinkPHP');

        $name = sprintf('swoole: %s process for %s', $process, $appName);

        // Errors are suppressed on purpose: failing to set the title is ignored.
        @cli_set_process_title($name);
    }
}
WithMiddleware.php
<?php
namespace think\swoole\concerns;
trait WithMiddleware
{
    /**
     * Registered middleware entries. Each entry holds the middleware with its
     * parameters and an `options` array shared by reference with the builder
     * returned from middleware().
     *
     * @var array
     */
    protected $middleware = [];

    /**
     * Register a middleware and return a small builder for restricting it.
     *
     * Calls to only()/except() on the returned object write straight into the
     * registered entry, because both share the same `$options` array by reference.
     *
     * @param mixed $middleware Middleware identifier
     * @param mixed ...$params  Extra parameters stored alongside the middleware
     * @return object Builder exposing only() and except()
     */
    protected function middleware($middleware, ...$params)
    {
        $options = [];

        $this->middleware[] = [
            'middleware' => [$middleware, $params],
            'options'    => &$options,
        ];

        return new class($options) {
            protected $options;

            public function __construct(array &$options)
            {
                $this->options = &$options;
            }

            public function only($methods)
            {
                if (!is_array($methods)) {
                    $methods = func_get_args();
                }
                $this->options['only'] = $methods;

                return $this;
            }

            public function except($methods)
            {
                if (!is_array($methods)) {
                    $methods = func_get_args();
                }
                $this->options['except'] = $methods;

                return $this;
            }
        };
    }
}
InteractsWithHttp.php
<?php
namespace think\swoole\concerns;
use Swoole\Coroutine\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Status;
use think\App;
use think\Cookie;
use think\Event;
use think\exception\Handle;
use think\helper\Arr;
use think\helper\Str;
use think\Http;
use think\swoole\App as SwooleApp;
use think\swoole\Http as SwooleHttp;
use think\swoole\response\File as FileResponse;
use Throwable;
use function substr;
/**
 * Trait InteractsWithHttp
 *
 * Runs a coroutine HTTP server in each HTTP worker, converts Swoole requests
 * into framework requests and writes framework responses back to Swoole.
 *
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 */
trait InteractsWithHttp
{
    use InteractsWithWebsocket, ModifyProperty;

    /**
     * Create and start the HTTP server for the current worker.
     *
     * Requests that look like a websocket upgrade are routed to onHandShake()
     * when websocket support is enabled; everything else goes to onRequest().
     */
    public function createHttpServer()
    {
        $this->preloadHttp();

        $host    = $this->getConfig('http.host');
        $port    = $this->getConfig('http.port');
        $options = $this->getConfig('http.options', []);

        $server = new Server($host, $port, false, true);
        $server->set($options);

        $server->handle('/', function (Request $req, Response $res) {
            if ($this->wsEnable && $this->isWebsocketRequest($req)) {
                $this->onHandShake($req, $res);
            } else {
                $this->onRequest($req, $res);
            }
        });

        $server->start();
    }

    /**
     * Load middleware and routes once, then keep detached copies that are
     * re-attached to whichever App resolves SwooleHttp later.
     *
     * The clones have their `app` property nulled so they do not hold on to the
     * base application; each resolving callback clones them again and points
     * the copy at the resolving App.
     */
    protected function preloadHttp()
    {
        $http = $this->app->http;
        $this->app->invokeMethod([$http, 'loadMiddleware'], [], true);

        if ($this->app->config->get('app.with_route', true)) {
            $this->app->invokeMethod([$http, 'loadRoutes'], [], true);
            $route = clone $this->app->route;
            $this->modifyProperty($route, null);
            unset($this->app->route);

            $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($route) {
                $newRoute = clone $route;
                $this->modifyProperty($newRoute, $app);
                $app->instance('route', $newRoute);
            });
        }

        $middleware = clone $this->app->middleware;
        $this->modifyProperty($middleware, null);
        unset($this->app->middleware);

        $this->app->resolving(SwooleHttp::class, function ($http, App $app) use ($middleware) {
            $newMiddleware = clone $middleware;
            $this->modifyProperty($newMiddleware, $app);
            $app->instance('middleware', $newMiddleware);
        });

        unset($this->app->http);
        $this->app->bind(Http::class, SwooleHttp::class);
    }

    /**
     * Tell whether the request asks for a websocket upgrade.
     *
     * `Connection` is a comma-separated token list, so a value such as
     * "keep-alive, Upgrade" must be accepted as well as a bare "Upgrade".
     *
     * @param Request $req
     * @return bool
     */
    protected function isWebsocketRequest(Request $req)
    {
        $header = $req->header;

        $connection = strtolower((string) Arr::get($header, 'connection', ''));
        $tokens     = array_map('trim', explode(',', $connection));

        return in_array('upgrade', $tokens, true) &&
            strcasecmp((string) Arr::get($header, 'upgrade', ''), 'websocket') === 0;
    }

    /**
     * Register the HTTP workers (and websocket support) when HTTP is enabled.
     */
    protected function prepareHttp()
    {
        if ($this->getConfig('http.enable', true)) {

            $this->wsEnable = $this->getConfig('websocket.enable', false);

            if ($this->wsEnable) {
                $this->prepareWebsocket();
            }

            $workerNum = $this->getConfig('http.worker_num', swoole_cpu_num());
            $this->addBatchWorker($workerNum, [$this, 'createHttpServer'], 'http server');
        }
    }

    /**
     * "onRequest" listener.
     *
     * Handles the request inside the sandbox; any Throwable raised while
     * handling is rendered through the exception Handle instead of escaping.
     *
     * @param Request  $req
     * @param Response $res
     */
    public function onRequest($req, $res)
    {
        $this->runWithBarrier([$this, 'runInSandbox'], function (Http $http, Event $event, SwooleApp $app) use ($req, $res) {
            $app->setInConsole(false);

            $request = $this->prepareRequest($req);

            try {
                $response = $this->handleRequest($http, $request);
            } catch (Throwable $e) {
                $response = $this->app
                    ->make(Handle::class)
                    ->render($request, $e);
            }

            $this->setCookie($res, $app->cookie);
            $this->sendResponse($res, $request, $response);
        });
    }

    /**
     * Run the request through the framework and fold any output echoed during
     * the run into the response content.
     *
     * @param Http           $http
     * @param \think\Request $request
     * @return \think\Response
     */
    protected function handleRequest(Http $http, $request)
    {
        // Remember the buffer depth so every buffer opened here is closed again
        $level = ob_get_level();
        ob_start();

        $response = $http->run($request);

        $content = $response->getContent();

        // The run may have closed our buffer; make sure one is active for end()
        if (ob_get_level() == 0) {
            ob_start();
        }

        $http->end($response);

        // Echoed output is placed in front of the response content
        if (ob_get_length() > 0) {
            $response->content(ob_get_contents() . $content);
        }

        while (ob_get_level() > $level) {
            ob_end_clean();
        }

        return $response;
    }

    /**
     * Build a framework request from the Swoole request.
     *
     * Every header is additionally mirrored into the server array as
     * `http_{name}` with dashes replaced by underscores.
     *
     * @param Request $req
     * @return \think\Request
     */
    protected function prepareRequest(Request $req)
    {
        $header = $req->header ?: [];
        $server = $req->server ?: [];

        foreach ($header as $key => $value) {
            $server['http_' . str_replace('-', '_', $key)] = $value;
        }

        // Re-instantiate the request object to process the swoole request data
        /** @var \think\Request $request */
        $request = $this->app->make('request', [], true);

        return $request
            ->withHeader($header)
            ->withServer($server)
            ->withGet($req->get ?: [])
            ->withPost($req->post ?: [])
            ->withCookie($req->cookie ?: [])
            ->withFiles($this->getFiles($req))
            ->withInput($req->rawContent())
            ->setBaseUrl($req->server['request_uri'])
            ->setUrl($req->server['request_uri'] . (!empty($req->server['query_string']) ? '?' . $req->server['query_string'] : ''))
            ->setPathinfo(ltrim($req->server['path_info'], '/'));
    }

    /**
     * Convert uploaded files into the per-attribute layout.
     *
     * A field holding a list of files (non-associative array) is transposed
     * into `name[]`, `type[]`, `tmp_name[]`, `error[]` and `size[]`; a single
     * file is passed through unchanged.
     *
     * @param Request $req
     * @return array
     */
    protected function getFiles(Request $req)
    {
        if (empty($req->files)) {
            return [];
        }

        return array_map(function ($file) {
            if (!Arr::isAssoc($file)) {
                $files = [];
                foreach ($file as $f) {
                    $files['name'][]     = $f['name'];
                    $files['type'][]     = $f['type'];
                    $files['tmp_name'][] = $f['tmp_name'];
                    $files['error'][]    = $f['error'];
                    $files['size'][]     = $f['size'];
                }
                return $files;
            }
            return $file;
        }, $req->files);
    }

    /**
     * Copy the cookies queued on the framework Cookie object to the Swoole response.
     *
     * @param Response $res
     * @param Cookie   $cookie
     */
    protected function setCookie(Response $res, Cookie $cookie)
    {
        foreach ($cookie->getCookie() as $name => $val) {
            [$value, $expire, $option] = $val;

            $res->cookie($name, $value, $expire, $option['path'], $option['domain'], (bool) $option['secure'], (bool) $option['httponly'], $option['samesite']);
        }
    }

    /**
     * Write each header to the Swoole response.
     *
     * @param Response $res
     * @param array    $headers Header name => value
     */
    protected function setHeader(Response $res, array $headers)
    {
        foreach ($headers as $key => $val) {
            $res->header($key, $val);
        }
    }

    /**
     * Set the status code together with its reason phrase.
     *
     * @param Response $res
     * @param int      $code
     */
    protected function setStatus(Response $res, $code)
    {
        $res->status($code, Status::getReasonPhrase($code));
    }

    /**
     * Send the response, using sendFile() for file responses and
     * sendContent() for everything else.
     */
    protected function sendResponse(Response $res, \think\Request $request, \think\Response $response)
    {
        switch (true) {
            case $response instanceof FileResponse:
                $this->sendFile($res, $request, $response);
                break;
            default:
                $this->sendContent($res, $response);
        }
    }

    /**
     * Send a file response, honouring If-None-Match, If-Range and Range.
     *
     * - 304 is returned only when the response carries an ETag that equals
     *   If-None-Match (two absent headers no longer count as a match).
     * - A `bytes=` range yields 206 with Content-Range, or 416 when the range
     *   cannot be satisfied; a range without an end part (no dash) is ignored.
     */
    protected function sendFile(Response $res, \think\Request $request, FileResponse $response)
    {
        $ifNoneMatch = $request->header('If-None-Match');
        $ifRange     = $request->header('If-Range');

        $code         = $response->getCode();
        $file         = $response->getFile();
        $eTag         = $response->getHeader('ETag');
        $lastModified = $response->getHeader('Last-Modified');

        $fileSize = $file->getSize();
        $offset   = 0;
        $length   = -1;

        if (!empty($eTag) && $ifNoneMatch == $eTag) {
            $code = 304;
        } elseif (!$ifRange || $ifRange === $eTag || $ifRange === $lastModified) {
            $range = $request->header('Range', '');
            if (Str::startsWith($range, 'bytes=')) {
                // `+ [1 => 0]` supplies the end part when the value has no dash;
                // `+ [0]` could never do so because index 0 always exists.
                [$start, $end] = explode('-', substr($range, 6), 2) + [1 => 0];

                $end = ('' === $end) ? $fileSize - 1 : (int) $end;

                if ('' === $start) {
                    // Suffix range: the last $end bytes of the file
                    $start = $fileSize - $end;
                    $end   = $fileSize - 1;
                } else {
                    $start = (int) $start;
                }

                if ($start <= $end) {
                    $end = min($end, $fileSize - 1);
                    if ($start < 0 || $start > $end) {
                        $code = 416;
                        $response->header([
                            'Content-Range' => sprintf('bytes */%s', $fileSize),
                        ]);
                    } elseif ($end - $start < $fileSize - 1) {
                        $length = $end < $fileSize ? $end - $start + 1 : -1;
                        $offset = $start;
                        $code   = 206;
                        $response->header([
                            'Content-Range'  => sprintf('bytes %s-%s/%s', $start, $end, $fileSize),
                            'Content-Length' => $end - $start + 1,
                        ]);
                    }
                }
            }
        }

        $this->setStatus($res, $code);
        $this->setHeader($res, $response->getHeader());

        if ($code >= 200 && $code < 300 && $length !== 0) {
            $res->sendfile($file->getPathname(), $offset, $length);
        } else {
            $res->end();
        }
    }

    /**
     * Send a regular response body, written in chunks of 8192 bytes.
     *
     * The body is written whenever it is a non-empty string, so a body of "0"
     * is sent rather than being treated as empty.
     */
    protected function sendContent(Response $res, \think\Response $response)
    {
        // Transfer-Encoding: chunked is in use, so per the HTTP spec Content-Length is no longer needed
        $response->header(['Content-Length' => null]);

        $this->setStatus($res, $response->getCode());
        $this->setHeader($res, $response->getHeader());

        $content = (string) $response->getContent();
        if ($content !== '') {
            $contentSize = strlen($content);
            $chunkSize   = 8192;

            if ($contentSize > $chunkSize) {
                $sendSize = 0;
                do {
                    // Stop as soon as a write fails
                    if (!$res->write(substr($content, $sendSize, $chunkSize))) {
                        break;
                    }
                } while (($sendSize += $chunkSize) < $contentSize);
            } else {
                $res->write($content);
            }
        }
        $res->end();
    }
}
InteractsWithQueue.php
<?php
namespace think\swoole\concerns;
use Swoole\Process;
use Swoole\Timer;
use think\helper\Arr;
use think\queue\event\JobFailed;
use think\queue\Worker;
trait InteractsWithQueue
{
    /**
     * Register pool workers for every entry in `queue.workers`.
     *
     * A key of the form `queue@connection` selects a connection; otherwise the
     * connection is null. Each worker loops forever, running one job per
     * iteration inside the sandbox, guarded by a timer that exits the process
     * if the job takes longer than `timeout` seconds.
     */
    protected function createQueueWorkers()
    {
        foreach ($this->getConfig('queue.workers', []) as $queue => $options) {
            $connection = null;
            if (strpos($queue, '@') !== false) {
                [$queue, $connection] = explode('@', $queue);
            }

            $runner = function (Process\Pool $pool) use ($options, $connection, $queue) {
                $delay   = Arr::get($options, 'delay', 0);
                $sleep   = Arr::get($options, 'sleep', 3);
                $tries   = Arr::get($options, 'tries', 0);
                $timeout = Arr::get($options, 'timeout', 60);

                /** @var Worker $worker */
                $worker = $this->app->make(Worker::class);

                $onTimeout = function () use ($pool) {
                    $pool->getProcess()->exit();
                };

                $runJob = function () use ($connection, $queue, $delay, $sleep, $tries, $worker) {
                    $worker->runNextJob($connection, $queue, $delay, $sleep, $tries);
                };

                while (true) {
                    $timer = Timer::after($timeout * 1000, $onTimeout);

                    $this->runWithBarrier([$this, 'runInSandbox'], $runJob);

                    Timer::clear($timer);
                }
            };

            $this->addBatchWorker(Arr::get($options, 'worker_num', 1), $runner, "queue [$queue]");
        }
    }

    /**
     * Set up queue support when `queue.enable` is on.
     */
    public function prepareQueue()
    {
        if (!$this->getConfig('queue.enable', false)) {
            return;
        }

        $this->listenForEvents();
        $this->createQueueWorkers();
    }

    /**
     * Register event listeners: failed jobs are forwarded to logFailedJob().
     */
    protected function listenForEvents()
    {
        $onFailed = function (JobFailed $event) {
            $this->logFailedJob($event);
        };

        $this->container->event->listen(JobFailed::class, $onFailed);
    }

    /**
     * Record a failed job through the `queue.failer` service.
     *
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $job = $event->job;

        $this->container['queue.failer']->log(
            $event->connection,
            $job->getQueue(),
            $job->getRawBody(),
            $event->exception
        );
    }
}
InteractsWithRpcConnector.php
<?php
namespace think\swoole\concerns;
use Generator;
use Swoole\Coroutine\Client;
use think\swoole\exception\RpcClientException;
use think\swoole\rpc\File;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
trait InteractsWithRpcConnector
{
    /**
     * Run $callback with a client and return its result.
     * Implemented by the class using this trait.
     *
     * @param callable $callback Receives the client as its only argument
     */
    abstract protected function runWithClient($callback);

    /**
     * Read from the client until a complete, decodable response has arrived.
     *
     * Incoming data is unpacked into a handler, which is fed until it yields a
     * response. A File response is remembered and reading continues; any other
     * response is decoded, and if the decoded value is Protocol::FILE the
     * remembered file is returned in its place.
     *
     * @param Client   $client
     * @param callable $decoder Turns a raw response into the final result
     * @return mixed
     * @throws RpcClientException when the connection closes or a receive fails
     */
    protected function recv(Client $client, callable $decoder)
    {
        $handler = null;
        $file = null;
        while ($data = $client->recv()) {
            begin:
            // No handler in progress: start a new packet from the current data
            if (empty($handler)) {
                [$handler, $data] = Packer::unpack($data);
            }
            // NOTE(review): presumably write() consumes from $data by reference,
            // leaving surplus bytes for the next packet — confirm in the handler.
            $response = $handler->write($data);
            if (!empty($response)) {
                $handler = null;
                if ($response instanceof File) {
                    $file = $response;
                } else {
                    $result = $decoder($response);
                    if ($result === Protocol::FILE) {
                        $result = $file;
                    }
                    return $result;
                }
            }
            // Data left over after this packet: process it before reading more
            if (!empty($data)) {
                goto begin;
            }
        }
        // The loop ended on a falsy read: an empty string is reported as a
        // closed connection, false as a receive error.
        if ($data === '') {
            throw new RpcClientException('Connection is closed. ' . $client->errMsg, $client->errCode);
        }
        if ($data === false) {
            throw new RpcClientException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . swoole_strerror($client->errCode), $client->errCode);
        }
    }

    /**
     * Send a payload and wait for the decoded response.
     *
     * A Generator is sent piece by piece; anything else is sent as one piece.
     * Empty pieces are skipped. On an RpcClientException the client is closed
     * before the exception is rethrown.
     *
     * @param string|Generator $data
     * @param callable         $decoder
     * @return mixed
     * @throws RpcClientException
     */
    public function sendAndRecv($data, callable $decoder)
    {
        if (!$data instanceof Generator) {
            $data = [$data];
        }
        return $this->runWithClient(function (Client $client) use ($decoder, $data) {
            try {
                foreach ($data as $string) {
                    if (!empty($string)) {
                        if ($client->send($string) === false) {
                            throw new RpcClientException('Send data failed. ' . $client->errMsg, $client->errCode);
                        }
                    }
                }
                return $this->recv($client, $decoder);
            } catch (RpcClientException $e) {
                $client->close();
                throw $e;
            }
        });
    }
}
InteractsWithPools.php
<?php
namespace think\swoole\concerns;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\App;
use think\helper\Arr;
use think\swoole\Pool;
/**
 * Trait InteractsWithPools
 *
 * Creates the configured custom connection pools when a worker starts.
 *
 * @package think\swoole\concerns
 * @property App $app
 */
trait InteractsWithPools
{
    /**
     * @return Pool
     */
    public function getPools()
    {
        return $this->app->make(Pool::class);
    }

    /**
     * Register a `workerStart` listener that builds one ConnectionPool per
     * entry in the `pool` config whose `type` is a ConnectorInterface subclass.
     *
     * Each pool is added to the Pool registry under its config name and also
     * stored in the app as `swoole.pool.{name}`. Entries without a suitable
     * `type` are skipped.
     */
    protected function preparePools()
    {
        $this->onEvent('workerStart', function () {
            /** @var Pool $pools */
            $pools = $this->getPools();

            foreach ($this->getConfig('pool', []) as $name => $config) {
                $type = Arr::pull($config, 'type');
                if (!$type || !is_subclass_of($type, ConnectorInterface::class)) {
                    continue;
                }

                $poolConfig = Pool::pullPoolConfig($config);
                $connector  = $this->app->make($type);
                $pool       = new ConnectionPool($poolConfig, $connector, $config);

                $pools->add($name, $pool);
                // Inject into the app
                $this->app->instance("swoole.pool.{$name}", $pool);
            }
        });
    }
}
InteractsWithRpcClient.php
<?php
namespace think\swoole\concerns;
use Smf\ConnectionPool\ConnectionPool;
use think\App;
use think\swoole\Pool;
use think\swoole\pool\Client;
use think\swoole\rpc\client\Connector;
use think\swoole\rpc\client\Gateway;
use think\swoole\rpc\client\Proxy;
use think\swoole\rpc\JsonParser;
use Throwable;
/**
 * Trait InteractsWithRpcClient
 *
 * Sets up RPC client connection pools and binds the RPC interfaces to proxies
 * when a worker starts.
 *
 * @package think\swoole\concerns
 * @property App $app
 * @property App $container
 * @method Pool getPools()
 */
trait InteractsWithRpcClient
{
    /**
     * Register the `workerStart` listener: pools first, then interface bindings.
     */
    protected function prepareRpcClient()
    {
        $this->onEvent('workerStart', function () {
            $this->bindRpcClientPool();
            $this->bindRpcInterface();
        });
    }

    /**
     * Load `rpc.php` from the base path (if present) and bind each listed
     * interface to a proxy factory on the application.
     *
     * Parser, retry count and middleware are read per client from
     * `rpc.client.{name}.*`. Any Throwable raised while binding is ignored.
     */
    protected function bindRpcInterface()
    {
        // Locate the rpc interface file
        $rpcFile = $this->container->getBasePath() . 'rpc.php';
        if (!file_exists($rpcFile)) {
            return;
        }

        $rpcServices = (array) include $rpcFile;

        // Bind the rpc interfaces
        try {
            foreach ($rpcServices as $name => $abstracts) {
                $prefix = "rpc.client.{$name}";

                $parserClass = $this->getConfig("{$prefix}.parser", JsonParser::class);
                $tries       = $this->getConfig("{$prefix}.tries", 2);
                $middleware  = $this->getConfig("{$prefix}.middleware", []);

                $parser  = $this->getApplication()->make($parserClass);
                $gateway = new Gateway($this->createRpcConnector($name), $parser, $tries);

                foreach ($abstracts as $abstract) {
                    $factory = function (App $app) use ($middleware, $gateway, $name, $abstract) {
                        return $app->invokeClass(Proxy::getClassName($name, $abstract), [$gateway, $middleware]);
                    };
                    $this->getApplication()->bind($abstract, $factory);
                }
            }
        } catch (Throwable $e) {
            // Deliberately ignored
        }
    }

    /**
     * Create one client connection pool per entry in `rpc.client`, registered
     * as `rpc.client.{name}`.
     */
    protected function bindRpcClientPool()
    {
        $clients = $this->getConfig('rpc.client');
        if (empty($clients)) {
            return;
        }

        // Create the client connection pools
        foreach ($clients as $name => $config) {
            $poolConfig = Pool::pullPoolConfig($config);
            $pool       = new ConnectionPool($poolConfig, new Client(), $config);

            $this->getPools()->add("rpc.client.{$name}", $pool);
        }
    }

    /**
     * Build a Connector backed by the named client pool.
     *
     * The connector borrows a client for each call and always returns it to
     * the pool afterwards.
     *
     * @param string $name Client name
     * @return Connector
     */
    protected function createRpcConnector($name)
    {
        $pool = $this->getPools()->get("rpc.client.{$name}");

        return new class($pool) implements Connector {

            use InteractsWithRpcConnector;

            protected $pool;

            public function __construct(ConnectionPool $pool)
            {
                $this->pool = $pool;
            }

            protected function runWithClient($callback)
            {
                /** @var \Swoole\Coroutine\Client $client */
                $client = $this->pool->borrow();
                try {
                    $result = $callback($client);
                    return $result;
                } finally {
                    $this->pool->return($client);
                }
            }
        };
    }
}
WithContainer.php
<?php
namespace think\swoole\concerns;
use think\App;
use think\console\Output;
use think\exception\Handle;
use Throwable;
trait WithContainer
{
    /**
     * @var App
     */
    protected $container;

    /**
     * Manager constructor.
     * @param App $container
     */
    public function __construct(App $container)
    {
        $this->container = $container;
    }

    /**
     * @return App
     */
    protected function getContainer()
    {
        return $this->container;
    }

    /**
     * Read a value from the `swoole` configuration.
     * @param string $name    Key relative to `swoole.`
     * @param null   $default Value returned when the key is absent
     * @return mixed
     */
    public function getConfig(string $name, $default = null)
    {
        $config = $this->container->config;

        return $config->get("swoole.{$name}", $default);
    }

    /**
     * Trigger an event under the `swoole.` prefix.
     * @param string $event
     * @param null   $params
     */
    public function triggerEvent(string $event, $params = null): void
    {
        $dispatcher = $this->container->event;

        $dispatcher->trigger("swoole.{$event}", $params);
    }

    /**
     * Listen for an event under the `swoole.` prefix.
     * @param string $event
     * @param        $listener
     * @param bool   $first Passed through to the event dispatcher
     */
    public function onEvent(string $event, $listener, bool $first = false): void
    {
        $dispatcher = $this->container->event;

        $dispatcher->listen("swoole.{$event}", $listener, $first);
    }

    /**
     * Log server error: render it to the console, then report it.
     *
     * @param Throwable $e
     */
    public function logServerError(Throwable $e)
    {
        /** @var Handle $handle */
        $handle = $this->container->make(Handle::class);
        $output = new Output();

        $handle->renderForConsole($output, $e);
        $handle->report($e);
    }
}
InteractsWithTracing.php
<?php
namespace think\swoole\concerns;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use Swoole\Coroutine;
use think\helper\Arr;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use think\tracing\reporter\RedisReporter;
use think\tracing\Tracer;
/**
 * Tracing report processes.
 *
 * Registers one reporting worker per tracer marked `async`, and binds the
 * redis pool and reporter those tracers use.
 */
trait InteractsWithTracing
{
    /**
     * Register reporting workers for async tracers.
     *
     * Does nothing when the tracing package is not installed. A missing
     * `tracing.tracers` config is treated as an empty list.
     */
    protected function prepareTracing()
    {
        if (class_exists(Tracer::class)) {
            // Default to [] so an absent config does not reach foreach as null
            $tracers  = (array) $this->container->config->get('tracing.tracers', []);
            $hasAsync = false;
            foreach ($tracers as $name => $tracer) {
                if (Arr::get($tracer, 'async', false)) {
                    $this->addWorker(function () use ($name) {
                        $tracer = $this->app->make(Tracer::class)->tracer($name);

                        $tracer->report();
                    }, "tracing [{$name}]");
                    $hasAsync = true;
                }
            }

            if ($hasAsync) {
                $this->onEvent('workerStart', function () {
                    $this->bindTracingRedisPool();
                    $this->bindTracingRedisReporter();
                });
            }
        }
    }

    /**
     * Bind RedisReporter so each coroutine borrows one redis connection from
     * the `tracing.redis` pool and returns it when the coroutine ends.
     */
    protected function bindTracingRedisReporter()
    {
        $this->getApplication()->bind(RedisReporter::class, function ($name) {

            $pool = $this->getPools()->get("tracing.redis");

            // Borrow once per coroutine; the deferred callback hands it back
            $redis = Context::rememberData('tracing.redis', function () use ($pool) {
                $redis = $pool->borrow();
                Coroutine::defer(function () use ($pool, $redis) {
                    $pool->return($redis);
                });
                return $redis;
            });

            return new RedisReporter($name, $redis);
        });
    }

    /**
     * Create the `tracing.redis` connection pool from the `tracing.redis` config.
     * A missing config is treated as an empty array.
     */
    protected function bindTracingRedisPool()
    {
        $config = (array) $this->container->config->get('tracing.redis', []);

        $pool = new ConnectionPool(
            Pool::pullPoolConfig($config),
            new PhpRedisConnector(),
            $config
        );

        $this->getPools()->add("tracing.redis", $pool);
    }
}
config
swoole.php
<?php
use think\swoole\websocket\socketio\Handler;
// Default configuration for the swoole integration.
return [
    // HTTP server
    'http'       => [
        'enable'     => true,
        'host'       => '0.0.0.0',
        'port'       => 80,
        // Number of HTTP worker processes
        'worker_num' => swoole_cpu_num(),
        // Options passed to the server's set()
        'options'    => [],
    ],
    // Websocket support (served by the HTTP workers when enabled)
    'websocket'  => [
        'enable'        => false,
        'handler'       => Handler::class,
        'ping_interval' => 25000,
        'ping_timeout'  => 60000,
        'room'          => [
            // Which of the room settings below is used
            'type'  => 'table',
            'table' => [
                'room_rows'   => 8192,
                'room_size'   => 2048,
                'client_rows' => 4096,
                'client_size' => 2048,
            ],
            'redis' => [
                'host'          => '127.0.0.1',
                'port'          => 6379,
                'max_active'    => 3,
                'max_wait_time' => 5,
            ],
        ],
        'listen'        => [],
        'subscribe'     => [],
    ],
    // RPC server and clients
    'rpc'        => [
        'server' => [
            'enable'     => false,
            'host'       => '0.0.0.0',
            'port'       => 9000,
            'worker_num' => swoole_cpu_num(),
            'services'   => [],
        ],
        // Client settings keyed by client name
        'client' => [],
    ],
    // Queue
    'queue'      => [
        'enable'  => false,
        // Worker options keyed by "queue" or "queue@connection"
        'workers' => [],
    ],
    // Hot update (file watching); enabled together with APP_DEBUG by default
    'hot_update' => [
        'enable'  => env('APP_DEBUG', false),
        'name'    => ['*.php'],
        'include' => [app_path()],
        'exclude' => [],
    ],
    // Connection pools
    'pool'       => [
        'db'    => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
        ],
        'cache' => [
            'enable'        => true,
            'max_active'    => 3,
            'max_wait_time' => 5,
        ],
        // Custom connection pools
    ],
    'tables'     => [],
    // Instances to preload in each worker so they can be shared
    'concretes'  => [],
    // Resetters
    'resetters'  => [],
    // Instances to clear before each request
    'instances'  => [],
    // Services to re-run before each request
    'services'   => [],
];
contract
websocket
HandlerInterface.php
<?php
namespace think\swoole\contract\websocket;
use Swoole\WebSocket\Frame;
use think\Request;
/**
 * Contract for websocket handlers: connection lifecycle callbacks plus
 * outgoing message encoding.
 */
interface HandlerInterface
{
    /**
     * "onOpen" listener.
     *
     * @param Request $request
     */
    public function onOpen(Request $request);

    /**
     * "onMessage" listener.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame);

    /**
     * "onClose" listener.
     */
    public function onClose();

    /**
     * Encode a message.
     *
     * NOTE(review): presumably converts an outgoing message into the payload
     * pushed to the client — confirm against the implementations.
     *
     * @param mixed $message
     */
    public function encodeMessage($message);
}
RoomInterface.php
<?php
namespace think\swoole\contract\websocket;
/**
 * Contract for websocket room storage: which socket fds belong to which rooms.
 */
interface RoomInterface
{
    /**
     * Rooms key
     *
     * @const string
     */
    public const ROOMS_KEY = 'rooms';

    /**
     * Descriptors key
     *
     * @const string
     */
    public const DESCRIPTORS_KEY = 'fds';

    /**
     * Do some init stuffs before workers started.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface;

    /**
     * Add a socket fd to one or more rooms.
     *
     * @param string       $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames);

    /**
     * Delete a socket fd from one or more rooms.
     *
     * @param string       $fd
     * @param array|string $roomNames
     */
    public function delete($fd, $roomNames);

    /**
     * Get all sockets by a room key.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room);

    /**
     * Get all rooms by a fd.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd);
}
ResetterInterface.php
<?php
namespace think\swoole\contract;
use think\App;
use think\swoole\Sandbox;
/**
 * Contract for resetters, which are handed the app and its sandbox so they can
 * reset application state.
 */
interface ResetterInterface
{
    /**
     * "handle" function for resetting app.
     *
     * @param \think\App $app
     * @param Sandbox    $sandbox
     */
    public function handle(App $app, Sandbox $sandbox);
}
rpc
ParserInterface.php
<?php
namespace think\swoole\contract\rpc;
use think\swoole\rpc\Protocol;
/**
 * Contract for RPC parsers: converts requests (Protocol) and responses to and
 * from their string form.
 */
interface ParserInterface
{
    // NOTE(review): presumably the end-of-message delimiter — confirm against the parsers
    const EOF = "\r\n\r\n";

    /**
     * Encode a request.
     *
     * @param Protocol $protocol
     *
     * @return string
     */
    public function encode(Protocol $protocol): string;

    /**
     * Decode a request.
     *
     * @param string $string
     *
     * @return Protocol
     */
    public function decode(string $string): Protocol;

    /**
     * Decode a response.
     *
     * @param string $string
     *
     * @return mixed
     */
    public function decodeResponse(string $string);

    /**
     * Encode a response.
     *
     * @param mixed $result
     *
     * @return string
     */
    public function encodeResponse($result): string;
}
coroutine
Context.php
<?php
namespace think\swoole\coroutine;
use ArrayObject;
use Closure;
use Swoole\Coroutine;
/**
 * Per-coroutine key/value storage kept under the `#data` slot of the
 * coroutine context.
 */
class Context
{
    /**
     * Get the coroutine context.
     * @param int $cid Coroutine id, passed straight to Coroutine::getContext()
     * @return Coroutine\Context
     */
    public static function get($cid = 0)
    {
        return Coroutine::getContext($cid);
    }

    /**
     * Get the data container of the current coroutine, creating it on first use.
     * @return ArrayObject
     */
    public static function getDataObject()
    {
        $ctx = self::get();
        if (isset($ctx['#data'])) {
            return $ctx['#data'];
        }

        $ctx['#data'] = new ArrayObject();

        return $ctx['#data'];
    }

    /**
     * Get temporary data of the current coroutine.
     * @param string $key
     * @param null   $default Returned when the key is not set
     * @return mixed|null
     */
    public static function getData(string $key, $default = null)
    {
        if (!self::hasData($key)) {
            return $default;
        }

        return self::getDataObject()->offsetGet($key);
    }

    /**
     * Tell whether temporary data exists for the key.
     * @param string $key
     * @return bool
     */
    public static function hasData(string $key)
    {
        $store = self::getDataObject();

        return $store->offsetExists($key);
    }

    /**
     * Write temporary data.
     * @param string $key
     * @param        $value
     */
    public static function setData(string $key, $value)
    {
        $store = self::getDataObject();
        $store->offsetSet($key, $value);
    }

    /**
     * Remove data; a missing key is ignored.
     * @param string $key
     */
    public static function removeData(string $key)
    {
        if (!self::hasData($key)) {
            return;
        }

        self::getDataObject()->offsetUnset($key);
    }

    /**
     * Return the stored value, or store and return $value when the key is absent.
     * A Closure is invoked and its result is what gets stored.
     * @param string $key
     * @param        $value
     * @return mixed|null
     */
    public static function rememberData(string $key, $value)
    {
        if (self::hasData($key)) {
            return self::getData($key);
        }

        // Resolve the value lazily
        $resolved = $value instanceof Closure ? $value() : $value;

        self::setData($key, $resolved);

        return $resolved;
    }

    /**
     * @internal
     * Clear all data.
     */
    public static function clear()
    {
        $store = self::getDataObject();
        $store->exchangeArray([]);
    }

    /**
     * Get the id of the current coroutine.
     * @return mixed
     */
    public static function getCoroutineId()
    {
        return Coroutine::getuid();
    }
}
exception
RpcResponseException.php
<?php
namespace think\swoole\exception;
use Exception;
use think\swoole\rpc\Error;
/**
 * Exception wrapping an RPC Error; its message and code are taken from the error.
 */
class RpcResponseException extends Exception
{
    /** @var Error The wrapped error */
    protected $error;

    /**
     * @param Error $error
     */
    public function __construct(Error $error)
    {
        $this->error = $error;

        $message = $error->getMessage();
        $code    = $error->getCode();

        parent::__construct($message, $code);
    }

    /**
     * @return Error
     */
    public function getError()
    {
        return $this->error;
    }
}
RpcClientException.php
<?php
namespace think\swoole\exception;
use Exception;
/**
 * Exception type for RPC client failures; adds nothing to Exception beyond the
 * distinct class name.
 */
class RpcClientException extends Exception
{
}
middleware
TraceRpcServer.php
<?php
namespace think\swoole\middleware;
use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_SERVER;
/**
 * RPC server middleware that wraps each call in a tracing span.
 */
class TraceRpcServer
{
    /** @var Tracer */
    protected $tracer;

    public function __construct(Tracer $tracer)
    {
        $this->tracer = $tracer;
    }

    /**
     * Start a server span as a child of the context extracted from the
     * protocol, run the next handler, and always close the scope and flush
     * the tracer afterwards. A Throwable is tagged on the span and rethrown.
     *
     * @param Protocol $protocol
     * @param callable $next
     * @return mixed Result of $next
     * @throws Throwable
     */
    public function handle(Protocol $protocol, $next)
    {
        $parentContext = $this->tracer->extract(TEXT_MAP, $protocol->getContext());

        $operation = 'rpc.server:' . $protocol->getInterface() . '@' . $protocol->getMethod();
        $options   = [
            'child_of' => $parentContext,
            'tags'     => [
                SPAN_KIND => SPAN_KIND_RPC_SERVER,
            ],
        ];

        $scope = $this->tracer->startActiveSpan($operation, $options);
        $span  = $scope->getSpan();

        try {
            $result = $next($protocol);
            return $result;
        } catch (Throwable $e) {
            $span->setTag(ERROR, $e);
            throw $e;
        } finally {
            $scope->close();
            $this->tracer->flush();
        }
    }
}
TraceRpcClient.php
<?php
namespace think\swoole\middleware;
use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Protocol;
use think\tracing\Tracer;
use Throwable;
use const OpenTracing\Formats\TEXT_MAP;
use const OpenTracing\Tags\ERROR;
use const OpenTracing\Tags\SPAN_KIND;
use const OpenTracing\Tags\SPAN_KIND_RPC_CLIENT;
/**
 * RPC client middleware that wraps each call in a tracing span and injects the
 * span context into the protocol.
 */
class TraceRpcClient
{
    /** @var Tracer */
    protected $tracer;

    public function __construct(Tracer $tracer)
    {
        $this->tracer = $tracer;
    }

    /**
     * Start a client span, inject its context into the protocol context, run
     * the next handler and always close the scope afterwards.
     *
     * A Throwable is rethrown; it is tagged on the span unless it is an
     * RpcResponseException.
     *
     * @param Protocol $protocol
     * @param callable $next
     * @return mixed Result of $next
     * @throws Throwable
     */
    public function handle(Protocol $protocol, $next)
    {
        $operation = 'rpc.client:' . $protocol->getInterface() . '@' . $protocol->getMethod();
        $options   = [
            'tags' => [
                SPAN_KIND => SPAN_KIND_RPC_CLIENT,
            ],
        ];

        $scope = $this->tracer->startActiveSpan($operation, $options);
        $span  = $scope->getSpan();

        $carrier = $protocol->getContext();
        $this->tracer->inject($span->getContext(), TEXT_MAP, $carrier);
        $protocol->setContext($carrier);

        try {
            $result = $next($protocol);
            return $result;
        } catch (Throwable $e) {
            if ($e instanceof RpcResponseException) {
                throw $e;
            }
            $span->setTag(ERROR, $e);
            throw $e;
        } finally {
            $scope->close();
        }
    }
}
InteractsWithVarDumper.php
<?php
namespace think\swoole\middleware;
use Closure;
use Symfony\Component\VarDumper\Cloner\VarCloner;
use Symfony\Component\VarDumper\Dumper\HtmlDumper;
use Symfony\Component\VarDumper\VarDumper;
use think\Request;
use think\Response;
/**
 * Middleware that makes VarDumper output HTML for the duration of a request.
 */
class InteractsWithVarDumper
{
    /**
     * Install an HTML dump handler around the next middleware.
     *
     * The previous handler is restored in a `finally` block, so it is put back
     * even when the downstream handler throws; otherwise the HTML handler would
     * stay installed for the rest of the process. When VarDumper is not
     * available the request is passed through untouched.
     *
     * @param Request $request
     * @param Closure $next
     * @return Response
     */
    public function handle($request, Closure $next)
    {
        if (!class_exists(VarDumper::class)) {
            return $next($request);
        }

        $cloner = new VarCloner();
        $dumper = new HtmlDumper();

        $prevHandler = VarDumper::setHandler(function ($var) use ($dumper, $cloner) {
            $dumper->dump($cloner->cloneVar($var));
        });

        try {
            /** @var Response $response */
            $response = $next($request);

            return $response;
        } finally {
            VarDumper::setHandler($prevHandler);
        }
    }
}
pool
proxy
Connection.php
<?php
namespace think\swoole\pool\proxy;
use Psr\SimpleCache\CacheInterface;
use think\db\BaseQuery;
use think\db\ConnectionInterface;
use think\DbManager;
use think\swoole\pool\Proxy;
/**
* Class Connection
* @package think\swoole\pool\db
* @property ConnectionInterface $handler
*/
class Connection extends Proxy implements ConnectionInterface
{
/**
* 获取当前连接器类对应的Query类
* @access public
* @return string
*/
public function getQueryClass(): string
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 连接数据库方法
* @access public
* @param array $config 接参数
* @param integer $linkNum 连接序号
* @return mixed
*/
public function connect(array $config = [], $linkNum = 0)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 设置当前的数据库Db对象
* @access public
* @param DbManager $db
* @return void
*/
public function setDb(DbManager $db)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 设置当前的缓存对象
* @access public
* @param CacheInterface $cache
* @return void
*/
public function setCache(CacheInterface $cache)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 获取数据库的配置参数
* @access public
* @param string $config 配置名称
* @return mixed
*/
public function getConfig(string $config = '')
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 关闭数据库(或者重新连接)
* @access public
*/
public function close()
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 查找单条记录
* @access public
* @param BaseQuery $query 查询对象
* @return array
*/
public function find(BaseQuery $query): array
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 查找记录
* @access public
* @param BaseQuery $query 查询对象
* @return array
*/
public function select(BaseQuery $query): array
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 插入记录
* @access public
* @param BaseQuery $query 查询对象
* @param boolean $getLastInsID 返回自增主键
* @return mixed
*/
public function insert(BaseQuery $query, bool $getLastInsID = false)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 批量插入记录
* @access public
* @param BaseQuery $query 查询对象
* @param mixed $dataSet 数据集
* @return integer
* @throws \Exception
* @throws \Throwable
*/
public function insertAll(BaseQuery $query, array $dataSet = []): int
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 更新记录
* @access public
* @param BaseQuery $query 查询对象
* @return integer
*/
public function update(BaseQuery $query): int
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 删除记录
* @access public
* @param BaseQuery $query 查询对象
* @return int
*/
public function delete(BaseQuery $query): int
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 得到某个字段的值
* @access public
* @param BaseQuery $query 查询对象
* @param string $field 字段名
* @param mixed $default 默认值
* @return mixed
*/
public function value(BaseQuery $query, string $field, $default = null)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* 得到某个列的数组
* @access public
* @param BaseQuery $query 查询对象
* @param string|array $column 字段名 多个字段用逗号分隔
* @param string $key 索引
* @return array
*/
public function column(BaseQuery $query, $column, string $key = ''): array
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Execute a database transaction.
*
* Delegates to __call() under this method's own name with the caller's arguments.
* @access public
* @param callable $callback callback that performs the data operations
* @return mixed
* @throws \Throwable
*/
public function transaction(callable $callback)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Start a transaction.
*
* Delegates to __call() under this method's own name; whatever __call() returns is passed back.
* @access public
* @return void
* @throws \Exception
*/
public function startTrans()
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Commit queries issued while not in auto-commit mode.
*
* Delegates to __call() under this method's own name; whatever __call() returns is passed back.
* @access public
* @return void
*/
public function commit()
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Roll back the transaction.
*
* Delegates to __call() under this method's own name; whatever __call() returns is passed back.
* @access public
* @return void
*/
public function rollback()
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Get the SQL statement of the most recent query.
*
* Delegates to __call() under this method's own name with the caller's arguments.
* @access public
* @return string
*/
public function getLastSql(): string
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Forward a `table` call through __call() with exactly the arguments the caller supplied.
*
* @param mixed $table passed through unchanged
* @return mixed whatever __call() returns
*/
public function table($table)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* Forward a `name` call through __call() with exactly the arguments the caller supplied.
*
* @param mixed $name passed through unchanged
* @return mixed whatever __call() returns
*/
public function name($name)
{
return $this->__call(__FUNCTION__, func_get_args());
}
}
Store.php
<?php
namespace think\swoole\pool\proxy;
use Psr\SimpleCache\CacheInterface;
use think\contract\CacheHandlerInterface;
use think\swoole\pool\Proxy;
/**
* Cache store proxy.
*
* Declares the CacheHandlerInterface / PSR-16 CacheInterface methods explicitly and
* routes each one through __call() under the method's own name, forwarding exactly
* the arguments the caller supplied (func_get_args() omits defaults that were not passed).
*/
class Store extends Proxy implements CacheHandlerInterface, CacheInterface
{
/**
* @inheritDoc
*/
public function has($name)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function get($name, $default = null)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function set($name, $value, $expire = null)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function inc(string $name, int $step = 1)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function dec(string $name, int $step = 1)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function delete($name)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function clear()
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function clearTag(array $keys)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function getMultiple($keys, $default = null)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function setMultiple($values, $ttl = null)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function deleteMultiple($keys)
{
return $this->__call(__FUNCTION__, func_get_args());
}
/**
* @inheritDoc
*/
public function tag($name)
{
return $this->__call(__FUNCTION__, func_get_args());
}
}
Cache.php
<?php
namespace think\swoole\pool;
use think\swoole\pool\proxy\Store;
class Cache extends \think\Cache
{
    /**
     * Build the driver for a cache store as a Store proxy.
     *
     * The closure handed to Store defers to the parent implementation to create
     * the real driver; the second argument is the `swoole.pool.cache` config value.
     *
     * @param string $name store name
     * @return Store
     */
    protected function createDriver(string $name)
    {
        $factory = function () use ($name) {
            return parent::createDriver($name);
        };
        $poolConfig = $this->app->config->get('swoole.pool.cache', []);

        return new Store($factory, $poolConfig);
    }
}
Connector.php
<?php
namespace think\swoole\pool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
/**
 * Generic pool connector that builds connections through a user-supplied callable.
 */
class Connector implements ConnectorInterface
{
    /** @var callable invoked with the connection config to create a connection */
    protected $connector;

    /**
     * @param callable $connector factory called as $connector(array $config)
     */
    public function __construct($connector)
    {
        $this->connector = $connector;
    }

    /**
     * Create a connection by invoking the stored factory with $config.
     *
     * @param array $config
     * @return mixed whatever the factory returns
     */
    public function connect(array $config)
    {
        $factory = $this->connector;

        return $factory($config);
    }

    /**
     * Intentionally a no-op in this base connector.
     */
    public function disconnect($connection)
    {
    }

    /**
     * A connection counts as connected until the Proxy::KEY_DISCONNECTED
     * property has been set on it.
     */
    public function isConnected($connection): bool
    {
        $flaggedAsBroken = property_exists($connection, Proxy::KEY_DISCONNECTED);

        return !$flaggedAsBroken;
    }

    /**
     * Intentionally a no-op in this base connector.
     */
    public function reset($connection, array $config)
    {
    }

    /**
     * Every connection is accepted as valid.
     */
    public function validate($connection): bool
    {
        return true;
    }
}
Db.php
<?php
namespace think\swoole\pool;
use think\Config;
use think\db\ConnectionInterface;
use think\swoole\pool\proxy\Connection;
/**
* Class Db
*
* Database manager whose connections are created as Connection proxies.
* @package think\swoole\pool
* @property Config $config
*/
class Db extends \think\Db
{
/**
* Wrap the parent's connection factory in a Connection proxy.
*
* The anonymous Connector subclass receives a closure that defers to
* parent::createConnection($name) and overrides disconnect() so that objects
* implementing ConnectionInterface are closed. The second argument passed to
* Connection is the `swoole.pool.db` config value.
*
* @param string $name connection name
* @return ConnectionInterface
*/
protected function createConnection(string $name): ConnectionInterface
{
return new Connection(new class(function () use ($name) {
return parent::createConnection($name);
}) extends Connector {
public function disconnect($connection)
{
// Anything that is not a ConnectionInterface is left untouched.
if ($connection instanceof ConnectionInterface) {
$connection->close();
}
}
}, $this->config->get('swoole.pool.db', []));
}
}
Proxy.php
<?php
namespace think\swoole\pool;
use Closure;
use Exception;
use RuntimeException;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use Swoole\Coroutine;
use think\swoole\coroutine\Context;
use think\swoole\Pool;
use Throwable;
/**
 * Base class for objects that stand in for a pooled connection.
 *
 * Calls made on the proxy are forwarded to a connection borrowed from a
 * ConnectionPool; the borrowed connection is handed back through a deferred
 * coroutine callback or an explicit release().
 */
abstract class Proxy
{
    /** Name of the dynamic property marking a connection as returned to the pool. */
    const KEY_RELEASED = '__released';

    /** Name of the dynamic property marking a connection as broken. */
    const KEY_DISCONNECTED = '__disconnected';

    /** @var ConnectionPool */
    protected $pool;

    /**
     * Proxy constructor.
     *
     * @param Closure|ConnectorInterface $connector a Closure is wrapped in a Connector
     * @param array                      $config    passed to Pool::pullPoolConfig()
     * @param array                      $connectionConfig handed to the pool as connection config
     */
    public function __construct($connector, $config, array $connectionConfig = [])
    {
        $resolved = $connector instanceof Closure ? new Connector($connector) : $connector;

        $this->pool = new ConnectionPool(Pool::pullPoolConfig($config), $resolved, $connectionConfig);
        $this->pool->init();
    }

    /**
     * Fetch the connection associated with this proxy through Context::rememberData().
     *
     * When the factory runs it borrows a connection, marks it as not released and
     * registers a deferred callback that releases it automatically.
     * NOTE(review): rememberData() presumably caches per coroutine context — confirm.
     */
    protected function getPoolConnection()
    {
        $contextKey = 'connection.' . spl_object_id($this);

        return Context::rememberData($contextKey, function () {
            $borrowed = $this->pool->borrow();
            $borrowed->{static::KEY_RELEASED} = false;

            // Automatic release.
            Coroutine::defer(function () use ($borrowed) {
                $this->releaseConnection($borrowed);
            });

            return $borrowed;
        });
    }

    /**
     * Return a connection to the pool once; repeated calls are ignored.
     */
    protected function releaseConnection($connection)
    {
        if (!$connection->{static::KEY_RELEASED}) {
            $connection->{static::KEY_RELEASED} = true;
            $this->pool->return($connection);
        }
    }

    /**
     * Release the connection currently associated with this proxy.
     */
    public function release()
    {
        $this->releaseConnection($this->getPoolConnection());
    }

    /**
     * Forward a method call to the pooled connection.
     *
     * Any throwable raised by the call flags the connection as disconnected
     * before being rethrown.
     *
     * @throws RuntimeException when the connection was already released
     */
    public function __call($method, $arguments)
    {
        $connection = $this->getPoolConnection();

        if ($connection->{static::KEY_RELEASED}) {
            throw new RuntimeException('Connection already has been released!');
        }

        try {
            return $connection->{$method}(...$arguments);
        } catch (Throwable $e) {
            $connection->{static::KEY_DISCONNECTED} = true;
            throw $e;
        }
    }
}
Client.php
<?php
namespace think\swoole\pool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use think\helper\Arr;
/**
 * Pool connector producing Swoole coroutine TCP clients.
 */
class Client implements ConnectorInterface
{
    /**
     * Open a TCP client to the configured server.
     *
     * `host`, `port` and `timeout` (default 5) are removed from $config; the
     * remaining entries are applied with Client::set().
     *
     * @param array $config
     * @return \Swoole\Coroutine\Client
     */
    public function connect(array $config)
    {
        $host    = Arr::pull($config, 'host');
        $port    = Arr::pull($config, 'port');
        $timeout = Arr::pull($config, 'timeout', 5);

        $tcp = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
        $tcp->set($config);
        $tcp->connect($host, $port, $timeout);

        return $tcp;
    }

    /**
     * Close the client.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return mixed
     */
    public function disconnect($connection)
    {
        $connection->close();
    }

    /**
     * A client is usable when it reports being connected and peek() does not
     * return an empty string.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
     */
    public function isConnected($connection): bool
    {
        if (!$connection->isConnected()) {
            return false;
        }

        return $connection->peek() !== '';
    }

    /**
     * Nothing to reset for this connector.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @param array $config
     * @return mixed
     */
    public function reset($connection, array $config)
    {
    }

    /**
     * Accept only Swoole coroutine clients.
     *
     * @param \Swoole\Coroutine\Client $connection
     * @return bool
     */
    public function validate($connection): bool
    {
        return $connection instanceof \Swoole\Coroutine\Client;
    }
}
resetters
ResetModel.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\Model;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Points the model invoker at the sandbox's current application.
 */
class ResetModel implements ResetterInterface
{
    /**
     * Install an invoker that resolves the application from the sandbox on
     * every call. Does nothing when the Model class is not available.
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        if (!class_exists(Model::class)) {
            return;
        }

        Model::setInvoker(fn (...$args) => $sandbox->getApplication()->invoke(...$args));
    }
}
ResetConfig.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Gives the application its own copy of the sandbox configuration.
 */
class ResetConfig implements ResetterInterface
{
    /**
     * Bind a clone of the sandbox config as the app's `config` instance.
     *
     * @return App the same application instance
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $configCopy = clone $sandbox->getConfig();
        $app->instance('config', $configCopy);

        return $app;
    }
}
ClearInstances.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Drops container instances from the application.
 */
class ClearInstances implements ResetterInterface
{
    /**
     * Delete the built-in instance names plus any listed under the
     * `swoole.instances` config key.
     *
     * @return App the same application instance
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $configured = $sandbox->getConfig()->get('swoole.instances', []);
        $names      = array_merge(['log', 'session', 'view', 'response', 'cookie'], $configured);

        foreach ($names as $name) {
            $app->delete($name);
        }

        return $app;
    }
}
ResetEvent.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Class ResetEvent
 *
 * Rebinds a copy of the sandbox's event object into the application.
 * @package think\swoole\resetters
 */
class ResetEvent implements ResetterInterface
{
    use ModifyProperty;

    /**
     * Clone the sandbox event object, pass the clone and $app to
     * modifyProperty(), then register the clone as the `event` instance.
     *
     * @return App the same application instance
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        $eventCopy = clone $sandbox->getEvent();

        $this->modifyProperty($eventCopy, $app);
        $app->instance('event', $eventCopy);

        return $app;
    }
}
ResetService.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\swoole\concerns\ModifyProperty;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Class ResetService
 *
 * Re-runs the sandbox's services against the given application.
 * @package think\swoole\resetters
 */
class ResetService implements ResetterInterface
{
    use ModifyProperty;

    /**
     * For each sandbox service: pass it and $app to modifyProperty(), then call
     * register() and boot() when the service defines them (boot() goes through
     * the application's invoke()).
     *
     * @param App     $app
     * @param Sandbox $sandbox
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        foreach ($sandbox->getServices() as $provider) {
            $this->modifyProperty($provider, $app);

            if (method_exists($provider, 'register')) {
                $provider->register();
            }

            if (method_exists($provider, 'boot')) {
                $app->invoke([$provider, 'boot']);
            }
        }
    }
}
ResetPaginator.php
<?php
namespace think\swoole\resetters;
use think\App;
use think\Paginator;
use think\swoole\contract\ResetterInterface;
use think\swoole\Sandbox;
/**
 * Makes the paginator resolve path and page from the sandbox's current request.
 */
class ResetPaginator implements ResetterInterface
{
    /**
     * Register both paginator resolvers; each reads the request from the
     * application the sandbox returns at call time.
     */
    public function handle(App $app, Sandbox $sandbox)
    {
        Paginator::currentPathResolver(
            fn () => $sandbox->getApplication()->request->baseUrl()
        );

        Paginator::currentPageResolver(function ($varPage = 'page') use ($sandbox) {
            $requested = $sandbox->getApplication()->request->param($varPage);

            // Only integers >= 1 are accepted; anything else falls back to page 1.
            $isValid = filter_var($requested, FILTER_VALIDATE_INT) !== false && (int) $requested >= 1;

            return $isValid ? (int) $requested : 1;
        });
    }
}
response
File.php
<?php
namespace think\swoole\response;
use DateTime;
use RuntimeException;
use SplFileInfo;
use think\Response;
/**
 * Response that streams a file from disk.
 */
class File extends Response
{
    public const DISPOSITION_ATTACHMENT = 'attachment';
    public const DISPOSITION_INLINE     = 'inline';

    /** @var array headers sent by default; Content-Type is replaced when detection succeeds */
    protected $header = [
        'Content-Type'  => 'application/octet-stream',
        'Accept-Ranges' => 'bytes',
    ];

    /**
     * @var SplFileInfo
     */
    protected $file;

    /**
     * @param SplFileInfo|string $file               file object or path
     * @param string|null        $contentDisposition disposition type, or null to send none
     * @param bool               $autoEtag           derive an ETag from the file contents
     * @param bool               $autoLastModified   derive Last-Modified from the mtime
     * @param bool               $autoContentType    detect the MIME type
     * @throws RuntimeException when the file is not readable
     */
    public function __construct($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
    {
        $this->setFile($file, $contentDisposition, $autoEtag, $autoLastModified, $autoContentType);
    }

    /**
     * @return SplFileInfo
     */
    public function getFile()
    {
        return $this->file;
    }

    /**
     * Set the file to send and derive the related headers.
     *
     * @param SplFileInfo|string $file
     * @param string|null        $contentDisposition
     * @param bool               $autoEtag
     * @param bool               $autoLastModified
     * @param bool               $autoContentType
     * @return $this
     * @throws RuntimeException when the file is not readable
     */
    public function setFile($file, ?string $contentDisposition = null, bool $autoEtag = true, bool $autoLastModified = true, bool $autoContentType = true)
    {
        if (!$file instanceof SplFileInfo) {
            $file = new SplFileInfo((string) $file);
        }

        if (!$file->isReadable()) {
            throw new RuntimeException('File must be readable.');
        }

        $this->header['Content-Length'] = $file->getSize();
        $this->file                     = $file;

        if ($autoEtag) {
            $this->setAutoEtag();
        }

        if ($autoLastModified) {
            $this->setAutoLastModified();
        }

        if ($contentDisposition) {
            $this->setContentDisposition($contentDisposition);
        }

        if ($autoContentType) {
            $this->setAutoContentType();
        }

        return $this;
    }

    /**
     * Detect the MIME type with fileinfo and use it as Content-Type.
     *
     * The default Content-Type is kept when the fileinfo database cannot be
     * opened or detection yields nothing.
     */
    public function setAutoContentType()
    {
        $finfo = finfo_open(FILEINFO_MIME_TYPE);

        // finfo_open() returns false on failure; passing that on would be a TypeError.
        if ($finfo === false) {
            return;
        }

        $mimeType = finfo_file($finfo, $this->file->getPathname());

        if ($mimeType) {
            $this->header['Content-Type'] = $mimeType;
        }
    }

    /**
     * Set the Content-Disposition header.
     *
     * @param string $disposition disposition type (see the DISPOSITION_* constants)
     * @param string $filename    name offered to the client; defaults to the file's own name
     * @return $this
     */
    public function setContentDisposition(string $disposition, string $filename = '')
    {
        if ('' === $filename) {
            $filename = $this->file->getFilename();
        }

        // Keep the quoted-string well formed: line breaks would split the header,
        // and a bare quote or backslash would end or corrupt the quoted value.
        $filename = addcslashes(str_replace(["\r", "\n"], '', $filename), "\"\\");

        $this->header['Content-Disposition'] = "{$disposition}; filename=\"{$filename}\"";

        return $this;
    }

    /**
     * Set Last-Modified from the file's modification time.
     */
    public function setAutoLastModified()
    {
        // The 'U' format expects a string timestamp and yields a UTC date,
        // so appending "GMT" is accurate.
        $date = DateTime::createFromFormat('U', (string) $this->file->getMTime());

        return $this->lastModified($date->format('D, d M Y H:i:s') . ' GMT');
    }

    /**
     * Set a weak ETag built from the SHA-1 of the file contents.
     */
    public function setAutoEtag()
    {
        $eTag = "W/\"" . sha1_file($this->file->getPathname()) . "\"";

        return $this->eTag($eTag);
    }

    /**
     * Output the file contents; the $data argument is not used.
     */
    protected function sendData(string $data): void
    {
        readfile($this->file->getPathname());
    }
}
rpc
Protocol.php
<?php
namespace think\swoole\rpc;
/**
 * Value object describing one RPC call: target interface, method, parameters and context.
 */
class Protocol
{
    /** Payload that asks the server for its interface description. */
    const ACTION_INTERFACE = '@action_interface';

    /** Placeholder used in place of a file parameter/result. */
    const FILE = '@file';

    /** @var string */
    private $interface = '';

    /** @var string */
    private $method = '';

    /** @var array */
    private $params = [];

    /** @var array */
    private $context = [];

    /**
     * Named constructor.
     *
     * @param string $interface
     * @param string $method
     * @param array  $params
     * @param array  $context
     * @return Protocol
     */
    public static function make(string $interface, string $method, array $params, array $context = [])
    {
        $protocol = new static();

        $protocol->context   = $context;
        $protocol->params    = $params;
        $protocol->method    = $method;
        $protocol->interface = $interface;

        return $protocol;
    }

    /**
     * @return string
     */
    public function getInterface(): string
    {
        return $this->interface;
    }

    /**
     * @param string $interface
     */
    public function setInterface(string $interface): void
    {
        $this->interface = $interface;
    }

    /**
     * @return string
     */
    public function getMethod(): string
    {
        return $this->method;
    }

    /**
     * @param string $method
     */
    public function setMethod(string $method): void
    {
        $this->method = $method;
    }

    /**
     * @return array
     */
    public function getParams(): array
    {
        return $this->params;
    }

    /**
     * @param array $params
     */
    public function setParams(array $params): void
    {
        $this->params = $params;
    }

    /**
     * @return array
     */
    public function getContext(): array
    {
        return $this->context;
    }

    /**
     * @param array $context
     */
    public function setContext(array $context): void
    {
        $this->context = $context;
    }
}
File.php
<?php
namespace think\swoole\rpc;
use Throwable;
/**
 * File object that deletes its underlying file when it is destroyed.
 */
class File extends \think\File
{
    /**
     * Remove the temporary file on destruction. Any failure is deliberately
     * ignored so that object teardown never raises.
     */
    public function __destruct()
    {
        try {
            $path = $this->getPathname();

            if (file_exists($path)) {
                unlink($path);
            }
        } catch (Throwable $e) {
        }
    }
}
server
Dispatcher.php
<?php
namespace think\swoole\rpc\server;
use ReflectionClass;
use ReflectionException;
use ReflectionMethod;
use ReflectionNamedType;
use RuntimeException;
use Swoole\Coroutine\Server\Connection;
use think\App;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\Middleware;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;
use Throwable;
/**
 * Routes decoded RPC requests to the registered service classes and writes
 * the encoded response back to the connection.
 */
class Dispatcher
{
    use Sendfile;

    /**
     * Parser error
     */
    const PARSER_ERROR = -32700;

    /**
     * Invalid Request
     */
    const INVALID_REQUEST = -32600;

    /**
     * Method not found
     */
    const METHOD_NOT_FOUND = -32601;

    /**
     * Invalid params
     */
    const INVALID_PARAMS = -32602;

    /**
     * Internal error
     */
    const INTERNAL_ERROR = -32603;

    /** @var ParserInterface */
    protected $parser;

    /** @var array<string, array{interface: string, class: string}> keyed by interface base name */
    protected $services = [];

    /** @var array middleware applied to every call */
    protected $middleware = [];

    /**
     * @param ParserInterface $parser
     * @param iterable        $services   service class names
     * @param array           $middleware middleware applied to every call
     * @throws ReflectionException
     */
    public function __construct(ParserInterface $parser, $services, $middleware = [])
    {
        $this->parser = $parser;
        $this->prepareServices($services);
        $this->middleware = $middleware;
    }

    /**
     * Index the service classes by the base name of each interface they
     * implement, or by their own base name when they implement none.
     *
     * @param $services
     * @throws ReflectionException
     */
    protected function prepareServices($services)
    {
        foreach ($services as $className) {
            $reflectionClass = new ReflectionClass($className);
            $interfaces      = $reflectionClass->getInterfaceNames();

            if (!empty($interfaces)) {
                foreach ($interfaces as $interface) {
                    $this->services[class_basename($interface)] = [
                        'interface' => $interface,
                        'class'     => $className,
                    ];
                }
            } else {
                $this->services[class_basename($className)] = [
                    'interface' => $className,
                    'class'     => $className,
                ];
            }
        }
    }

    /**
     * Describe every registered interface.
     *
     * @return array method descriptions keyed by service name
     */
    protected function getInterfaces()
    {
        $interfaces = [];

        foreach ($this->services as $key => ['interface' => $interface]) {
            $interfaces[$key] = $this->getMethods($interface);
        }

        return $interfaces;
    }

    /**
     * Describe the public methods of an interface (constructor and destructor excluded).
     *
     * @param string $interface
     * @return array keyed by method name: parameters, returnType, comment
     * @throws ReflectionException
     */
    protected function getMethods($interface)
    {
        $methods    = [];
        $reflection = new ReflectionClass($interface);

        foreach ($reflection->getMethods(ReflectionMethod::IS_PUBLIC) as $method) {
            if ($method->isConstructor() || $method->isDestructor()) {
                continue;
            }

            $returnType = $method->getReturnType();
            if ($returnType instanceof ReflectionNamedType) {
                $returnType = $returnType->getName();
            }

            $methods[$method->getName()] = [
                'parameters' => $this->getParameters($method),
                'returnType' => $returnType,
                'comment'    => $method->getDocComment(),
            ];
        }

        return $methods;
    }

    /**
     * Describe the parameters of a method.
     *
     * @param ReflectionMethod $method
     * @return array list of ['name', 'type', optional 'default', optional 'nullable']
     */
    protected function getParameters(ReflectionMethod $method)
    {
        $parameters = [];

        foreach ($method->getParameters() as $parameter) {
            $type = $parameter->getType();
            if ($type instanceof ReflectionNamedType) {
                $type = $type->getName();
            }

            $param = [
                'name' => $parameter->getName(),
                'type' => $type,
            ];

            // Variadic parameters are optional but have no default value;
            // getDefaultValue() throws for them, so ask for availability instead.
            if ($parameter->isDefaultValueAvailable()) {
                $param['default'] = $parameter->getDefaultValue();
            }

            if ($parameter->allowsNull()) {
                $param['nullable'] = true;
            }

            $parameters[] = $param;
        }

        return $parameters;
    }

    /**
     * Dispatch one request and send the response.
     *
     * @param App          $app
     * @param Connection   $conn
     * @param string|Error $data  raw payload, or an Error produced upstream
     * @param array        $files files received ahead of the payload
     */
    public function dispatch(App $app, Connection $conn, $data, $files = [])
    {
        try {
            switch (true) {
                case $data instanceof Error:
                    $result = $data;
                    break;
                case $data === Protocol::ACTION_INTERFACE:
                    $result = $this->getInterfaces();
                    break;
                default:
                    $protocol = $this->parser->decode($data);
                    $result   = $this->dispatchWithMiddleware($app, $protocol, $files);
            }
        } catch (Throwable $e) {
            // getCode() may be a non-numeric string (e.g. a PDOException SQLSTATE),
            // which Error::make(int ...) would reject with a TypeError raised outside
            // this try block; normalise it to an int.
            $result = Error::make((int) $e->getCode(), $e->getMessage());
        }

        // Transfer the file first, then answer with the file placeholder.
        if ($result instanceof \think\File) {
            foreach ($this->fread($result) as $string) {
                // Compare against '' instead of using empty(): a chunk that is
                // exactly "0" is valid data and must still be sent.
                if (is_string($string) && $string !== '') {
                    $conn->send($string);
                }
            }
            $result = Protocol::FILE;
        }

        $data = $this->parser->encodeResponse($result);

        $conn->send(Packer::pack($data));
    }

    /**
     * Resolve the service, build the middleware stack and invoke the method.
     *
     * @param App      $app
     * @param Protocol $protocol
     * @param array    $files files consumed in order for each file placeholder param
     * @return mixed result of the service method
     * @throws RuntimeException when the service is not registered
     */
    protected function dispatchWithMiddleware(App $app, Protocol $protocol, $files)
    {
        $interface = $protocol->getInterface();
        $method    = $protocol->getMethod();
        $params    = $protocol->getParams();

        // Substitute received files for the file placeholders.
        foreach ($params as $index => $param) {
            if ($param === Protocol::FILE) {
                $params[$index] = array_shift($files);
            }
        }

        $service = $this->services[$interface] ?? null;
        if (empty($service)) {
            throw new RuntimeException(
                sprintf('Service %s is not founded!', $interface),
                self::METHOD_NOT_FOUND
            );
        }

        $instance    = $app->make($service['class']);
        $middlewares = array_merge($this->middleware, $this->getServiceMiddlewares($instance, $method));

        return Middleware::make($app, $middlewares)
            ->pipeline()
            ->send($protocol)
            ->then(function () use ($instance, $method, $params) {
                return call_user_func_array([$instance, $method], $params);
            });
    }

    /**
     * Collect the middleware declared in the service's `middleware` property
     * that applies to $method, honouring `only` / `except` options.
     *
     * @param object $service
     * @param string $method
     * @return array
     */
    protected function getServiceMiddlewares($service, $method)
    {
        $middlewares = [];

        $class = new ReflectionClass($service);

        if ($class->hasProperty('middleware')) {
            $reflectionProperty = $class->getProperty('middleware');
            $reflectionProperty->setAccessible(true);

            foreach ($reflectionProperty->getValue($service) as $key => $val) {
                // Three accepted shapes: 'name' => options, ['middleware' => ..., 'options' => ...], or a bare value.
                if (!is_int($key)) {
                    $middleware = $key;
                    $options    = $val;
                } elseif (isset($val['middleware'])) {
                    $middleware = $val['middleware'];
                    $options    = $val['options'] ?? [];
                } else {
                    $middleware = $val;
                    $options    = [];
                }

                if ((isset($options['only']) && !in_array($method, (array) $options['only'])) ||
                    (!empty($options['except']) && in_array($method, (array) $options['except']))) {
                    continue;
                }

                // "name:arg1:arg2" becomes [name, [arg1, arg2]].
                if (is_string($middleware) && strpos($middleware, ':')) {
                    $middleware = explode(':', $middleware);
                    if (count($middleware) > 1) {
                        $middleware = [$middleware[0], array_slice($middleware, 1)];
                    }
                }

                $middlewares[] = $middleware;
            }
        }

        return $middlewares;
    }
}
Channel.php
<?php
namespace think\swoole\rpc\server;
use Swoole\Coroutine;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;
/**
 * Thin wrapper around a capacity-1 coroutine channel that is seeded with a handler.
 */
class Channel
{
    protected $header;

    /** @var Coroutine\Channel */
    protected $queue;

    /**
     * Create the queue and push the initial handler from a new coroutine.
     *
     * @param File|Buffer $handler
     */
    public function __construct($handler)
    {
        $this->queue = new Coroutine\Channel(1);

        $seed = function () use ($handler) {
            $this->queue->push($handler);
        };
        Coroutine::create($seed);
    }

    /**
     * @return File|Buffer
     */
    public function pop()
    {
        $item = $this->queue->pop();

        return $item;
    }

    /**
     * Push a handler onto the queue.
     */
    public function push($handle)
    {
        $pushed = $this->queue->push($handle);

        return $pushed;
    }

    /**
     * Close the underlying queue.
     */
    public function close()
    {
        $closed = $this->queue->close();

        return $closed;
    }
}
Packer.php
<?php
namespace think\swoole\rpc;
use RuntimeException;
use think\swoole\rpc\packer\Buffer;
use think\swoole\rpc\packer\File;
/**
 * Frames RPC payloads with an 8-byte header: two big-endian unsigned 32-bit
 * integers holding the body length and the payload type.
 */
class Packer
{
    public const HEADER_SIZE   = 8;
    public const HEADER_STRUCT = 'Nlength/Ntype';
    public const HEADER_PACK   = 'NN';

    public const TYPE_BUFFER = 0;
    public const TYPE_FILE   = 1;

    /**
     * Prefix $data with its header.
     *
     * @param string $data
     * @param int    $type one of the TYPE_* constants
     * @return string
     */
    public static function pack($data, $type = self::TYPE_BUFFER)
    {
        return pack(self::HEADER_PACK, strlen($data), $type) . $data;
    }

    /**
     * Read the header and create the matching handler.
     *
     * @param string $data bytes starting with a header
     * @return array<Buffer|File|string> [handler, bytes following the header]
     * @throws RuntimeException when the header is incomplete or the type is unknown
     */
    public static function unpack($data)
    {
        // Reject short input up front; unpack() would otherwise emit a warning.
        if (strlen($data) < self::HEADER_SIZE) {
            throw new RuntimeException('Invalid Header');
        }

        $header = unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE));
        if ($header === false) {
            throw new RuntimeException('Invalid Header');
        }

        switch ($header['type']) {
            case self::TYPE_BUFFER:
                $handler = new Buffer($header['length']);
                break;
            case self::TYPE_FILE:
                $handler = new File($header['length']);
                break;
            default:
                throw new RuntimeException("unsupported data type: [{$header['type']}]");
        }

        return [$handler, substr($data, self::HEADER_SIZE)];
    }
}
Sendfile.php
<?php
namespace think\swoole\rpc;
/**
 * Streams a file as a framed sequence of strings.
 */
trait Sendfile
{
    /**
     * Yield a file header (size + Packer::TYPE_FILE) followed by the file
     * contents in chunks of up to 8192 bytes.
     *
     * @param \think\File $file
     * @return \Generator<int, string>
     * @throws \RuntimeException when the file cannot be opened or read
     */
    protected function fread(\think\File $file)
    {
        $handle = fopen($file->getPathname(), 'rb');

        // Without this guard a failed open would reach feof()/fclose() with
        // false and surface as an unrelated TypeError.
        if ($handle === false) {
            throw new \RuntimeException(sprintf('Unable to open file: %s', $file->getPathname()));
        }

        try {
            yield pack(Packer::HEADER_PACK, $file->getSize(), Packer::TYPE_FILE);

            while (!feof($handle)) {
                $chunk = fread($handle, 8192);

                // A failed read must not be yielded as data, and could otherwise loop forever.
                if ($chunk === false) {
                    throw new \RuntimeException(sprintf('Unable to read file: %s', $file->getPathname()));
                }

                yield $chunk;
            }
        } finally {
            fclose($handle);
        }
    }
}
Error.php
<?php
namespace think\swoole\rpc;
/**
 * RPC error payload: code, message and optional data.
 */
class Error implements \JsonSerializable
{
    /**
     * @var int
     */
    protected $code = 0;

    /**
     * @var string
     */
    protected $message = '';

    /**
     * @var mixed
     */
    protected $data;

    /**
     * Named constructor.
     *
     * @param int    $code
     * @param string $message
     * @param mixed  $data
     *
     * @return Error
     */
    public static function make(int $code, string $message, $data = null): self
    {
        $instance = new static();

        $instance->code    = $code;
        $instance->message = $message;
        $instance->data    = $data;

        return $instance;
    }

    /**
     * @return int
     */
    public function getCode(): int
    {
        return $this->code;
    }

    /**
     * @return string
     */
    public function getMessage(): string
    {
        return $this->message;
    }

    /**
     * @return mixed
     */
    public function getData()
    {
        return $this->data;
    }

    /**
     * Shape used when the error is JSON-encoded.
     *
     * The attribute silences the PHP 8.1 tentative-return-type deprecation
     * without adding a return type that subclasses would have to match; on
     * older PHP versions the attribute line is parsed as a comment.
     *
     * @return array{code: int, message: string, data: mixed}
     */
    #[\ReturnTypeWillChange]
    public function jsonSerialize()
    {
        return [
            'code'    => $this->code,
            'message' => $this->message,
            'data'    => $this->data,
        ];
    }
}
JsonParser.php
<?php
namespace think\swoole\rpc;
use Exception;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\rpc\server\Dispatcher;
/**
 * JSON-RPC style encoder/decoder for Protocol objects and responses.
 */
class JsonParser implements ParserInterface
{
    /**
     * Json-rpc version
     */
    const VERSION = '2.0';

    /** Separator between interface and method name in the `method` field. */
    const DELIMITER = "@";

    /**
     * Encode a request.
     *
     * @param Protocol $protocol
     *
     * @return string
     */
    public function encode(Protocol $protocol): string
    {
        $interface  = $protocol->getInterface();
        $methodName = $protocol->getMethod();

        $method = $interface . self::DELIMITER . $methodName;
        $data   = [
            'jsonrpc' => self::VERSION,
            'method'  => $method,
            'params'  => $protocol->getParams(),
            'context' => $protocol->getContext(),
            'id'      => '',
        ];

        return json_encode($data, JSON_UNESCAPED_UNICODE);
    }

    /**
     * Decode a request.
     *
     * @param string $string
     *
     * @return Protocol
     * @throws Exception when the payload is not JSON or the method field is missing/malformed
     */
    public function decode(string $string): Protocol
    {
        $data = json_decode($string, true);

        $error = json_last_error();
        if ($error != JSON_ERROR_NONE) {
            throw new Exception(
                sprintf('Data(%s) is not json format!', $string),
                Dispatcher::PARSER_ERROR
            );
        }

        $method  = $data['method'] ?? '';
        $params  = $data['params'] ?? [];
        $context = $data['context'] ?? [];

        if (empty($method)) {
            throw new Exception(
                sprintf('Method(%s) cant not be empty!', $string),
                Dispatcher::INVALID_PARAMS
            );
        }

        $methodAry = explode(self::DELIMITER, $method);
        if (count($methodAry) < 2) {
            throw new Exception(
                sprintf('Method(%s) is bad format!', $method),
                Dispatcher::INVALID_PARAMS
            );
        }

        [$interfaceClass, $methodName] = $methodAry;

        if (empty($interfaceClass) || empty($methodName)) {
            throw new Exception(
                sprintf('Interface(%s) or Method(%s) can not be empty!', $interfaceClass, $method),
                Dispatcher::INVALID_PARAMS
            );
        }

        return Protocol::make($interfaceClass, $methodName, $params, $context);
    }

    /**
     * Decode a response.
     *
     * @param string $string
     *
     * @return mixed the `result` value, or an Error built from the `error` member
     * @throws Exception when the payload does not decode to a JSON object/array
     */
    public function decodeResponse(string $string)
    {
        $data = json_decode($string, true);

        // Invalid JSON (null) or a bare scalar cannot be inspected with
        // array_key_exists(); report it the same way decode() does.
        if (!is_array($data)) {
            throw new Exception(
                sprintf('Data(%s) is not json format!', $string),
                Dispatcher::PARSER_ERROR
            );
        }

        if (array_key_exists('result', $data)) {
            return $data['result'];
        }

        // Error::make() requires an int; tolerate peers that send the code as a string.
        $code    = (int) ($data['error']['code'] ?? 0);
        $message = (string) ($data['error']['message'] ?? '');
        $payload = $data['error']['data'] ?? null;

        return Error::make($code, $message, $payload);
    }

    /**
     * Encode a response.
     *
     * When the result itself cannot be JSON-encoded, an INTERNAL_ERROR response
     * carrying the encoder's message is produced instead, so the declared
     * string return type always holds.
     *
     * @param mixed $result
     *
     * @return string
     */
    public function encodeResponse($result): string
    {
        $data = [
            'jsonrpc' => self::VERSION,
            'id'      => '',
        ];

        if ($result instanceof Error) {
            $data['error'] = $result;
        } else {
            $data['result'] = $result;
        }

        $json = json_encode($data);

        if ($json === false) {
            $json = json_encode([
                'jsonrpc' => self::VERSION,
                'id'      => '',
                'error'   => Error::make(Dispatcher::INTERNAL_ERROR, json_last_error_msg()),
            ]);
        }

        return $json;
    }
}
client
Service.php
<?php
namespace think\swoole\rpc\client;
/**
* Contract shared by RPC client services.
*/
interface Service
{
/**
* Attach context to the service and return a service instance for chaining.
*
* @param mixed $context
* @return self
*/
public function withContext($context): self;
}
Connector.php
<?php
namespace think\swoole\rpc\client;
use Generator;
/**
* Transport used by the RPC client gateway.
*/
interface Connector
{
/**
* Send the request data and return the response after passing it to $decoder.
* NOTE(review): the declared return type is string, but callers pass a decoder
* whose result may be of another type — confirm against the implementations.
*
* @param Generator|string $data
* @param callable $decoder
* @return string
*/
public function sendAndRecv($data, callable $decoder);
}
Proxy.php
<?php
namespace think\swoole\rpc\client;
use InvalidArgumentException;
use Nette\PhpGenerator\Factory;
use Nette\PhpGenerator\PhpNamespace;
use ReflectionClass;
use think\App;
use think\swoole\Middleware;
use think\swoole\rpc\Protocol;
/**
* Base class of the generated RPC client proxies.
*
* getClassName() generates, at runtime, a subclass that implements a given
* interface and forwards each interface method to proxyCall().
*/
abstract class Proxy implements Service
{
/** @var string interface name sent in the protocol; set by the generated subclass */
protected $interface;
/** @var Gateway */
protected $gateway;
/** @var App */
protected $app;
/** @var array middleware the call is piped through before reaching the gateway */
protected $middleware = [];
/** @var array context sent along with each call */
protected $context = [];
/**
* @param App $app
* @param Gateway $gateway
* @param array $middleware
*/
final public function __construct(App $app, Gateway $gateway, $middleware)
{
$this->app = $app;
$this->gateway = $gateway;
$this->middleware = $middleware;
}
/**
* Build a Protocol for the call, pass it through the middleware pipeline
* and finally hand it to the gateway.
*
* @param string $method
* @param array $params
* @return mixed result of Gateway::call()
*/
final protected function proxyCall($method, $params)
{
$protocol = Protocol::make($this->interface, $method, $params, $this->context);
return Middleware::make($this->app, $this->middleware)
->pipeline()
->send($protocol)
->then(function (Protocol $protocol) {
return $this->gateway->call($protocol);
});
}
/**
* Replace the context on this instance (no copy is made) and return it.
*/
final public function withContext($context): self
{
$this->context = $context;
return $this;
}
/**
* Return the name of the proxy class for $interface under $client,
* generating and defining the class on first use.
*
* @param string $client client name, used as the last namespace segment
* @param string $interface fully qualified interface name
* @return string fully qualified proxy class name
* @throws InvalidArgumentException when $interface is not an existing interface
*/
final public static function getClassName($client, $interface)
{
if (!interface_exists($interface)) {
throw new InvalidArgumentException(
sprintf('%s must be exist interface!', $interface)
);
}
$proxyName = class_basename($interface) . "Service";
$className = "rpc\\service\\{$client}\\{$proxyName}";
// Second argument false: only check already-defined classes, never trigger autoloading.
if (!class_exists($className, false)) {
$namespace = new PhpNamespace("rpc\\service\\{$client}");
$namespace->addUse(Proxy::class);
$namespace->addUse($interface);
$class = $namespace->addClass($proxyName);
$class->setExtends(Proxy::class);
$class->addImplement($interface);
$class->addProperty('interface', class_basename($interface));
$reflection = new ReflectionClass($interface);
foreach ($reflection->getMethods() as $methodRef) {
// Methods declared by Service itself are implemented by this base class.
if ($methodRef->getDeclaringClass()->name == Service::class) {
continue;
}
$method = (new Factory)->fromMethodReflection($methodRef);
$body = "\$this->proxyCall('{$methodRef->getName()}', func_get_args());";
// A void method must not return a value.
if ($method->getReturnType() != 'void') {
$body = "return {$body}";
}
$method->setBody($body);
$class->addMember($method);
}
// Define the generated class; the evaluated code is built from the reflected interface above.
eval($namespace);
}
return $className;
}
}
Gateway.php
<?php
namespace think\swoole\rpc\client;
use Closure;
use Swoole\Coroutine\Client;
use think\File;
use think\helper\Arr;
use think\swoole\concerns\InteractsWithRpcConnector;
use think\swoole\contract\rpc\ParserInterface;
use think\swoole\exception\RpcClientException;
use think\swoole\exception\RpcResponseException;
use think\swoole\rpc\Error;
use think\swoole\rpc\Packer;
use think\swoole\rpc\Protocol;
use think\swoole\rpc\Sendfile;
/**
 * Client-side entry point for RPC calls: encodes a Protocol, sends it through
 * a Connector and decodes the response.
 */
class Gateway
{
    use Sendfile;

    /** @var Connector */
    protected $connector;

    /** @var ParserInterface */
    protected $parser;

    /** @var int number of attempts handed to backoff() in call() */
    protected $tries;

    /**
     * Gateway constructor.
     * @param Connector|array $connector a connector, or config for the default one
     * @param ParserInterface $parser
     * @param int             $tries
     */
    public function __construct($connector, ParserInterface $parser, $tries = 2)
    {
        if (is_array($connector)) {
            $connector = $this->createDefaultConnector($connector);
        }
        $this->connector = $connector;
        $this->parser    = $parser;
        $this->tries     = $tries;
    }

    /**
     * Yield the wire data for a call: every File parameter first, then the
     * packed request in which those parameters are replaced by placeholders.
     *
     * @param Protocol $protocol
     * @return \Generator<int, string>
     */
    protected function encodeData(Protocol $protocol)
    {
        $params = $protocol->getParams();

        // Files are transferred ahead of the request.
        foreach ($params as $index => $param) {
            if ($param instanceof File) {
                yield from $this->fread($param);
                $params[$index] = Protocol::FILE;
            }
        }

        // Encode a copy: call() may run encodeData() again for a retry, and the
        // caller's Protocol must still hold the File objects so they are re-sent.
        $payload = clone $protocol;
        $payload->setParams($params);

        $data = $this->parser->encode($payload);

        yield Packer::pack($data);
    }

    /**
     * Decode a response, turning an Error result into an exception.
     *
     * @throws RpcResponseException
     */
    protected function decodeResponse($response)
    {
        $result = $this->parser->decodeResponse($response);

        if ($result instanceof Error) {
            throw new RpcResponseException($result);
        }

        return $result;
    }

    /**
     * Send data through the connector with decodeResponse() as the decoder.
     */
    protected function sendAndRecv($data)
    {
        return $this->connector->sendAndRecv($data, Closure::fromCallable([$this, 'decodeResponse']));
    }

    /**
     * Perform an RPC call.
     *
     * With more than one try the call runs inside backoff(). An
     * RpcResponseException is returned from the callback and rethrown after
     * backoff() finishes, rather than being thrown inside it.
     *
     * @throws RpcResponseException
     */
    public function call(Protocol $protocol)
    {
        if ($this->tries > 1) {
            $result = backoff(function () use ($protocol) {
                try {
                    return $this->sendAndRecv($this->encodeData($protocol));
                } catch (RpcResponseException $e) {
                    return $e;
                }
            }, $this->tries);

            if ($result instanceof RpcResponseException) {
                throw $result;
            }

            return $result;
        } else {
            return $this->sendAndRecv($this->encodeData($protocol));
        }
    }

    /**
     * Ask the server for its interface description.
     */
    public function getServices()
    {
        return $this->sendAndRecv(Packer::pack(Protocol::ACTION_INTERFACE));
    }

    /**
     * Build the default connector: a single TCP client, created on demand and
     * recreated when it no longer appears connected.
     *
     * @param array $config must contain host and port; timeout defaults to 5
     * @return Connector
     */
    protected function createDefaultConnector($config)
    {
        return new class($config) implements Connector {

            use InteractsWithRpcConnector;

            /** @var Client */
            protected $client;

            protected $config;

            /**
             * constructor.
             * @param [] $config
             */
            public function __construct($config)
            {
                $this->config = $config;
            }

            protected function isConnected(): bool
            {
                return $this->client && $this->client->isConnected() && $this->client->peek() !== '';
            }

            protected function getClient()
            {
                if (!$this->isConnected()) {
                    $client = new Client(SWOOLE_SOCK_TCP);

                    $config  = $this->config;
                    $host    = Arr::pull($config, 'host');
                    $port    = Arr::pull($config, 'port');
                    $timeout = Arr::pull($config, 'timeout', 5);

                    // Whatever is left after pulling host/port/timeout is client settings.
                    $client->set($config);

                    if (!$client->connect($host, $port, $timeout)) {
                        throw new RpcClientException(
                            sprintf('Connect failed host=%s port=%d', $host, $port)
                        );
                    }

                    $this->client = $client;
                }
                return $this->client;
            }

            protected function runWithClient($callback)
            {
                return $callback($this->getClient());
            }
        };
    }
}
packer
File.php
<?php
namespace think\swoole\rpc\packer;
/**
 * Accumulates an incoming file of known length into a temporary file.
 */
class File
{
    /** @var string path of the temporary file */
    protected $name;

    /** @var resource handle of the temporary file */
    protected $handle;

    /** @var int total number of bytes expected */
    protected $length;

    public function __construct($length)
    {
        $this->length = $length;
    }

    /**
     * Consume bytes from $data until the expected length is reached.
     *
     * $data is modified in place: it is emptied while the file is still
     * incomplete, or reduced to the bytes that follow the file once complete.
     *
     * @param string $data incoming bytes, passed by reference
     * @return \think\swoole\rpc\File|null the finished file, or null while incomplete
     */
    public function write(&$data)
    {
        if (!$this->handle) {
            // First chunk: create the temporary file.
            $this->name   = tempnam(sys_get_temp_dir(), 'swoole_rpc_');
            $this->handle = fopen($this->name, 'ab');
        }

        $missing = $this->length - fstat($this->handle)['size'];

        fwrite($this->handle, substr($data, 0, $missing));

        if (strlen($data) < $missing) {
            $data = '';
            return;
        }

        fclose($this->handle);
        $data = substr($data, $missing);

        return new \think\swoole\rpc\File($this->name);
    }
}
Buffer.php
<?php
namespace think\swoole\rpc\packer;
/**
 * Accumulates an incoming string payload of known length.
 */
class Buffer
{
    /** @var string bytes collected so far */
    protected $data = '';

    /** @var int total number of bytes expected */
    protected $length;

    public function __construct($length)
    {
        $this->length = $length;
    }

    /**
     * Consume bytes from $data until the expected length is reached.
     *
     * $data is modified in place: it is emptied while the payload is still
     * incomplete, or reduced to the bytes that follow the payload once complete.
     *
     * @param string $data incoming bytes, passed by reference
     * @return string|null the complete payload, or null while incomplete
     */
    public function write(&$data)
    {
        $missing = $this->length - strlen($this->data);

        $this->data .= substr($data, 0, $missing);

        if (strlen($data) < $missing) {
            $data = '';
            return;
        }

        $data = substr($data, $missing);

        return $this->data;
    }
}
watcher
Driver.php
<?php
namespace think\swoole\watcher;
/**
* Contract for file watchers.
*/
interface Driver
{
/**
* Start watching and invoke $callback when the driver detects a change.
*
* @param callable $callback
*/
public function watch(callable $callback);
}
Find.php
<?php
namespace think\swoole\watcher;
use InvalidArgumentException;
use Swoole\Coroutine\System;
use Swoole\Timer;
use think\helper\Str;
/**
 * Watcher that polls for recently modified files using the `find` command.
 */
class Find implements Driver
{
    /** @var array file name patterns to include */
    protected $name;

    /** @var array directories to search */
    protected $directory;

    /** @var array directories or file names to skip */
    protected $exclude;

    /**
     * @param array $directory directories to search
     * @param array $exclude   directories or file names to skip
     * @param array $name      file name patterns to include
     * @throws InvalidArgumentException when `find` is missing or is the BusyBox variant
     */
    public function __construct($directory, $exclude, $name)
    {
        $ret = System::exec('which find');
        if (empty($ret['output'])) {
            throw new InvalidArgumentException('find not exists.');
        }
        $ret = System::exec('find --help', true);
        if (Str::contains($ret['output'] ?? '', 'BusyBox')) {
            throw new InvalidArgumentException('find version not support.');
        }

        $this->directory = $directory;
        $this->exclude   = $exclude;
        $this->name      = $name;
    }

    /**
     * Run `find` every two seconds and invoke $callback whenever it prints at
     * least one file modified within the look-back window.
     *
     * NOTE: directory, name and exclude values are interpolated into the shell
     * command as-is, so they must come from trusted configuration.
     *
     * @param callable $callback invoked without arguments
     */
    public function watch(callable $callback)
    {
        $ms      = 2000;
        // Look back one second further than the polling interval.
        $seconds = ceil(($ms + 1000) / 1000);
        $minutes = sprintf('-%.2f', $seconds / 60);

        $dest = implode(' ', (array) $this->directory);

        $name    = empty($this->name) ? '' : ' \( ' . join(' -o ', array_map(fn ($v) => "-name \"{$v}\"", (array) $this->name)) . ' \)';
        $notName = '';
        $notPath = '';
        if (!empty($this->exclude)) {
            $excludeDirs = $excludeFiles = [];
            foreach ((array) $this->exclude as $directory) {
                $directory = rtrim($directory, '/');
                if (is_dir($directory)) {
                    $excludeDirs[] = $directory;
                } else {
                    $excludeFiles[] = $directory;
                }
            }

            // The patterns are OR-ed inside the negation: a file is skipped when it
            // matches ANY excluded entry. File names and directories get separate
            // clauses so that neither overwrites the other.
            if (!empty($excludeFiles)) {
                $notName = ' -not \( ' . join(' -o ', array_map(fn ($v) => "-name \"{$v}\"", $excludeFiles)) . ' \)';
            }

            if (!empty($excludeDirs)) {
                $notPath = ' -not \( ' . join(' -o ', array_map(fn ($v) => "-path \"{$v}/*\"", $excludeDirs)) . ' \)';
            }
        }

        $command = "find {$dest}{$name}{$notName}{$notPath} -mmin {$minutes} -type f -print";

        Timer::tick($ms, function () use ($callback, $command) {
            $ret = System::exec($command);
            if ($ret['code'] === 0 && strlen($ret['output'])) {
                $stdout = trim($ret['output']);
                if (!empty($stdout)) {
                    call_user_func($callback);
                }
            }
        });
    }
}
Scan.php
<?php
namespace think\swoole\watcher;
use Swoole\Timer;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Finder\SplFileInfo;
/**
 * File watcher that periodically rescans the watched directories with the
 * Symfony Finder and compares modification times between scans.
 */
class Scan implements Driver
{
    /** @var Finder Pre-configured finder describing which files are watched */
    protected $finder;

    /** @var array Snapshot of the previous scan: real path => modification time */
    protected $files = [];

    /**
     * @param array|string $directory Directories to search (non-directories are ignored)
     * @param array|string $exclude   Directories handed to Finder::exclude()
     * @param array|string $name      File name patterns handed to Finder::name()
     */
    public function __construct($directory, $exclude, $name)
    {
        $this->finder = new Finder();
        $this->finder
            ->files()
            ->name($name)
            ->in(array_filter((array) $directory, function ($dir) {
                return is_dir($dir);
            }))
            ->exclude($exclude);
    }

    /**
     * Take a snapshot of all watched files.
     *
     * @return array Map of real path => modification time
     */
    protected function findFiles()
    {
        $files = [];
        /** @var SplFileInfo $f */
        foreach ($this->finder as $f) {
            $files[$f->getRealpath()] = $f->getMTime();
        }
        return $files;
    }

    /**
     * Rescan every two seconds and invoke $callback (without arguments) when
     * the snapshot differs from the previous one.
     *
     * @param callable $callback Invoked when a change is detected
     */
    public function watch(callable $callback)
    {
        $this->files = $this->findFiles();

        Timer::tick(2000, function () use ($callback) {
            $files = $this->findFiles();

            // Array inequality compares the key/value pairs of both snapshots,
            // so added, modified AND deleted files are all detected, and a
            // file whose mtime is 0 is not mistaken for an unknown file.
            if ($files != $this->files) {
                call_user_func($callback);
            }

            $this->files = $files;
        });
    }
}
websocket
middleware
SessionInit.php
<?php
namespace think\swoole\websocket\middleware;
use Closure;
use think\App;
use think\Request;
use think\Response;
use think\Session;
/**
 * Middleware that initialises the session for a request.
 */
class SessionInit
{
    /** @var App */
    protected $app;

    /** @var Session */
    protected $session;

    public function __construct(App $app, Session $session)
    {
        $this->app     = $app;
        $this->session = $session;
    }

    /**
     * Initialise the session and attach it to the request.
     *
     * The session id is taken from the request parameter named by the
     * `session.var_session_id` config entry when that parameter is present
     * and non-empty, otherwise from the cookie named after the session.
     *
     * @access public
     * @param Request $request
     * @param Closure $next
     * @return Response
     */
    public function handle($request, Closure $next)
    {
        $paramName  = $this->app->config->get('session.var_session_id');
        $cookieName = $this->session->getName();

        // Prefer the explicit request parameter, fall back to the cookie.
        $sessionId = $paramName ? $request->request($paramName) : null;
        if (!$sessionId) {
            $sessionId = $request->cookie($cookieName);
        }

        if ($sessionId) {
            $this->session->setId($sessionId);
        }

        $this->session->init();
        $request->withSession($this->session);

        return $next($request);
    }
}
Event.php
<?php
namespace think\swoole\websocket;
/**
 * A websocket event: an event type together with its payload.
 */
class Event
{
    /** @var mixed Event type */
    public $type;

    /** @var mixed Event payload */
    public $data;

    /**
     * @param mixed $type Event type
     * @param mixed $data Event payload
     */
    public function __construct($type, $data)
    {
        $this->type = $type;
        $this->data = $data;
    }
}
room
Redis.php
<?php
namespace think\swoole\websocket\room;
use InvalidArgumentException;
use Redis as PHPRedis;
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use think\helper\Arr;
use think\swoole\contract\websocket\RoomInterface;
use think\swoole\Manager;
use think\swoole\Pool;
/**
 * Websocket room storage backed by Redis sets.
 *
 * Each room and each descriptor owns one set, stored under
 * "{prefix}{table}:{key}".
 */
class Redis implements RoomInterface
{
    /**
     * @var array
     */
    protected $config;

    /**
     * @var string
     */
    protected $prefix = 'swoole:';

    /** @var Manager */
    protected $manager;

    /** @var ConnectionPool */
    protected $pool;

    /**
     * RedisRoom constructor.
     *
     * @param Manager $manager
     * @param array   $config Connection settings; an optional `prefix` entry
     *                        overrides the default key prefix
     */
    public function __construct(Manager $manager, array $config)
    {
        $this->manager = $manager;
        $this->config  = $config;

        if ($prefix = Arr::get($this->config, 'prefix')) {
            $this->prefix = $prefix;
        }
    }

    /**
     * Wipe existing room data and register the connection pool setup.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface
    {
        $this->initData();
        $this->prepareRedis();
        return $this;
    }

    /**
     * Create the connection pool when a worker starts and register it with
     * the manager under "websocket.room".
     */
    protected function prepareRedis()
    {
        $this->manager->onEvent('workerStart', function () {
            $config     = $this->config;
            $this->pool = new ConnectionPool(
                Pool::pullPoolConfig($config),
                new PhpRedisConnector(),
                $config
            );
            $this->manager->getPools()->add('websocket.room', $this->pool);
        });
    }

    /**
     * Delete every key that starts with the configured prefix, using a
     * dedicated short-lived connection.
     */
    protected function initData()
    {
        $connector  = new PhpRedisConnector();
        $connection = $connector->connect($this->config);

        try {
            $keys = $connection->keys("{$this->prefix}*");
            // keys() may return false on failure; only delete a real match list.
            if (is_array($keys) && count($keys)) {
                $connection->del($keys);
            }
        } finally {
            // Release the connection even if keys()/del() throws.
            $connector->disconnect($connection);
        }
    }

    /**
     * Add a socket fd to one or more rooms.
     *
     * @param string       $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames)
    {
        $rooms = is_array($roomNames) ? $roomNames : [$roomNames];

        $this->addValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->addValue($room, [$fd], RoomInterface::ROOMS_KEY);
        }
    }

    /**
     * Remove a socket fd from the given rooms, or from every room it has
     * joined when no room is given.
     *
     * @param string       $fd
     * @param array|string $rooms
     */
    public function delete($fd, $rooms = [])
    {
        $rooms = is_array($rooms) ? $rooms : [$rooms];
        $rooms = count($rooms) ? $rooms : $this->getRooms($fd);

        $this->removeValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);

        foreach ($rooms as $room) {
            $this->removeValue($room, [$fd], RoomInterface::ROOMS_KEY);
        }
    }

    /**
     * Borrow a connection from the pool, run the callable with it and
     * always return the connection afterwards.
     *
     * @param \Closure $callable Receives the borrowed connection
     *
     * @return mixed Whatever the callable returns
     */
    protected function runWithRedis(\Closure $callable)
    {
        $redis = $this->pool->borrow();
        try {
            return $callable($redis);
        } finally {
            $this->pool->return($redis);
        }
    }

    /**
     * Add values to the set stored for a key.
     *
     * @param        $key
     * @param array  $values
     * @param string $table
     *
     * @return $this
     */
    protected function addValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);
            foreach ($values as $value) {
                $pipe->sadd($redisKey, $value);
            }
            $pipe->exec();
        });

        return $this;
    }

    /**
     * Remove values from the set stored for a key.
     *
     * @param        $key
     * @param array  $values
     * @param string $table
     *
     * @return $this
     */
    protected function removeValue($key, array $values, string $table)
    {
        $this->checkTable($table);
        $redisKey = $this->getKey($key, $table);

        $this->runWithRedis(function (PHPRedis $redis) use ($redisKey, $values) {
            $pipe = $redis->multi(PHPRedis::PIPELINE);
            foreach ($values as $value) {
                $pipe->srem($redisKey, $value);
            }
            $pipe->exec();
        });

        return $this;
    }

    /**
     * Get all socket fds in a room.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        // smembers() may yield false rather than null, which `??` would let through.
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?: [];
    }

    /**
     * Get all rooms a socket fd has joined.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd)
    {
        // smembers() may yield false rather than null, which `??` would let through.
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?: [];
    }

    /**
     * Ensure the table name is one of the two known key spaces.
     *
     * @param string $table
     *
     * @throws InvalidArgumentException For any other table name
     */
    protected function checkTable(string $table)
    {
        if (!in_array($table, [RoomInterface::ROOMS_KEY, RoomInterface::DESCRIPTORS_KEY])) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }

    /**
     * Read the members of the set stored for a key.
     *
     * @param string $key
     * @param string $table
     *
     * @return array
     */
    protected function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        return $this->runWithRedis(function (PHPRedis $redis) use ($table, $key) {
            return $redis->smembers($this->getKey($key, $table));
        });
    }

    /**
     * Build the Redis key for an entry.
     *
     * @param string $key
     * @param string $table
     *
     * @return string
     */
    protected function getKey(string $key, string $table)
    {
        return "{$this->prefix}{$table}:{$key}";
    }
}
Table.php
<?php
namespace think\swoole\websocket\room;
use InvalidArgumentException;
use Swoole\Table as SwooleTable;
use think\swoole\contract\websocket\RoomInterface;
/**
 * Websocket room storage backed by two Swoole tables: one maps a room to
 * its socket fds, the other maps a socket fd to its rooms. Each list is
 * stored JSON-encoded in a single string column.
 */
class Table implements RoomInterface
{
    /**
     * @var array
     */
    protected $config = [
        'room_rows'   => 8192,
        'room_size'   => 2048,
        'client_rows' => 4096,
        'client_size' => 2048,
    ];

    /**
     * @var SwooleTable
     */
    protected $rooms;

    /**
     * @var SwooleTable
     */
    protected $fds;

    /**
     * TableRoom constructor.
     *
     * @param array $config Overrides for the default table dimensions
     */
    public function __construct(array $config)
    {
        $this->config = array_merge($this->config, $config);
    }

    /**
     * Create both tables.
     *
     * @return RoomInterface
     */
    public function prepare(): RoomInterface
    {
        $this->initRoomsTable();
        $this->initFdsTable();
        return $this;
    }

    /**
     * Add a socket fd to one or more rooms.
     *
     * @param string       $fd
     * @param array|string $roomNames
     */
    public function add($fd, $roomNames)
    {
        $rooms     = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];

        foreach ($roomNames as $room) {
            $fds = $this->getClients($room);

            if ($this->hasMember($fds, $fd)) {
                continue;
            }

            $fds[]   = $fd;
            $rooms[] = $room;

            $this->setClients($room, $fds);
        }

        $this->setRooms($fd, $rooms);
    }

    /**
     * Remove a socket fd from the given rooms, or from every room it has
     * joined when no room is given.
     *
     * @param string       $fd
     * @param array|string $roomNames
     */
    public function delete($fd, $roomNames = [])
    {
        $allRooms  = $this->getRooms($fd);
        $roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
        $rooms     = count($roomNames) ? $roomNames : $allRooms;

        $removeRooms = [];
        foreach ($rooms as $room) {
            $fds = $this->getClients($room);

            if (!$this->hasMember($fds, $fd)) {
                continue;
            }

            $this->setClients($room, array_values(array_diff($fds, [$fd])));
            $removeRooms[] = $room;
        }

        $this->setRooms($fd, array_values(array_diff($allRooms, $removeRooms)));
    }

    /**
     * Tell whether $fd is contained in $members.
     *
     * Members are compared as strings. A loose comparison would treat
     * numeric-looking descriptors such as "0.1" and "0.10" as equal.
     *
     * @param array $members
     * @param mixed $fd
     *
     * @return bool
     */
    protected function hasMember(array $members, $fd): bool
    {
        return in_array((string) $fd, array_map('strval', $members), true);
    }

    /**
     * Get all socket fds in a room.
     *
     * @param string $room
     *
     * @return array
     */
    public function getClients(string $room)
    {
        return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];
    }

    /**
     * Get all rooms a socket fd has joined.
     *
     * @param string $fd
     *
     * @return array
     */
    public function getRooms($fd)
    {
        return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];
    }

    /**
     * @param string $room
     * @param array  $fds
     *
     * @return $this
     */
    protected function setClients(string $room, array $fds)
    {
        return $this->setValue($room, $fds, RoomInterface::ROOMS_KEY);
    }

    /**
     * @param string $fd
     * @param array  $rooms
     *
     * @return $this
     */
    protected function setRooms($fd, array $rooms)
    {
        return $this->setValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);
    }

    /**
     * Init rooms table
     */
    protected function initRoomsTable(): void
    {
        $this->rooms = new SwooleTable($this->config['room_rows']);
        $this->rooms->column('value', SwooleTable::TYPE_STRING, $this->config['room_size']);
        $this->rooms->create();
    }

    /**
     * Init descriptors table
     */
    protected function initFdsTable()
    {
        $this->fds = new SwooleTable($this->config['client_rows']);
        $this->fds->column('value', SwooleTable::TYPE_STRING, $this->config['client_size']);
        $this->fds->create();
    }

    /**
     * Store a list under a key; an empty list deletes the row instead.
     *
     * @param        $key
     * @param array  $value
     * @param string $table
     *
     * @return $this
     */
    public function setValue($key, array $value, string $table)
    {
        $this->checkTable($table);

        if (empty($value)) {
            $this->$table->del($key);
        } else {
            $this->$table->set($key, ['value' => json_encode($value)]);
        }

        return $this;
    }

    /**
     * Read the list stored under a key.
     *
     * @param string $key
     * @param string $table
     *
     * @return array|mixed The decoded list, or an empty array when the row
     *                     does not exist
     */
    public function getValue(string $key, string $table)
    {
        $this->checkTable($table);

        $value = $this->$table->get($key);

        return $value ? json_decode($value['value'], true) : [];
    }

    /**
     * Ensure $table names a property of this object that holds a Swoole table.
     *
     * @param string $table
     *
     * @throws InvalidArgumentException Otherwise
     */
    protected function checkTable(string $table)
    {
        if (!property_exists($this, $table) || !$this->$table instanceof SwooleTable) {
            throw new InvalidArgumentException("Invalid table name: `{$table}`.");
        }
    }
}
Handler.php
<?php
namespace think\swoole\websocket;
use Swoole\WebSocket\Frame;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\websocket\Event as WsEvent;
/**
 * Plain websocket handler: forwards connection life-cycle callbacks to the
 * event dispatcher and exchanges events as JSON objects of the form
 * {"type": ..., "data": ...}.
 */
class Handler implements HandlerInterface
{
    /** @var Event */
    protected $event;

    public function __construct(Event $event)
    {
        $this->event = $event;
    }

    /**
     * "onOpen" listener.
     *
     * @param Request $request
     */
    public function onOpen(Request $request)
    {
        $this->event->trigger('swoole.websocket.Open', $request);
    }

    /**
     * "onMessage" listener: dispatches the raw frame first, then the event
     * decoded from its payload.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame)
    {
        $this->event->trigger('swoole.websocket.Message', $frame);

        $decoded = $this->decode($frame->data);
        $this->event->trigger('swoole.websocket.Event', $decoded);
    }

    /**
     * "onClose" listener.
     */
    public function onClose()
    {
        $this->event->trigger('swoole.websocket.Close');
    }

    /**
     * Turn a JSON payload into a websocket event; fields missing from the
     * payload become null.
     *
     * @param string $payload
     *
     * @return WsEvent
     */
    protected function decode($payload)
    {
        $fields = json_decode($payload, true);

        $type = $fields['type'] ?? null;
        $body = $fields['data'] ?? null;

        return new WsEvent($type, $body);
    }

    /**
     * Serialise a websocket event to JSON; any other message is returned
     * untouched.
     *
     * @param mixed $message
     *
     * @return mixed
     */
    public function encodeMessage($message)
    {
        if (!$message instanceof WsEvent) {
            return $message;
        }

        $fields = [
            'type' => $message->type,
            'data' => $message->data,
        ];

        return json_encode($fields);
    }
}
Room.php
<?php
namespace think\swoole\websocket;
use think\Manager;
use think\swoole\websocket\room\Table;
/**
 * Class Room
 *
 * Driver manager for the websocket room storage.
 *
 * @package think\swoole\websocket
 * @mixin Table
 */
class Room extends Manager
{
    protected $namespace = "\\think\\swoole\\websocket\\room\\";

    /**
     * Read the configuration of the named room driver.
     *
     * @param string $name Driver name
     *
     * @return mixed The "swoole.websocket.room.<name>" config entry, an empty
     *               array by default
     */
    protected function resolveConfig(string $name)
    {
        $configKey = "swoole.websocket.room.{$name}";

        return $this->app->config->get($configKey, []);
    }

    /**
     * Default driver
     *
     * @return string|null
     */
    public function getDefaultDriver()
    {
        $config = $this->app->config;

        return $config->get('swoole.websocket.room.type', 'table');
    }
}
message
PushMessage.php
<?php
namespace think\swoole\websocket\message;
/**
 * Message asking for a payload to be pushed to one socket descriptor.
 */
class PushMessage
{
    /** @var mixed Target socket descriptor */
    public $fd;

    /** @var mixed Payload to push */
    public $data;

    /**
     * @param mixed $fd   Target socket descriptor
     * @param mixed $data Payload to push
     */
    public function __construct($fd, $data)
    {
        $this->fd   = $fd;
        $this->data = $data;
    }
}
socketio
EnginePacket.php
<?php
namespace think\swoole\websocket\socketio;
/**
 * An Engine.io packet: a one-character type followed by the payload.
 */
class EnginePacket
{
    /**
     * Engine.io packet type `open`.
     */
    const OPEN = 0;

    /**
     * Engine.io packet type `close`.
     */
    const CLOSE = 1;

    /**
     * Engine.io packet type `ping`.
     */
    const PING = 2;

    /**
     * Engine.io packet type `pong`.
     */
    const PONG = 3;

    /**
     * Engine.io packet type `message`.
     */
    const MESSAGE = 4;

    /**
     * Engine.io packet type 'upgrade'
     */
    const UPGRADE = 5;

    /**
     * Engine.io packet type `noop`.
     */
    const NOOP = 6;

    /** @var int|string One of the type constants; a one-character string when parsed */
    public $type;

    /** @var string Packet payload */
    public $data = '';

    /**
     * @param int|string $type Packet type
     * @param string     $data Packet payload
     */
    public function __construct($type, $data = '')
    {
        $this->type = $type;
        $this->data = $data;
    }

    /**
     * Build an `open` packet.
     */
    public static function open($payload)
    {
        return new static(self::OPEN, $payload);
    }

    /**
     * Build a `pong` packet.
     */
    public static function pong($payload = '')
    {
        return new static(self::PONG, $payload);
    }

    /**
     * Build a `ping` packet.
     */
    public static function ping()
    {
        return new static(self::PING);
    }

    /**
     * Build a `message` packet.
     */
    public static function message($payload)
    {
        return new static(self::MESSAGE, $payload);
    }

    /**
     * Parse a raw packet: the first character is the type, the rest is the
     * payload.
     *
     * @param string $packet
     *
     * @return static
     */
    public static function fromString(string $packet)
    {
        // Before PHP 8.0 substr() returns false (not null) when the offset is
        // past the end of the string, so `??` cannot supply a fallback; the
        // cast guarantees both parts are always strings.
        return new static(
            (string) substr($packet, 0, 1),
            (string) substr($packet, 1)
        );
    }

    /**
     * Serialise the packet: type immediately followed by the payload.
     *
     * @return string
     */
    public function toString()
    {
        return $this->type . $this->data;
    }
}
Handler.php
<?php
namespace think\swoole\websocket\socketio;
use Exception;
use Swoole\Timer;
use Swoole\Websocket\Frame;
use think\Config;
use think\Event;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Websocket;
use think\swoole\websocket\Event as WsEvent;
/**
 * Websocket handler speaking the Engine.io / Socket.io packet format.
 */
class Handler implements HandlerInterface
{
    /** @var Config */
    protected $config;

    /** @var Event */
    protected $event;

    /** @var Websocket */
    protected $websocket;

    /** @var mixed Value of the `EIO` request parameter captured in onOpen() */
    protected $eio;

    /** @var int Id of the timer that closes the connection on ping timeout */
    protected $pingTimeoutTimer = 0;

    /** @var int Id of the timer that sends the next ping */
    protected $pingIntervalTimer = 0;

    /** @var int `swoole.websocket.ping_interval` config value (default 25000) */
    protected $pingInterval;

    /** @var int `swoole.websocket.ping_timeout` config value (default 60000) */
    protected $pingTimeout;

    public function __construct(Event $event, Config $config, Websocket $websocket)
    {
        $this->event        = $event;
        $this->config       = $config;
        $this->websocket    = $websocket;
        $this->pingInterval = $this->config->get('swoole.websocket.ping_interval', 25000);
        $this->pingTimeout  = $this->config->get('swoole.websocket.ping_timeout', 60000);
    }

    /**
     * "onOpen" listener: sends the Engine.io `open` packet, then either
     * connects immediately (EIO below 4) or schedules the first ping.
     *
     * @param Request $request
     */
    public function onOpen(Request $request)
    {
        $this->eio = $request->param('EIO');

        $payload = json_encode(
            [
                'sid'          => base64_encode(uniqid()),
                'upgrades'     => [],
                'pingInterval' => $this->pingInterval,
                'pingTimeout'  => $this->pingTimeout,
            ]
        );

        $this->push(EnginePacket::open($payload));

        $this->event->trigger('swoole.websocket.Open', $request);

        if ($this->eio < 4) {
            $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);
            $this->onConnect();
        } else {
            $this->schedulePing();
        }
    }

    /**
     * "onMessage" listener: decodes the Engine.io packet, restarts the ping
     * timeout and dispatches on the packet type. Unsupported packet types
     * close the connection.
     *
     * @param Frame $frame
     */
    public function onMessage(Frame $frame)
    {
        $enginePacket = EnginePacket::fromString($frame->data);

        $this->event->trigger('swoole.websocket.Message', $enginePacket);

        $this->resetPingTimeout($this->pingInterval + $this->pingTimeout);

        // The parsed type is a string, so the loose comparison of `switch`
        // is what matches it against the integer constants.
        switch ($enginePacket->type) {
            case EnginePacket::MESSAGE:
                $this->handlePacket(Packet::fromString($enginePacket->data));
                break;
            case EnginePacket::PING:
                $this->push(EnginePacket::pong($enginePacket->data));
                break;
            case EnginePacket::PONG:
                $this->schedulePing();
                break;
            default:
                $this->websocket->close();
                break;
        }
    }

    /**
     * Process one Socket.io packet carried by an Engine.io `message` packet.
     *
     * @param Packet $packet
     */
    protected function handlePacket(Packet $packet)
    {
        switch ($packet->type) {
            case Packet::CONNECT:
                $this->onConnect($packet->data);
                break;
            case Packet::EVENT:
                // The payload comes from the client; without an array there
                // is no event name to shift off, so treat the packet like any
                // other unsupported one instead of failing in array_shift().
                if (!is_array($packet->data)) {
                    $this->websocket->close();
                    break;
                }

                $type   = array_shift($packet->data);
                $data   = $packet->data;
                $result = $this->event->trigger('swoole.websocket.Event', new WsEvent($type, $data));

                // A packet id means the client expects an acknowledgement.
                if ($packet->id !== null) {
                    $responsePacket = Packet::create(Packet::ACK, [
                        'id'   => $packet->id,
                        'nsp'  => $packet->nsp,
                        'data' => $result,
                    ]);

                    $this->push($responsePacket);
                }
                break;
            case Packet::DISCONNECT:
                $this->event->trigger('swoole.websocket.Disconnect');
                $this->websocket->close();
                break;
            default:
                $this->websocket->close();
                break;
        }
    }

    /**
     * "onClose" listener: stops both timers and dispatches the close event.
     */
    public function onClose()
    {
        Timer::clear($this->pingTimeoutTimer);
        Timer::clear($this->pingIntervalTimer);
        $this->event->trigger('swoole.websocket.Close');
    }

    /**
     * Dispatch the connect event and answer with a CONNECT packet, or with a
     * CONNECT_ERROR packet carrying the message of an exception thrown by a
     * listener.
     *
     * @param mixed $data Payload of the client's CONNECT packet, if any
     */
    protected function onConnect($data = null)
    {
        try {
            $this->event->trigger('swoole.websocket.Connect', $data);
            $packet = Packet::create(Packet::CONNECT);
            if ($this->eio >= 4) {
                $packet->data = ['sid' => base64_encode(uniqid())];
            }
        } catch (Exception $exception) {
            $packet = Packet::create(Packet::CONNECT_ERROR, [
                'data' => ['message' => $exception->getMessage()],
            ]);
        }

        $this->push($packet);
    }

    /**
     * Restart the timer that closes the connection after $timeout.
     *
     * @param int $timeout Delay handed to Timer::after()
     */
    protected function resetPingTimeout($timeout)
    {
        Timer::clear($this->pingTimeoutTimer);
        $this->pingTimeoutTimer = Timer::after($timeout, function () {
            $this->websocket->close();
        });
    }

    /**
     * Schedule the next ping; once it is sent the peer has $pingTimeout to
     * send anything before the connection is closed.
     */
    protected function schedulePing()
    {
        Timer::clear($this->pingIntervalTimer);
        $this->pingIntervalTimer = Timer::after($this->pingInterval, function () {
            $this->push(EnginePacket::ping());
            $this->resetPingTimeout($this->pingTimeout);
        });
    }

    /**
     * Convert a message to its wire format: a websocket event becomes a
     * Socket.io EVENT packet, a Socket.io packet is wrapped in an Engine.io
     * `message` packet, and an Engine.io packet is serialised to a string.
     * Anything else is returned untouched.
     *
     * @param mixed $message
     *
     * @return mixed
     */
    public function encodeMessage($message)
    {
        if ($message instanceof WsEvent) {
            $message = Packet::create(Packet::EVENT, [
                'data' => array_merge([$message->type], $message->data),
            ]);
        }

        if ($message instanceof Packet) {
            $message = EnginePacket::message($message->toString());
        }

        if ($message instanceof EnginePacket) {
            $message = $message->toString();
        }

        return $message;
    }

    /**
     * Push data to the current connection.
     *
     * @param mixed $data
     */
    protected function push($data)
    {
        $this->websocket->push($data);
    }
}
Packet.php
<?php
namespace think\swoole\websocket\socketio;
/**
 * Class Packet
 *
 * A Socket.io packet, serialised as: type, optional "<namespace>,",
 * optional numeric id, optional JSON payload.
 */
class Packet
{
    /**
     * Socket.io packet type `connect`.
     */
    const CONNECT = 0;

    /**
     * Socket.io packet type `disconnect`.
     */
    const DISCONNECT = 1;

    /**
     * Socket.io packet type `event`.
     */
    const EVENT = 2;

    /**
     * Socket.io packet type `ack`.
     */
    const ACK = 3;

    /**
     * Socket.io packet type `connect_error`.
     */
    const CONNECT_ERROR = 4;

    /**
     * Socket.io packet type 'binary event'
     */
    const BINARY_EVENT = 5;

    /**
     * Socket.io packet type `binary ack`. For acks with binary arguments.
     */
    const BINARY_ACK = 6;

    /** @var int One of the type constants */
    public $type;

    /** @var string Namespace; "/" is the default namespace */
    public $nsp = '/';

    /** @var mixed Decoded JSON payload, or null when there is none */
    public $data = null;

    /** @var int|null Packet id, or null when there is none */
    public $id = null;

    public function __construct(int $type)
    {
        $this->type = $type;
    }

    /**
     * Build a packet from its parts.
     *
     * @param int   $type    Packet type
     * @param array $decoded Optional `id`, `nsp` and `data` entries; a
     *                       missing or empty `nsp` means "/"
     *
     * @return static
     */
    public static function create($type, array $decoded = [])
    {
        $packet = new static($type);

        $packet->id   = $decoded['id'] ?? null;
        $packet->nsp  = ($decoded['nsp'] ?? null) ?: '/';
        $packet->data = $decoded['data'] ?? null;

        return $packet;
    }

    /**
     * Serialise the packet.
     *
     * @return string
     */
    public function toString()
    {
        $encoded = '' . $this->type;

        // The default namespace is implicit and never written out.
        $hasCustomNamespace = $this->nsp && '/' !== $this->nsp;
        if ($hasCustomNamespace) {
            $encoded .= $this->nsp . ',';
        }

        if (null !== $this->id) {
            $encoded .= $this->id;
        }

        if (null === $this->data) {
            return $encoded;
        }

        return $encoded . json_encode($this->data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    }

    /**
     * Parse a serialised packet.
     *
     * @param string $str
     *
     * @return Packet
     */
    public static function fromString(string $str)
    {
        $length = strlen($str);
        $packet = new self((int) substr($str, 0, 1));

        // Offset of the first byte that has not been consumed yet.
        $cursor = 1;

        // Namespace (if any): starts with "/" and runs up to the next comma,
        // or to the end of the string when there is no comma.
        if ($length > 1 && '/' === $str[1]) {
            $comma = strpos($str, ',', 1);
            if (false === $comma) {
                $packet->nsp = substr($str, 1);
                $cursor      = $length;
            } else {
                $packet->nsp = substr($str, 1, $comma - 1);
                $cursor      = $comma + 1;
            }
        } else {
            $packet->nsp = '/';
        }

        // Id (if any): the run of digits that follows.
        $digits = $cursor < $length ? strspn($str, '0123456789', $cursor) : 0;
        if ($digits > 0) {
            $packet->id = intval(substr($str, $cursor, $digits));
            $cursor    += $digits;
        }

        // JSON payload (if any): everything that is left.
        if ($cursor < $length) {
            $packet->data = json_decode(substr($str, $cursor), true);
        }

        return $packet;
    }
}
Pusher.php
<?php
namespace think\swoole\websocket;
use think\swoole\contract\websocket\HandlerInterface;
use think\swoole\Manager;
use think\swoole\websocket\message\PushMessage;
/**
 * Class Pusher
 *
 * Collects target rooms and pushes a message to every client in them.
 */
class Pusher
{
    /** @var Room */
    protected $room;

    /** @var Manager */
    protected $manager;

    /** @var HandlerInterface */
    protected $handler;

    /** @var array Rooms the next push is addressed to */
    protected $to = [];

    public function __construct(Manager $manager, Room $room, HandlerInterface $handler)
    {
        $this->manager = $manager;
        $this->room    = $room;
        $this->handler = $handler;
    }

    /**
     * Add target rooms. Arrays are flattened recursively and rooms that
     * were already added are skipped.
     *
     * @param mixed ...$values Room names, or (nested) arrays of room names
     *
     * @return $this
     */
    public function to(...$values)
    {
        foreach ($values as $value) {
            if (is_array($value)) {
                $this->to(...$value);
            } elseif (!in_array($value, $this->to, true)) {
                // Strict comparison: loosely, numeric-looking names such as
                // "0.1" and "0.10" would be considered the same room.
                $this->to[] = $value;
            }
        }

        return $this;
    }

    /**
     * Push message to related descriptors
     *
     * Each client is stored as "<workerId>.<fd>"; the message is sent to
     * the owning worker, which receives the numeric fd.
     *
     * @param $data
     * @return void
     */
    public function push($data): void
    {
        $fds = [];

        foreach ($this->to as $room) {
            $clients = $this->room->getClients($room);
            if (!empty($clients)) {
                $fds = array_merge($fds, $clients);
            }
        }

        $fds = array_unique($fds);
        if (empty($fds)) {
            return;
        }

        // Encode exactly once: encoding inside the loop would feed the
        // already encoded payload back into the encoder for every client
        // after the first.
        $message = $this->handler->encodeMessage($data);

        foreach ($fds as $client) {
            [$workerId, $fd] = explode('.', $client);
            $this->manager->sendMessage((int) $workerId, new PushMessage((int) $fd, $message));
        }
    }

    /**
     * Push a named event with its arguments.
     *
     * @param string $event   Event name
     * @param mixed  ...$data Event arguments
     *
     * @return void
     */
    public function emit(string $event, ...$data): void
    {
        $this->push(new Event($event, $data));
    }
}
目录
├── Watcher.php
├── Websocket.php
├── App.php
├── Service.php
├── Pool.php
├── Http.php
├── Sandbox.php
├── Manager.php
├── Middleware.php
├── helpers.php
├── Table.php
└── command
├── Server.php
├── RpcInterface.php
└── concerns
├── WithApplication.php
├── InteractsWithRpcServer.php
├── InteractsWithSwooleTable.php
├── InteractsWithWebsocket.php
├── WithRpcClient.php
├── ModifyProperty.php
├── InteractsWithServer.php
├── WithMiddleware.php
├── InteractsWithHttp.php
├── InteractsWithQueue.php
├── InteractsWithRpcConnector.php
├── InteractsWithPools.php
├── InteractsWithRpcClient.php
├── WithContainer.php
├── InteractsWithTracing.php
└── config
├── swoole.php
└── contract
└── websocket
├── HandlerInterface.php
├── RoomInterface.php
├── ResetterInterface.php
└── rpc
├── ParserInterface.php
└── coroutine
├── Context.php
└── exception
├── RpcResponseException.php
├── RpcClientException.php
└── middleware
├── TraceRpcServer.php
├── TraceRpcClient.php
├── InteractsWithVarDumper.php
└── pool
└── proxy
├── Connection.php
├── Store.php
├── Cache.php
├── Connector.php
├── Db.php
├── Proxy.php
├── Client.php
└── resetters
├── ResetModel.php
├── ResetConfig.php
├── ClearInstances.php
├── ResetEvent.php
├── ResetService.php
├── ResetPaginator.php
└── response
├── File.php
└── rpc
├── Protocol.php
├── File.php
└── server
├── Dispatcher.php
├── Channel.php
├── Packer.php
├── Sendfile.php
├── Error.php
├── JsonParser.php
└── client
├── Service.php
├── Connector.php
├── Proxy.php
├── Gateway.php
└── packer
├── File.php
├── Buffer.php
└── watcher
├── Driver.php
├── Find.php
├── Scan.php
└── websocket
└── middleware
├── SessionInit.php
├── Event.php
└── room
├── Redis.php
├── Table.php
├── Handler.php
├── Room.php
└── message
├── PushMessage.php
└── socketio
├── EnginePacket.php
├── Handler.php
├── Packet.php
├── Pusher.php
评论已关闭