写了一个rabbitmq的工具类,下面贴代码,需要生产消息时直接
\util\Rabbitmq\publishWorkerQueue($queueName, $msg);
需要消费消息时:
\util\Rabbitmq\consumeWorkerQueue($queueName, $callback);
我一直有个疑问,这个rabbit的connection对象,应该是在worker启动时就创建好,然后在需要的地方直接调用就行,否则像现在这样,每生产一次要建立一次连接再销毁,应该会浪费资源吧。
但是我想不明白该在什么地方怎么写这个全局建立连接对象的方法,对于logger对象也是同样的疑问,希望可以得到指点。
具体代码如下:
<?php
namespace util;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use support\Log;
class Rabbitmq
{
protected static function Connection()
{
$connection = new AMQPStreamConnection(
env('RABBITMQ_HOST', '127.0.0.1'),
env('RABBITMQ_PORT', 5672),
env('RABBITMQ_USER', 'guest'),
env('RABBITMQ_PASSWORD', 'guest')
);
return $connection;
}
/**
* 发布消息到worker队列,支持一次性发布多个消息
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $queueName 队列名称
* @param string|array $msgData 需要入队的消息,单一消息为字符串类型,多个消息是数组类型
*/
public static function publishWorkerQueue(string $queueName = '', $msgData)
{
$log = Log::channel('producer');
if ($queueName == '') {
$queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
}
$connection = self::Connection();
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false);
if (!is_array($msgData)) {
$msgData = array($msgData);
}
// 遍历数组,对每一个元素做入队操作
foreach ($msgData as $dataBody) {
// 把字符串类型的元素入队,忽略其他类型的元素
try {
$dataBody = (string)$dataBody;
} catch (\Exception $e) {
$dataBody = json_encode($dataBody);
}
$msg = new AMQPMessage(
$dataBody,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$log->debug("{$queueName}入队数据: {$dataBody}");
$channel->basic_publish($msg, '', $queueName);
}
$channel->close();
$connection->close();
}
/**
* 从一个worker队列里消费一条记录
*
* @author Aaron <chenqiang@h024.cn>
*
* @param string $queueName 队列名称
* @param callable $callback 消费的回调函数,接收值$msg了队列中的一条消息
* @param bool $autoAck 自动确认消费,默认为false,需要在消费回调里手动执行$msg->ack()做消费成功确认
*/
public static function consumeWorkerQueue(string $queueName, callable $callback, bool $autoAck = false)
{
$log = Log::channel('consumer');
if ($queueName == '') {
$queueName = env('RABBITMQ_DEFAULT_QUEUE', 'default');
}
$connection = self::Connection();
$channel = $connection->channel();
$channel->queue_declare($queueName, false, true, false, false); // 默认自动确认
$channel->basic_qos(null, 1, null); // 一次只消费一条
$channel->basic_consume($queueName, '', false, $autoAck, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
pcntl_signal_dispatch(); // 针对exit with status 9
}
}
}
进程与进程之间是资源阻隔的,在当前进程里,global或静态变量都可以承载这个连接资源,如果你想要全局的,所有进程共享,那不行,除非用线程
文档里有写,地址: https://www.workerman.net/doc/webman/others/bootstrap.html
自己注意,初始化的是当前进程有效的.
这个文档我看了,同时也参考了support\bootstrap\Session和support\bootstrap\LaravelDb,但还是没想明白具体我该怎么写,比如我创建app/bootstrap/Rabbitmq.php文件,在它的start文件法写了$connection = new AMQPStreamConnection(...),然后呢,怎么能让process中的RabbitConsumer.php能够访问到这个$connection呢?
数据库连接是怎么实现进程内单例的,你就可以怎么实现,最简单的方式就是class中用static变量保存new AMQPStreamConnection(...),然后下次用的时候都用这个static变量