关于AsyncTcpConnection充当客户端掉线问题?

FlyLow

问题描述

这个问题反复看了好几次论坛里的回答还是无法解决。
大致逻辑如下:
1.前端用户连接A服务器workerman的Websocket服务。
2.A服务器在Event中即充担客户端也担任服务端,A服务器连接B服务的WebScoket服务。
3.B服务连接后会一直吐一段数据给A服务器,A服务器负责转发给前端。
4.期间当多个前端用户同时使用A服务WebScoket服务时,其中某个A服务器与B服务器的链接会被中断,且不触发onClose,本地调试和日志排查后无报错。且是一个概率事件。
5.想和大佬们讨论下这个问题应该如何精准定位和排查以及处理

 private static $ws_connection;

    public static function onWorkerStart($worker)
    {
        self::$ws_connection = new AsyncTcpConnection("ws://***.***.***.**:80");
    }

    public static function onConnect($client_id)
    {

    }

    public static function onWebSocketConnect($client_id, $data)
    {

    }

    public static function onMessage($client_id, $message)
    {
        /**
         * $msg = [ ];
         *
         */
        $msg = json_decode($message,TRUE);
                    try {
                        /* <<连接远端socket>> Start  */
                        // 以websocket协议连接远程websocket服务器
                            // 连上后发送数据
                            $sendData = [];
                            self::$ws_connection->onConnect = function(AsyncTcpConnection $connection)use ($sendData,$client_id){
                                $_SESSION['time_id'] = Timer::add(10,function () use($connection){
                                    $connection->send('{"type":"ping"}');
                                });
                                static $is_first_connect = true;
                                if (!$is_first_connect) return;
                                $is_first_connect = false;
                                $connection->send(json_encode($sendData,JSON_UNESCAPED_UNICODE));
                                Log::info($client_id . '连接成功');
                            };
                            // 远程websocket服务器发来消息时
                            self::$ws_connection->onMessage = function(AsyncTcpConnection $connection, $dataMsg) use ($client_id,$data,$res){
                                Log::info($client_id . $dataMsg);
                                $dataMsg = json_decode($dataMsg,true);
                                if(isset($dataMsg['type']) && $dataMsg['type'] == 'msg'){             
                                Gateway::sendToClient($client_id,json_encode($choices,JSON_UNESCAPED_UNICODE));
                                };
                            // 连接上发生错误时,一般是连接远程websocket服务器失败错误
                            self::$ws_connection->onError = function($connection, $code, $msg){
                                Log::error("error: $msg\n");
                            };
                            self::$ws_connection->onClose = function($connection) use ($client_id){
                                Log::info($client_id . '我下线了,重连\n');
                            };
                            // 设置好以上各种回调后,执行连接操作
                            self::$ws_connection->connect();
                            /* <<连接远端socket>> End  */
                    }catch (\Exception $e){
                        $res['code'] = $e->getCode();
                        $res['msg'] = $e->getMessage();
                        Gateway::sendToClient($client_id,json_encode($res,JSON_UNESCAPED_UNICODE));
                        return;
                    }

            }
    public static function onClose($client_id)
    {
//        Log::info($client_id . 'Close');
    }
766 6 1
6个回答

小W

先梳理一下代码整体的逻辑吧

  • 暂无评论
damao

代码有bug吧。
1、你代码里用的self::$ws_connection,就是所有前端用户共用一个 self::$ws_connection,但是你代码里又是在onMessage里不断重置 self::$ws_connection。应该是在onWorkerStart里设置self::$ws_connection才对。

2、没有人保证连接永远不断开,所以要在onClose里加一个reconnect()逻辑,断开自动重连。前端代码也是一样。

其他都就是连接数超过1024要装event扩展,优化linux内核

  • FlyLow 2023-08-08

    好的 感谢 我梳理一下看看

  • FlyLow 2023-08-08

    静态属性都换成new之后掉线情况更严重了,突然某一秒就断了,服务端知道断线,客户端不知道也不触发close。

  • damao 2023-08-08

    可能你哪里代码还有bug吧。既然没触发onclose,那么你怎么知道断了?

  • FlyLow 2023-08-08

    我在服务端打印了 Gateway::isOnline($client_id) 文档中写这个值是1的时候在线 0的时候就掉线了

  • damao 2023-08-08

    你不是说 ”其中某个A服务器与B服务器的链接会被中断“ 么?怎么会用$client_id判断在线,$client_id是浏览器到gatewayWorker的链接id,不是A服务器与B服务器的链接。

  • FlyLow 2023-08-08

    A服务器到B服务器是通过AsyncTcpContent通过websocket链接的 所以是可以在B服务器看到A服务器与B服务器的链接是否在线的

efnic

onWorkerStart干的事情

/**
 * 进程启动时
 * @param Worker $worker
 * @return void
 */
public function onWorkerStart(Worker $worker): void
{
    try {
        $this->startChannel();
        $this->startCrontab();

        $this->startPublic();
        $this->startPrivate();
        //实时交易
        TradeOrderHelper::start($this->websocket_api->getConfig());
    } catch (Throwable $throwable) {
        print_r(['【onWorkerStart系统异常】' . date('Y-m-d H:i:s') . ' 文件:' . __FILE__ . ' 行号:' . __LINE__, $throwable->getMessage()]);
        sleep(2);
        Worker::stopAll();
    }
}
  • 暂无评论
efnic

启动公共连接:

/**
 * @return void
 * @throws Exception
 */
private function startPublic(): void
{
    [$http, $public, $private] = $this->service_url;
    $connection = new AsyncTcpConnection($public, $this->getContextOption());
    $connection->setLabel(AsyncTcpConnection::LABEL_PUBLIC);
    $this->bindConnection($connection);
    $connection->connect();
}

设置异步连接:

/**
 * 设置异步连接
 * @param AsyncTcpConnection $con
 * @return void
 */
private function bindConnection(AsyncTcpConnection $con): void
{
    $con->setStartTime(time());
    $con->account_name = $this->websocket_api->getConfig()->title;
    $con->api_key = $this->websocket_api->getConfig()->api_key;
    // 设置以ssl加密方式访问
    $con->transport = 'ssl';
    $con->onError = function (AsyncTcpConnection $connection, $err_code, $err_msg) {
        //echo "$err_code, $err_msg\n";
    };
    $con->onClose = function (AsyncTcpConnection $connection) {
    };
    $con->onConnect = function (AsyncTcpConnection $con) {
        $con->send(json_encode($this->websocket_api->login()));
        $con->send('ping');
    };
    $con->onMessage = function (AsyncTcpConnection $con, $data) {
        if (!$data) {
            return;
        }
        try {
            switch ($data) {
                case 'pong':    //心跳回应
                    $filename = 'pong_' . $con->getLabel() . '_' . $con->api_key . '.log';
                    $account_name = $this->websocket_api->getConfig()->title;
                    file_put_contents($this->logsPath() . $filename, $account_name . ' 最后收到pong:' . date('Y-m-d H:i:s'));
                    $con->setLastPongTime(time());
                    break;
                default:
                    $event = json_decode($data, true);
                    if (isset($event['event'])) {
                        //事件
                        //WssMessageService::listenEvent($event, $data, $this->websocket_api, $con);
                        $handler = EventFactory::make($event);
                        $handler->handle($event, $this->websocket_api, $con);
                    } else if (isset($event['arg']['channel'])) {
                        //频道
                        //WssMessageService::listenChannel($event, $data, $this->websocket_api, $con);
                        $handler = ChannelFactory::make($event);
                        $handler->handle($event, $this->websocket_api, $con);
                    } else if (isset($event['id']) && isset($event['op'])) {
                        //交易
                        //WssMessageService::listenOp($event, $data, $this->websocket_api, $con);
                        $handler = OperationFactory::make($event);
                        $handler->handle($event, $this->websocket_api, $con);
                    }
                    break;
            }
        } catch (Throwable $throwable) {
            fileDebug([($con->isPublicChannel() ? '【公共频道】' : '【私有频道】') . date('Y-m-d H:i:s'), '【onMessage业务异常】 文件:' . __FILE__ . ' 行号:' . __LINE__, $throwable->getMessage(), $data], 'logs', 'onMessage_Throwable');
        }
    };
}
  • 暂无评论
efnic

断线重连逻辑:

/**
 * 检查ping状态,超时断线重连
 * @param AsyncTcpConnection $con
 * @return void
 * @throws Exception
 */
private function checkStatus(AsyncTcpConnection $con): void
{
    try {
        $ttl = config('okx.ping_interval', 20);
        $last_pong_time = $con->getLastPongTime();
        $start_time = $con->getStartTime();
        if (empty($last_pong_time) && ($start_time + $ttl > time())) {
            //刚启动,还没收到pong
            return;
        }

        //超时判断
        if (($last_pong_time + $ttl * 1.3) < time()) {
            RobotNotify::send('ping状态异常正在重连', 'checkStatus:' . $con->getLabel() . PHP_EOL . ($con->account_name ?? ''));
            //重连
            $con->close();
            if ($con->isPrivateChannel()) {
                $this->startPrivate();
            } else {
                $this->startPublic();
            }
        }
    } catch (Throwable $throwable) {
        //通知开发者
        RobotNotify::send('重连异常', 'reconnect:' . $con->getLabel() . PHP_EOL . ($con->account_name ?? '') . PHP_EOL . $throwable->getMessage());
        Worker::stopAll();
    }
}
  • efnic 2023-08-08

    定时器周期的调用checkStatus方法,检查连接是否正常(最后收到客户端消息的时间,超时主动断开)

  • FlyLow 2023-08-09

    感谢 逻辑都没有问题 最后发现是Phar打包的问题导致的

  • 小W 2023-08-09

    哈哈哈

liong

你看看这个https://www.workerman.net/q/11278

  • 暂无评论
年代过于久远,无法发表回答
×
🔝