现有个需求是需要把物联网设备的状态同步到两个独立的系统中(都是用webman开发)。计划设备回来的消息扔到mq中,然后两个系统去消费处理各自的业务,但发现消息被消费了多次,单条消息消费次数也不等于进程数。
我是在进程启动后做的监听操作,代码如下:
app\bootstrap\SubscribeRabbitmq.php
public static function start($worker)
{
$is_console = !$worker;
if ($is_console) {
// If you do not want to execute this in console, just return.
return;
}
if ($worker->id === 0) {
Log::channel('mq')->info('【初始化MQ】' . getmypid());
// 连接 RabbitMQ 实例并创建频道
(new Client([
'vhost' => envs('MQ_VHOST', '/'),
'user' => envs('MQ_USERNAME', 'guest'),
'password' => envs('MQ_PASSWORD', 'guest')
]))->connect()
->then(function (Client $client) {
return $client->channel();
})->then(function (Channel $channel) {
// 声明【设备状态变更】交换机和匿名队列
return $channel->exchangeDeclare('device.status.change', 'fanout')->then(function () use ($channel
) {
return $channel->queueDeclare('', false, false, true, false);
})->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
// 获取绑定到队列上的帧对象,并将该帧的队列名传递给 $channel->queueBind() 函数来将队列绑定到 【设备状态变更】 交换机。
return $channel->queueBind($frame->queue, 'device.status.change')->then(function () use ($frame
) {
return $frame;
});
})->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
// 设置消息消费函数,设置 $channel->consume() 函数,当消费者监听一个虚拟主机和队列时,该函数将一直运行,等待接收实例的消息
$channel->consume(
function (Message $message, Channel $channel, Client $client) {
Log::channel('mq')->info('【MQ-consume】收到一条待消费数据进程ID:' . getmypid(),
['message_content' => $message->content]);
},
$frame->queue,
'',
false,
true
);
});
});
}
}
记录的日志:
两个疑问:
1.不是很理解为啥有些消息消费了3次,有些2次,还有些消费的进程id和初始的不是同一个
2.我预期的是每个系统只要消费一次,这样的话不是应该用自定义进程?
自定义进程。bootstrap是每个进程都执行,包括webman的http进程和monitor进程,虽然你设置了worker->id === 0,但是http 0号进程和 monitor 0号进程都参与消费了,所以是两个消费进程在消费。至于重复消费,是不是没有ack?或者生产者确实发布了多个重复的消息?
我猜是exchange和queue配置问题
生产者重发发布这个可以排除。改成自定义进程就没问题了 谢谢大佬们