关于websocket onMessage 发送ack消息后 断开连接的疑问

Gin

问题描述

用websocket 创建pulsar客户端 消费后发送 ack 消息后 链接就中断了

public function onWorkerStart()
    {
        global $consumer, $service;

        $domain = 'ws://mqe.tuyacn.com:8285/';
        $option = [
            'ssl' => array(
                // 本地证书路径。 必须是 PEM 格式,并且包含本地的证书及私钥。
                'local_cert'        => '/your/path/to/pemfile',
                // local_cert 文件的密码。
                'passphrase'        => 'your_pem_passphrase',
                // 是否允许自签名证书。
                'allow_self_signed' => true,
                // 是否需要验证 SSL 证书。
                'verify_peer'       => false
            )
        ];
        $consumer = new AsyncTcpConnection($domain,$option);
        // 设置以ssl加密方式访问
        $consumer->transport = 'ssl';

        //pulsar逻辑处理
        $service = new Pulsar();

        $consumer->headers = [
            'username' => $access_id,
            'password' => self::genPwd($access_id,$access_key),
            "Connection" => "Upgrade",
        ];
        $consumer->onConnect = function(AsyncTcpConnection $con) {

        };

        $consumer->onMessage = function(AsyncTcpConnection $con, $data)use($service,$access_key) {

            //服务文件处理
            $message_id = $service->statisticsData($data,$access_key);

            $con->send(json_encode(['messageId' => $message_id]));
        };

        $consumer->connect();

        // 设置连接的onClose回调
        $consumer->onClose = function(AsyncTcpConnection $con)
        {
            echo "connection closed\n";
            //断线重连
            $con->reConnect(1);
        };
    }

这里写问题具体描述
官方给的c# demo ack消息是这样的

if (messageId != ""){
    var payload = System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(jobject["payload"].ToString()));
    //msg handler
    DateTime s = DateTime.Now;
    MessageHandler(payload);
    Console.WriteLine("business processing cost="+(s-DateTime.Now));
    //send ack
    await webSocket.SendAsync(new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(ackStrJson)), WebSocketMessageType.Text, true, CancellationToken.None);
                }

为此你搜索到了哪些方案及不适用的原因

是我写的有问题吗

507 0 0
0个回答

×
🔝