对接IOT的amqp的时候,已经建立正常连接。如下:
问题: 在进行多次数据包的传送后, 云平台的数据包接收 出现 不完整的情况。导致出现:unknown cmd MESSAGE
报错。
代码位置stomp包下的Client
类的onConnectionMessage
方法
进一步排查TcpConnection
下第557行public function baseRead($socket, $check_eof = true)
方法处理数据包可能有问题。
因为前几次包的数据解析正常,后面就一直不对了。
原因已经找到了,我觉得需要提交pr了。
包的长度是在
vendor/workerman/stomp/src/Protocols/Stomp.php
里的input方法计算的,看下这里是不是有什么问题,返回的长度不一致。老大, 我在
TcpConnection
类下baseRead
方法下,if ($this->_currentPackageLength === 0) { break; }
方法体内部追加$this->_recvBuffer = '';
作用是修复因Remove the current package from the receive buffer后, 接收下次数据包的时候 ( $this->_recvBuffer .= $buffer;) 导致数据包长度不一致。$this->_recvBuffer .= $buffer;
原因就是recvBuffer 会拼接每次的新数据包,而当$this->_currentPackageLength
为0 的时候,没有clean 这个$this->_recvBuffe
.应该不是workerman的问题。
$this->_currentPackageLength
为0,说明当前包的数据不够,无法计算包长,继续等待数据。数据不够肯定不能将已经收到的数据清零的,那样就丢包了。stomp包的长度是在
vendor/workerman/stomp/src/Protocols/Stomp.php
里的input方法计算的,如果你每个包的长度都一样,但是$this->_currentPackageLength,应该在input方法里找原因首先,包上这样的`$buffer = 'MESSAGE
qos:1
destination:/a1pyomK5esO//user/update
message-id:1524185981254223361
topic:/a1pyomK5esO//user/update
subscription:client-1
generateTime:1652229230594
{"breathe":"0","heartbeat":"0","onbed":"0","fanshen":"0","pressure":"0","deviation":"193"}' . "\0" . '' . "\0" . '';
从buffer中 get full package后,余下的buffer 不够input方法计算,返回
$this->_currentPackageLength为0, 如果不添加
$this->_recvBuffer = '', 继续等待数据。等来下一次新的数据包发来的时候,
$this->_recvBuffer .= $buffer;会拼接上一次余下的buffer,导致包的数据就不对了。然后 input 方法计算后的的长度,给
$one_request_buffer = \substr($this->_recvBuffer, 0, $this->_currentPackageLength);` 得到数据就是错误的。为什么会余下buffer,因为上一次的input计算的包长不对
不知道末尾为什么有两个空字符,"\0" . '' . "\0",可能是这个原因。
对的,我也很疑惑
老大 我建议将 input 下的
$end_pos = strpos($buffer, "\x00");
换成$end_pos = strripos($buffer, "\x00");
这样就可以了。。
也不能这样,万一它连续发了2个包就出问题了。我把多余的空字符当心跳吧
composer require workerman/stomp ^1.0.6
更新试下解决了,老大考虑的周全
mark
阿里云末尾是以2个空字符结尾的,而我们input里面用的
strpos($buffer, "\x00")
只判断了第一个,所有建议使用strripos($buffer, "\x00")
定位最后一位空字符位置你这是用的阿里云的那个产品?
AMQP
RabbitMQ 还是RocketMQ?