初始化代码

This commit is contained in:
2025-12-22 14:34:25 +08:00
parent c2c5ae2fdd
commit a77dbc743f
1510 changed files with 213008 additions and 0 deletions

View File

@@ -0,0 +1,202 @@
<?php
namespace think\swoole\websocket;
use Swoole\Server;
/**
* Class Pusher
*/
class Pusher
{
/**
* @var Server
*/
protected $server;
/**
* @var int
*/
protected $sender;
/**
* @var array
*/
protected $descriptors;
/**
* @var bool
*/
protected $broadcast;
/**
* @var bool
*/
protected $assigned;
/**
* @var string
*/
protected $payload;
/**
* Push constructor.
*
* @param Server $server
* @param int $sender
* @param array $descriptors
* @param bool $broadcast
* @param bool $assigned
* @param string $payload
*/
public function __construct(
Server $server,
string $payload,
int $sender = 0,
array $descriptors = [],
bool $broadcast = false,
bool $assigned = false
)
{
$this->sender = $sender;
$this->descriptors = $descriptors;
$this->broadcast = $broadcast;
$this->assigned = $assigned;
$this->payload = $payload;
$this->server = $server;
}
/**
* @return int
*/
public function getSender(): int
{
return $this->sender;
}
/**
* @return array
*/
public function getDescriptors(): array
{
return $this->descriptors;
}
/**
* @param int $descriptor
*
* @return self
*/
public function addDescriptor($descriptor): self
{
return $this->addDescriptors([$descriptor]);
}
/**
* @param array $descriptors
*
* @return self
*/
public function addDescriptors(array $descriptors): self
{
$this->descriptors = array_values(
array_unique(
array_merge($this->descriptors, $descriptors)
)
);
return $this;
}
/**
* @param int $descriptor
*
* @return bool
*/
public function hasDescriptor(int $descriptor): bool
{
return in_array($descriptor, $this->descriptors);
}
/**
* @return bool
*/
public function isBroadcast(): bool
{
return $this->broadcast;
}
/**
* @return bool
*/
public function isAssigned(): bool
{
return $this->assigned;
}
/**
* @return string
*/
public function getPayload(): string
{
return $this->payload;
}
/**
* @return bool
*/
public function shouldBroadcast(): bool
{
return $this->broadcast && empty($this->descriptors) && !$this->assigned;
}
/**
* Returns all descriptors that are websocket
*
* @return array
*/
protected function getWebsocketConnections(): array
{
return array_filter(iterator_to_array($this->server->connections), function ($fd) {
return (bool) $this->server->getClientInfo($fd)['websocket_status'] ?? false;
});
}
/**
* @param int $fd
*
* @return bool
*/
protected function shouldPushToDescriptor(int $fd): bool
{
if (!$this->server->exist($fd)) {
return false;
}
return $this->broadcast ? $this->sender !== (int) $fd : true;
}
/**
* Push message to related descriptors
* @return void
*/
public function push(): void
{
// attach sender if not broadcast
if (!$this->broadcast && $this->sender && !$this->hasDescriptor($this->sender)) {
$this->addDescriptor($this->sender);
}
// check if to broadcast to other clients
if ($this->shouldBroadcast()) {
$this->addDescriptors($this->getWebsocketConnections());
}
// push message to designated fds
foreach ($this->descriptors as $descriptor) {
if ($this->shouldPushToDescriptor($descriptor)) {
$this->server->push($descriptor, $this->payload);
}
}
}
}

View File

@@ -0,0 +1,30 @@
<?php
namespace think\swoole\websocket;
use think\Manager;
use think\swoole\websocket\room\Table;
/**
* Class Room
* @package think\swoole\websocket
* @mixin Table
*/
class Room extends Manager
{
protected $namespace = "\\think\\swoole\\websocket\\room\\";
protected function resolveConfig(string $name)
{
return $this->app->config->get("swoole.websocket.room.{$name}", []);
}
/**
* 默认驱动
* @return string|null
*/
public function getDefaultDriver()
{
return $this->app->config->get('swoole.websocket.room.type', 'table');
}
}

