【已解决】使用workerman/rabbitmq写消费者,捕获到异常时,无法正常消费

dkou

问题描述

我希望的消费者业务流程时:

1)先从队列拿一条信息出来,先处理(ack)掉,无论业务逻辑成不成功
2)信息放到处理业务程序去处理。

要问的问题

如果业务程序中途出现异常,或者程序中途kill掉,发现队列中这条信息还在(或者ack不成功?)
如果进程正常跑没有异常或kill队列的信息是能正常消费的。

应该怎么改才能满足我的业务流程???????????

贴一下代码希望大佬指点指点

截图

备注:说说为啥要先处理信息

我的思路是,无论这条信息对应的业务是否成功,消息都要处理,防止队列数据一直卡着。如果业务处理不成功,我会重新向队尾发送一条延时队列,重新处理。

659 3 1
3个回答

SillyDog

Bunny\Channel::consume 这个方法 有个noack 参数 默认为 false 改为true 看看

  • dkou 2024-02-23

    试过了,设置true的话,只要中途进程断了,队列里面所有信息都会清空了

owenzhang

不懂问下。你这个die()不会导致workerman退出吗?

  • nitron 2024-02-23

    旁边不是注释"模拟捕获异常或进程被kill"么

  • dkou 2024-02-24

    模拟异常

dkou

问题已自行解决

经过查看github上的examples拿到头绪,再次感谢walkor,examples上写的比手册详细很多,而且每种模式都有写的。

问题的关键:

通过自己调试发现,上述写法是流程是,先把队列信息批量取出来,处理,然后再一次性ack的,即假设队列有10条信息,会看到10条信息状态会先变成Unacked,等10条信息对也业务逻辑完,这10条信息才会消失。
因此,如果我在处理第3条时die()了,其实前2条队列信息是还没收到ack的,所以队列信息还在。
而我的要求,是要逐条信息处理,每条信息处理完就ack的,因此,应该要设置每次批量获取信息的数量,设置为1。以下是代码
截图

题外话

经指点,我这种“无论这条信息对应的业务是否成功,消息都要处理,防止队列数据一直卡着。如果业务处理不成功,我会重新向队尾发送一条延时队列,重新处理。”处理方式在rabbitmq其实没必要的,因为本来就有ack机制,unacked的队列,会自己回到reday,不会被删除。因为以前使用redis做队列留下的思维。

  • 暂无评论
×
🔝