用websocket 创建pulsar客户端 消费后发送 ack 消息后 链接就中断了
public function onWorkerStart()
{
global $consumer, $service;
$domain = 'ws://mqe.tuyacn.com:8285/';
$option = [
'ssl' => array(
// 本地证书路径。 必须是 PEM 格式,并且包含本地的证书及私钥。
'local_cert' => '/your/path/to/pemfile',
// local_cert 文件的密码。
'passphrase' => 'your_pem_passphrase',
// 是否允许自签名证书。
'allow_self_signed' => true,
// 是否需要验证 SSL 证书。
'verify_peer' => false
)
];
$consumer = new AsyncTcpConnection($domain,$option);
// 设置以ssl加密方式访问
$consumer->transport = 'ssl';
//pulsar逻辑处理
$service = new Pulsar();
$consumer->headers = [
'username' => $access_id,
'password' => self::genPwd($access_id,$access_key),
"Connection" => "Upgrade",
];
$consumer->onConnect = function(AsyncTcpConnection $con) {
};
$consumer->onMessage = function(AsyncTcpConnection $con, $data)use($service,$access_key) {
//服务文件处理
$message_id = $service->statisticsData($data,$access_key);
$con->send(json_encode(['messageId' => $message_id]));
};
$consumer->connect();
// 设置连接的onClose回调
$consumer->onClose = function(AsyncTcpConnection $con)
{
echo "connection closed\n";
//断线重连
$con->reConnect(1);
};
}
这里写问题具体描述
官方给的c# demo ack消息是这样的
if (messageId != ""){
var payload = System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(jobject["payload"].ToString()));
//msg handler
DateTime s = DateTime.Now;
MessageHandler(payload);
Console.WriteLine("business processing cost="+(s-DateTime.Now));
//send ack
await webSocket.SendAsync(new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(ackStrJson)), WebSocketMessageType.Text, true, CancellationToken.None);
}
是我写的有问题吗