View File

@@ -0,0 +1,46 @@
<?php
namespace think\swoole\websocket;
use Swoole\Websocket\Frame;
use think\swoole\contract\websocket\ParserInterface;
class SimpleParser implements ParserInterface
{
/**
* Encode output payload for websocket push.
*
* @param string $event
* @param mixed $data
*
* @return mixed
*/
public function encode(string $event, $data)
{
return json_encode(
[
'event' => $event,
'data' => $data,
]
);
}
/**
* Input message on websocket connected.
* Define and return event name and payload data here.
*
* @param Frame $frame
*
* @return array
*/
public function decode($frame)
{
$data = json_decode($frame->data, true);
return [
'event' => $data['event'] ?? null,
'data' => $data['data'] ?? null,
];
}
}

View File

@@ -0,0 +1,231 @@
<?php
namespace think\swoole\websocket\room;
use InvalidArgumentException;
use Redis as PHPRedis;
use think\helper\Arr;
use think\swoole\contract\websocket\RoomInterface;
/**
* Class RedisRoom
*/
class Redis implements RoomInterface
{
/**
* @var PHPRedis
*/
protected $redis;
/**
* @var array
*/
protected $config;
/**
* @var string
*/
protected $prefix = 'swoole:';
/**
* RedisRoom constructor.
*
* @param array $config
*/
public function __construct(array $config)
{
$this->config = $config;
if ($prefix = Arr::get($this->config, 'prefix')) {
$this->prefix = $prefix;
}
}
/**
* @return RoomInterface
*/
public function prepare(): RoomInterface
{
$this->cleanRooms();
//关闭redis
$this->redis->close();
$this->redis = null;
return $this;
}
/**
* Set redis client.
*
*/
protected function getRedis()
{
if (!$this->redis) {
$host = Arr::get($this->config, 'host', '127.0.0.1');
$port = Arr::get($this->config, 'port', 6379);
$this->redis = new PHPRedis();
$this->redis->pconnect($host, $port);
}
return $this->redis;
}
/**
* Add multiple socket fds to a room.
*
* @param int fd
* @param array|string rooms
*/
public function add(int $fd, $rooms)
{
$rooms = is_array($rooms) ? $rooms : [$rooms];
$this->addValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);
foreach ($rooms as $room) {
$this->addValue($room, [$fd], RoomInterface::ROOMS_KEY);
}
}
/**
* Delete multiple socket fds from a room.
*
* @param int fd
* @param array|string rooms
*/
public function delete(int $fd, $rooms)
{
$rooms = is_array($rooms) ? $rooms : [$rooms];
$rooms = count($rooms) ? $rooms : $this->getRooms($fd);
$this->removeValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);
foreach ($rooms as $room) {
$this->removeValue($room, [$fd], RoomInterface::ROOMS_KEY);
}
}
/**
* Add value to redis.
*
* @param $key
* @param array $values
* @param string $table
*
* @return $this
*/
protected function addValue($key, array $values, string $table)
{
$this->checkTable($table);
$redisKey = $this->getKey($key, $table);
$pipe = $this->getRedis()->multi(PHPRedis::PIPELINE);
foreach ($values as $value) {
$pipe->sadd($redisKey, $value);
}
$pipe->exec();
return $this;
}
/**
* Remove value from reddis.
*
* @param $key
* @param array $values
* @param string $table
*
* @return $this
*/
protected function removeValue($key, array $values, string $table)
{
$this->checkTable($table);
$redisKey = $this->getKey($key, $table);
$pipe = $this->getRedis()->multi(PHPRedis::PIPELINE);
foreach ($values as $value) {
$pipe->srem($redisKey, $value);
}
$pipe->exec();
return $this;
}
/**
* Get all sockets by a room key.
*
* @param string room
*
* @return array
*/
public function getClients(string $room)
{
return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];
}
/**
* Get all rooms by a fd.
*
* @param int fd
*
* @return array
*/
public function getRooms(int $fd)
{
return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];
}
/**
* Check table for rooms and descriptors.
*
* @param string $table
*/
protected function checkTable(string $table)
{
if (!in_array($table, [RoomInterface::ROOMS_KEY, RoomInterface::DESCRIPTORS_KEY])) {
throw new InvalidArgumentException("Invalid table name: `{$table}`.");
}
}
/**
* Get value.
*
* @param string $key
* @param string $table
*
* @return array
*/
protected function getValue(string $key, string $table)
{
$this->checkTable($table);
return $this->getRedis()->smembers($this->getKey($key, $table));
}
/**
* Get key.
*
* @param string $key
* @param string $table
*
* @return string
*/
protected function getKey(string $key, string $table)
{
return "{$this->prefix}{$table}:{$key}";
}
/**
* Clean all rooms.
*/
protected function cleanRooms(): void
{
if (count($keys = $this->getRedis()->keys("{$this->prefix}*"))) {
$this->getRedis()->del($keys);
}
}
}

