WebSocket 当前连接收到了不属于它的订阅数据

程序玩

问题描述

WebSocket 当前连接收到了不属于它的订阅数据,新手,不知道是变量污染还是其他问题,求兄弟们帮忙看一下。

为此你搜索到了哪些方案及不适用的原因

protected $socket = 'websocket://0.0.0.0:8090';

protected $clients = [];

/**
 * 连接成功事件
 * @param $connection
 */
public function onConnect($connection)
{
    $connection->id = $this->getUniqueConnectionId($connection);
    // 为每个连接初始化一个订阅频道的数组
    $this->clients[$connection->id] = [
        'connection' => $connection,
        'subscriptions' => [], // 存储订阅的频道信息
        'timerId' => ''
    ];
}

/**
 * 接收消息事件
 * @param $connection
 * @param $data
 */
public function onMessage($connection, $data)
{
    // 解析接收到的数据
    $message = json_decode($data, true);

    // 检查解析结果是否为null,即是否解析失败
    if ($message === null) {
        $connection->send('Failed to decode JSON data.'); // 发送解析失败响应
        return;
    }

    if (!isset($message['type'])) {
        return;
    }

    $queryId = Random::uuid();
    $queryParam['symbol_id'] = $message['symbol_id'] ?? null;
    $queryParam['channel']   = $message['channel'] ?? null;

    // 现在 $message 包含解析后的数组数据,您可以继续处理它
    switch ($message['type']) {
        case 'subscribe':
            // 订阅消息
            $this->subscribe($connection, $queryParam);
            break;
        case 'unsubscribe':
            // 取消订阅消息
            $this->unsubscribe($connection, $queryParam);
            break;
        case 'pong':
            // 收到客户端的pong响应
            break;
        default:
            // 其他消息类型,可以根据需求进行处理
            break;
    }

}

// 加入频道
protected function subscribe($connection, $queryParam)
{
    // 停止之前的定时器
    $this->clearTimer($connection->id);
    // 添加订阅信息到连接中
    $this->clients[$connection->id]['subscriptions'][] = $queryParam;

    // 创建一个5秒后发送数据的定时器
    $timerId = Timer::add(5, function () use ($connection, $queryParam) {
        // 获取当前连接的订阅信息
        $subscriptions = $this->clients[$connection->id]['subscriptions'];

        // 检查当前连接是否订阅了当前频道
        $isSubscribed = false;
        foreach ($subscriptions as $subscription) {
            if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
                $isSubscribed = true;
                break;
            }
        }

        if ($isSubscribed) {
            // 发送数据给当前连接
            $queryResult = $this->getDataFromCache($queryParam['channel'], $queryParam['symbol_id']);
            if (!empty($queryResult)) {
                $connection->clearBuffer();
                $connection->send(json_encode($queryResult));
            }
        }
    }, [], true);

    // 记录定时器ID
    $this->clients[$connection->id]['timerId'] = $timerId;
}

// 检查连接是否订阅了指定的频道
protected function isSubscribed($connection, $channel, $symbol_id)
{
    $subscriptions = $this->clients[$connection->id]['subscriptions'];
    foreach ($subscriptions as $subscription) {
        if ($subscription['channel'] === $channel && $subscription['symbol_id'] === $symbol_id) {
            return true;
        }
    }
    return false;
}

// 取消频道时移除连接及其订阅信息
protected function unsubscribe($connection, $queryParam)
{
    $subscriptions = &$this->clients[$connection->id]['subscriptions'];
    foreach ($subscriptions as $key => $subscription) {
        if ($subscription['channel'] === $queryParam['channel'] && $subscription['symbol_id'] === $queryParam['symbol_id']) {
            unset($subscriptions[$key]);
            break;
        }
    }
}

/**
 * 心跳事件
 * @param $connection
 */
public function onHeartbeat($connection)
{
    // 可以在这里处理心跳逻辑,例如检查连接是否存活等
}

/**
 * 连接关闭事件
 * @param $connection
 */
public function onClose($connection)
{
    $this->clearTimer($connection->id);
    unset($this->clients[$connection->id]);
}

/**
 * 清除连接的定时器
 * @param $connectionId
 */
protected function clearTimer($connectionId)
{
    if (isset($this->clients[$connectionId]['timerId'])) {
        Timer::del($this->clients[$connectionId]['timerId']);
    }
}

/**
 * 获取唯一的连接ID,可以添加worker ID前缀
 * @param $connection
 * @return string
 */
protected function getUniqueConnectionId($connection)
{
    return $connection->worker->id . '_' . $connection->id;
}

// 从缓存中获取数据
protected function getDataFromCache($channel, $symbol_id)
{

}

}

595 1 0
1个回答

不抽烟
<?php
namespace app\process;

use Workerman\Timer;

use think\facade\Db;

use Workerman\Protocols\Ws;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;

class TaskDev
{

    public function onWorkerStart()
    {

        echo '监听/n';
        // 以websocket协议连接远程websocket服务器
        $ws_connection = new AsyncTcpConnection("ws://xxx.xxx.xxx.xxx:xx/xxxxxx");
        // 每隔55秒向服务端发送一个opcode为0x9的websocket心跳
        $ws_connection->websocketPingInterval = 55;
        // 自定义http头
        $ws_connection->headers = [];
        // 设置数据类型,默认BINARY_TYPE_BLOB为文本
        $ws_connection->websocketType = Ws::BINARY_TYPE_BLOB; // BINARY_TYPE_BLOB为文本 BINARY_TYPE_ARRAYBUFFER为二进制
        // 当TCP完成三次握手后
        $ws_connection->onConnect = function($connection){
            // echo "tcp connected\n";
        };
        // 当websocket完成握手后
        $ws_connection->onWebSocketConnect = function(AsyncTcpConnection $con, $response) {
            echo "握手";
            echo $response;

            $data = [
                "params"=>[],
                "time"=>10
            ];
            $con->send(json_encode($data));

        };
        // 远程websocket服务器发来消息时
        $ws_connection->onMessage = function($connection, $data){
            echo $data;
            $res_data = [];
            $devlist = json_decode($data,true);

            foreach ($devlist as $key => $value) {

            }

        };
        // 连接上发生错误时,一般是连接远程websocket服务器失败错误
        $ws_connection->onError = function($connection, $code, $msg){
            echo "error: $msg\n";
        };
        // 当连接远程websocket服务器的连接断开时
        $ws_connection->onClose = function($connection){
            echo "connection closed and try to reconnect\n";
            // 如果连接断开,1秒后重连
            $connection->reConnect(1);
        };
        // 设置好以上各种回调后,执行连接操作
        $ws_connection->connect();

    }

}
×
🔝