WebSocket 当前连接收到了不属于它的订阅数据,新手,不知道是变量污染还是其他问题,求兄弟们帮忙看一下。
protected $socket = 'websocket://0.0.0.0:8090';
protected $clients = [];
/**
* 连接成功事件
* @param $connection
*/
public function onConnect($connection)
{
$connection->id = $this->getUniqueConnectionId($connection);
// 为每个连接初始化一个订阅频道的数组
$this->clients[$connection->id] = [
'connection' => $connection,
'subscriptions' => [], // 存储订阅的频道信息
'timerId' => ''
];
}
/**
* 接收消息事件
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// 解析接收到的数据
$message = json_decode($data, true);
// 检查解析结果是否为null,即是否解析失败
if ($message === null) {
$connection->send('Failed to decode JSON data.'); // 发送解析失败响应
return;
}
if (!isset($message['type'])) {
return;
}
$queryId = Random::uuid();
$queryParam['symbol_id'] = $message['symbol_id'] ?? null;
$queryParam['channel'] = $message['channel'] ?? null;
// 现在 $message 包含解析后的数组数据,您可以继续处理它
switch ($message['type']) {
case 'subscribe':
// 订阅消息
$this->subscribe($connection, $queryParam);
break;
case 'unsubscribe':
// 取消订阅消息
$this->unsubscribe($connection, $queryParam);
break;
case 'pong':
// 收到客户端的pong响应
break;
default:
// 其他消息类型,可以根据需求进行处理
break;
}
}
// 加入频道
protected function subscribe($connection, $queryParam)
{
// 停止之前的定时器
$this->clearTimer($connection->id);
// 添加订阅信息到连接中
$this->clients[$connection->id]['subscriptions'][] = $queryParam;
// 创建一个5秒后发送数据的定时器
$timerId = Timer::add(5, function () use ($connection, $queryParam) {
// 获取当前连接的订阅信息
$subscriptions = $this->clients[$connection->id]['subscriptions'];
// 检查当前连接是否订阅了当前频道
$isSubscribed = false;
foreach ($subscriptions as $subscription) {
if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
$isSubscribed = true;
break;
}
}
if ($isSubscribed) {
// 发送数据给当前连接
$queryResult = $this->getDataFromCache($queryParam['channel'], $queryParam['symbol_id']);
if (!empty($queryResult)) {
$connection->clearBuffer();
$connection->send(json_encode($queryResult));
}
}
}, [], true);
// 记录定时器ID
$this->clients[$connection->id]['timerId'] = $timerId;
}
// 检查连接是否订阅了指定的频道
protected function isSubscribed($connection, $channel, $symbol_id)
{
$subscriptions = $this->clients[$connection->id]['subscriptions'];
foreach ($subscriptions as $subscription) {
if ($subscription['channel'] === $channel && $subscription['symbol_id'] === $symbol_id) {
return true;
}
}
return false;
}
// 取消频道时移除连接及其订阅信息
protected function unsubscribe($connection, $queryParam)
{
$subscriptions = &$this->clients[$connection->id]['subscriptions'];
foreach ($subscriptions as $key => $subscription) {
if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
unset($subscriptions[$key]);
break;
}
}
}
/**
* 心跳事件
* @param $connection
*/
public function onHeartbeat($connection)
{
// 可以在这里处理心跳逻辑,例如检查连接是否存活等
}
/**
* 连接关闭事件
* @param $connection
*/
public function onClose($connection)
{
$this->clearTimer($connection->id);
unset($this->clients[$connection->id]);
}
/**
* 清除连接的定时器
* @param $connectionId
*/
protected function clearTimer($connectionId)
{
if (isset($this->clients[$connectionId]['timerId'])) {
Timer::del($this->clients[$connectionId]['timerId']);
}
}
/**
* 获取唯一的连接ID,可以添加worker ID前缀
* @param $connection
* @return string
*/
protected function getUniqueConnectionId($connection)
{
return $connection->worker->id . '_' . $connection->id;
}
// 从缓存中获取数据
protected function getDataFromCache($channel, $symbol_id)
{
}
}
https://www.workerman.net/doc/webman/process.html 不用设置listen