View File

@@ -0,0 +1,220 @@
<?php
namespace think\swoole\websocket\room;
use InvalidArgumentException;
use Swoole\Table as SwooleTable;
use think\swoole\contract\websocket\RoomInterface;
class Table implements RoomInterface
{
/**
* @var array
*/
protected $config = [
'room_rows' => 4096,
'room_size' => 2048,
'client_rows' => 8192,
'client_size' => 2048,
];
/**
* @var SwooleTable
*/
protected $rooms;
/**
* @var SwooleTable
*/
protected $fds;
/**
* TableRoom constructor.
*
* @param array $config
*/
public function __construct(array $config)
{
$this->config = array_merge($this->config, $config);
}
/**
* Do some init stuffs before workers started.
*
* @return RoomInterface
*/
public function prepare(): RoomInterface
{
$this->initRoomsTable();
$this->initFdsTable();
return $this;
}
/**
* Add multiple socket fds to a room.
*
* @param int fd
* @param array|string rooms
*/
public function add(int $fd, $roomNames)
{
$rooms = $this->getRooms($fd);
$roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
foreach ($roomNames as $room) {
$fds = $this->getClients($room);
if (in_array($fd, $fds)) {
continue;
}
$fds[] = $fd;
$rooms[] = $room;
$this->setClients($room, $fds);
}
$this->setRooms($fd, $rooms);
}
/**
* Delete multiple socket fds from a room.
*
* @param int fd
* @param array|string rooms
*/
public function delete(int $fd, $roomNames = [])
{
$allRooms = $this->getRooms($fd);
$roomNames = is_array($roomNames) ? $roomNames : [$roomNames];
$rooms = count($roomNames) ? $roomNames : $allRooms;
$removeRooms = [];
foreach ($rooms as $room) {
$fds = $this->getClients($room);
if (!in_array($fd, $fds)) {
continue;
}
$this->setClients($room, array_values(array_diff($fds, [$fd])));
$removeRooms[] = $room;
}
$this->setRooms($fd, collect($allRooms)->diff($removeRooms)->values()->toArray());
}
/**
* Get all sockets by a room key.
*
* @param string room
*
* @return array
*/
public function getClients(string $room)
{
return $this->getValue($room, RoomInterface::ROOMS_KEY) ?? [];
}
/**
* Get all rooms by a fd.
*
* @param int fd
*
* @return array
*/
public function getRooms(int $fd)
{
return $this->getValue($fd, RoomInterface::DESCRIPTORS_KEY) ?? [];
}
/**
* @param string $room
* @param array $fds
*
* @return $this
*/
protected function setClients(string $room, array $fds)
{
return $this->setValue($room, $fds, RoomInterface::ROOMS_KEY);
}
/**
* @param int $fd
* @param array $rooms
*
* @return $this
*/
protected function setRooms(int $fd, array $rooms)
{
return $this->setValue($fd, $rooms, RoomInterface::DESCRIPTORS_KEY);
}
/**
* Init rooms table
*/
protected function initRoomsTable(): void
{
$this->rooms = new SwooleTable($this->config['room_rows']);
$this->rooms->column('value', SwooleTable::TYPE_STRING, $this->config['room_size']);
$this->rooms->create();
}
/**
* Init descriptors table
*/
protected function initFdsTable()
{
$this->fds = new SwooleTable($this->config['client_rows']);
$this->fds->column('value', SwooleTable::TYPE_STRING, $this->config['client_size']);
$this->fds->create();
}
/**
* Set value to table
*
* @param $key
* @param array $value
* @param string $table
*
* @return $this
*/
public function setValue($key, array $value, string $table)
{
$this->checkTable($table);
$this->$table->set($key, ['value' => json_encode($value)]);
return $this;
}
/**
* Get value from table
*
* @param string $key
* @param string $table
*
* @return array|mixed
*/
public function getValue(string $key, string $table)
{
$this->checkTable($table);
$value = $this->$table->get($key);
return $value ? json_decode($value['value'], true) : [];
}
/**
* Check table for exists
*
* @param string $table
*/
protected function checkTable(string $table)
{
if (!property_exists($this, $table) || !$this->$table instanceof SwooleTable) {
throw new InvalidArgumentException("Invalid table name: `{$table}`.");
}
}
}

