首先感谢 workbunny 提供这么一款很好用的插件。使用【workbunny】RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67, 在消费者中如果有阻塞并超过一定时间,就会导致重复消费。
"workerman/webman-framework": "^1.5.0"
"workbunny/webman-rabbitmq": "^1.0"
生产者:IndexController.php
workbunny rabbitmq配置:app.php
消费者:TestBuilder.php
通过日志可以看到,在出现错误前,是有消息消费成功并返回了 ACK。在出现错误后,相关进程重启,导致消息开始重新消费,以前返回的 ACK 似乎无效?RabbitMQ的控制台,始终显示有对应数量的信息处于 Unacked 状态。
消费也要做幂等性
使用幂等的确能弥补重复消费,但需要引入新的服务比如数据库或缓存;测试发现一条消息会存在不止重复消费一次的情况,即使使用幂等但由此带来的阻塞和性能消耗也是未知的。
workbunny/webman-rabbitmq是基于bunny的一个异步rabbitmq-client,底层依赖event-loop,相当于把所有事件都投送到loop中执行,你在消费函数里使用sleep会出让当前进程,让当前进程的event-loop暂时停转等待cpu调度;
您把sleep换成
来模拟阻塞试试是否还会出现重复消费的情况;
另外在实际使用上,消费者逻辑建议增加幂等性的判断。
最后感谢您的建议和意见,我后续会根据目前的猜测论证一下,然后优化这个问题;
最近也是比较忙,2.0.0-beta也搁置了一段时间了
所有的行为都是异步的,包括ack,在打印了ack后,程序将ack行为投放到event-loop,然后进程杀死,event-loop中的ack消息可能还没有真正的执行,但event-loop已经随当前进程杀死了
这个也是目前2.X开发的一个方向
添加幂等性判断后,会存在这样一种情况,仅供参考,测试用例描述以下:
以前在项目中也使用过该插件,十分好用。最近在业务上做批量发送消息(发送消息是阻塞的)发现了该问题。也了解到作者技术强并且乐于分享。在此表示感谢。
这个也是目前2.x版本开发要解决的问题之一,因为所有行为都是异步的以后会带来很多行为会滞后执行,那么就没办法把控,导致一些意外情况的产生;比如异步publish后进程被杀死,可能导致数据实际上没有投放成功;消费者成功消费了以后返回ack,但进程因意外被杀死,可能导致消息重复消费。
因为目前1.x属于能将就用就将就用状态,现目前是全力开发2.x以解决之前已知的这些问题,所以你可以描述一下的的业务需求,我看看尝试给你出个方案,可能不一定使用rabbitmq。
当前的业务场景,由管理端给微信用户推送订阅信息:
尝试的方案(在选择方案前还未发现超时重复消费问题):
根据你们的业务来看,貌似没有需要频繁关闭/重启消费进程的地方,如果是进程持续运行的话,实际上不存在重复消费的问题,重复消费的问题只存在于关闭进程的那一瞬间;
方案一:尽可能减少重复消费的方案:
方案二:换其他队列组件试试
可以先暂时测试一下https://www.workerman.net/plugin/69是否满足你们的需求,这个队列同样具备ack机制,同时也经历过小范围生产测试,内部的执行方式是同步执行
我用您的插件发布为什么不会持久化呢?
这个插件:
https://www.workerman.net/plugin/67
用的是最新版本:
如果使用的是1.x,可以更新到1.0.11试试,如果是用的2.x,可以更新到2.0.0-beta.2试试
好的,谢谢!