View File

@@ -0,0 +1,54 @@
<?php
namespace think\swoole\websocket\socketio;
use think\Config;
use think\Cookie;
use think\Request;
class Controller
{
protected $transports = ['polling', 'websocket'];
public function upgrade(Request $request, Config $config, Cookie $cookie)
{
if (!in_array($request->param('transport'), $this->transports)) {
return json(
[
'code' => 0,
'message' => 'Transport unknown',
],
400
);
}
if ($request->has('sid')) {
$response = response('1:6');
} else {
$sid = base64_encode(uniqid());
$payload = json_encode(
[
'sid' => $sid,
'upgrades' => ['websocket'],
'pingInterval' => $config->get('swoole.websocket.ping_interval'),
'pingTimeout' => $config->get('swoole.websocket.ping_timeout'),
]
);
$cookie->set('io', $sid);
$response = response('97:0' . $payload . '2:40');
}
return $response->contentType('text/plain');
}
public function reject(Request $request)
{
return json(
[
'code' => 3,
'message' => 'Bad request',
],
400
);
}
}

View File

@@ -0,0 +1,98 @@
<?php
namespace think\swoole\websocket\socketio;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebsocketServer;
use think\Config;
use think\Request;
use think\swoole\contract\websocket\HandlerInterface;
class Handler implements HandlerInterface
{
/** @var WebsocketServer */
protected $server;
/** @var Config */
protected $config;
public function __construct(Server $server, Config $config)
{
$this->server = $server;
$this->config = $config;
}
/**
* "onOpen" listener.
*
* @param int $fd
* @param Request $request
*/
public function onOpen($fd, Request $request)
{
if (!$request->param('sid')) {
$payload = json_encode(
[
'sid' => base64_encode(uniqid()),
'upgrades' => [],
'pingInterval' => $this->config->get('swoole.websocket.ping_interval'),
'pingTimeout' => $this->config->get('swoole.websocket.ping_timeout'),
]
);
$initPayload = Packet::OPEN . $payload;
$connectPayload = Packet::MESSAGE . Packet::CONNECT;
$this->server->push($fd, $initPayload);
$this->server->push($fd, $connectPayload);
}
}
/**
* "onMessage" listener.
* only triggered when event handler not found
*
* @param Frame $frame
* @return bool
*/
public function onMessage(Frame $frame)
{
$packet = $frame->data;
if (Packet::getPayload($packet)) {
return false;
}
$this->checkHeartbeat($frame->fd, $packet);
return true;
}
/**
* "onClose" listener.
*
* @param int $fd
* @param int $reactorId
*/
public function onClose($fd, $reactorId)
{
return;
}
protected function checkHeartbeat($fd, $packet)
{
$packetLength = strlen($packet);
$payload = '';
if ($isPing = Packet::isSocketType($packet, 'ping')) {
$payload .= Packet::PONG;
}
if ($isPing && $packetLength > 1) {
$payload .= substr($packet, 1, $packetLength - 1);
}
if ($isPing) {
$this->server->push($fd, $payload);
}
}
}

View File

@@ -0,0 +1,171 @@
<?php
namespace think\swoole\websocket\socketio;
/**
* Class Packet
*/
class Packet
{
/**
* Socket.io packet type `open`.
*/
const OPEN = 0;
/**
* Socket.io packet type `close`.
*/
const CLOSE = 1;
/**
* Socket.io packet type `ping`.
*/
const PING = 2;
/**
* Socket.io packet type `pong`.
*/
const PONG = 3;
/**
* Socket.io packet type `message`.
*/
const MESSAGE = 4;
/**
* Socket.io packet type 'upgrade'
*/
const UPGRADE = 5;
/**
* Socket.io packet type `noop`.
*/
const NOOP = 6;
/**
* Engine.io packet type `connect`.
*/
const CONNECT = 0;
/**
* Engine.io packet type `disconnect`.
*/
const DISCONNECT = 1;
/**
* Engine.io packet type `event`.
*/
const EVENT = 2;
/**
* Engine.io packet type `ack`.
*/
const ACK = 3;
/**
* Engine.io packet type `error`.
*/
const ERROR = 4;
/**
* Engine.io packet type 'binary event'
*/
const BINARY_EVENT = 5;
/**
* Engine.io packet type `binary ack`. For acks with binary arguments.
*/
const BINARY_ACK = 6;
/**
* Socket.io packet types.
*/
public static $socketTypes = [
0 => 'OPEN',
1 => 'CLOSE',
2 => 'PING',
3 => 'PONG',
4 => 'MESSAGE',
5 => 'UPGRADE',
6 => 'NOOP',
];
/**
* Engine.io packet types.
*/
public static $engineTypes = [
0 => 'CONNECT',
1 => 'DISCONNECT',
2 => 'EVENT',
3 => 'ACK',
4 => 'ERROR',
5 => 'BINARY_EVENT',
6 => 'BINARY_ACK',
];
/**
* Get socket packet type of a raw payload.
*
* @param string $packet
*
* @return int|null
*/
public static function getSocketType(string $packet)
{
$type = $packet[0] ?? null;
if (!array_key_exists($type, static::$socketTypes)) {
return;
}
return (int) $type;
}
/**
* Get data packet from a raw payload.
*
* @param string $packet
*
* @return array|null
*/
public static function getPayload(string $packet)
{
$packet = trim($packet);
$start = strpos($packet, '[');
if ($start === false || substr($packet, -1) !== ']') {
return;
}
$data = substr($packet, $start, strlen($packet) - $start);
$data = json_decode($data, true);
if (is_null($data)) {
return;
}
return [
'event' => $data[0],
'data' => $data[1] ?? null,
];
}
/**
* Return if a socket packet belongs to specific type.
*
* @param $packet
* @param string $typeName
*
* @return bool
*/
public static function isSocketType($packet, string $typeName)
{
$type = array_search(strtoupper($typeName), static::$socketTypes);
if ($type === false) {
return false;
}
return static::getSocketType($packet) === $type;
}
}

View File

@@ -0,0 +1,45 @@
<?php
namespace think\swoole\websocket\socketio;
use think\swoole\contract\websocket\ParserInterface;
class Parser implements ParserInterface
{
/**
* Encode output payload for websocket push.
*
* @param string $event
* @param mixed $data
*
* @return mixed
*/
public function encode(string $event, $data)
{
$packet = Packet::MESSAGE . Packet::EVENT;
$shouldEncode = is_array($data) || is_object($data);
$data = $shouldEncode ? json_encode($data) : $data;
$format = $shouldEncode ? '["%s",%s]' : '["%s","%s"]';
return $packet . sprintf($format, $event, $data);
}
/**
* Decode message from websocket client.
* Define and return payload here.
*
* @param \Swoole\Websocket\Frame $frame
*
* @return array
*/
public function decode($frame)
{
$payload = Packet::getPayload($frame->data);
return [
'event' => $payload['event'] ?? null,
'data' => $payload['data'] ?? null,
];
}
}