通过综合分析、研究和探索workerman和PhpAmqpLibr相关手册,经过长期的实践,现分享一套基于workerman的rabbitmq客户端生产者和消费者代码,供大家测试,使用。
rabbitmq_productor.php
<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
//开启进程数量
$worker->count = 4;
$worker->name = "rabbitmq_productor";
$date = date("Y-m-d");
Worker::$pidFile = "var/mq_service_productor.pid";
Worker::$logFile = "var/mq_service_productor_logFile.log";
Worker::$stdoutFile = "var/mq_service_productor_stdout.log";
$worker->onWorkerStart = function () {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$rabbitmq_exchange_name = "exchange_name";
$rabbitmq_queueName = "queuePrefix_QueueName";
// 连接 rabbitmq 服务
$rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
// 获取信道
$rabbit_channel = $rabbit_connection->channel();
//声明创建交换机
$rabbit_channel->exchange_declare( $rabbitmq_exchange_name , 'topic', false, true, false);
// 声明创建队列
$rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);
// 绑定队列
$rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);
//可以修改时间间隔,如果为0.002秒,则每秒产生500*4=2000条
Timer::add( 0.002 , function() {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
//需要向rabbitmq队列投递消息的内容,通常为数组,经过json转换再发送
$data_all = array(
'name' => "张三",
'time' => time(),
);
$data_all_out_json = json_encode($data_all , JSON_UNESCAPED_UNICODE );
$data_all_out_msg = new AMQPMessage($data_all_out_json, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
//向队列里面写内容
@$rabbit_channel->basic_publish($data_all_out_msg , $rabbitmq_exchange_name , $rabbitmq_queueName);
});
};
Worker::runAll();
rabbitmq_comsumer.php
<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
//开启进程数量
$worker->count = 10;
$worker->name = "rabbitmq_comsumer";
$date = date("Y-m-d");
Worker::$pidFile = "var/rabbitmq_comsumer.pid";
Worker::$logFile = "var/rabbitmq_comsumer_logFile.log";
Worker::$stdoutFile = "var/rabbitmq_comsumer_stdout.log";
$worker->onWorkerStart = function () {
global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$rabbitmq_exchange_name = "exchange_name";
$rabbitmq_queueName = "queuePrefix_QueueName";
// 连接 rabbitmq 服务
$rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);
// 获取信道
$rabbit_channel = $rabbit_connection->channel();
// 声明队列
$rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);
// 绑定队列
$rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);
// 消费者订阅队列
$rabbit_channel->basic_consume($rabbitmq_queueName , '', false, false, false, false,
function ($msg){
global $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
$data_all_str = $msg->body;
// 消息确认,表明已经收到这条信息
@$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//echo "{$data_all_str}\n";
//这里是业务处理逻辑
//如果这条消息处理失败,你可以在这里将其再次放回消息队列(最好给消息做个放回去的次数判断,避免无限失败和无限循环)
});
//这里是重点,网上很多教程起的使用while死循环容易导致程序异步代码无法执行,这种方法能避免
//按照每个进程每秒处理1000条来设定定时器,每个进程每秒消费1000条,4个进程每秒消费4000条,经过实际验证,将时间改小也无法提升单个进程的处理速度
//实际测试,4个进程每秒的消费能力有4000左右,可以满足很多中小型系统的应用,如果想提升系统处理能力,
//可以增加消费者进程数量来解决,比如我将进程数量提升到10个,每秒处理能力约为1万
//这个机制,希望能力更强的你来进行优化
Timer::add( 0.0001 , function() {
global $rabbit_channel;
if( count($rabbit_channel->callbacks) > 0 ){
$rabbit_channel->wait();
}
});
};
Worker::runAll();
感谢分享
大佬,有没有准备集成到webman框架中?
webman插件有rabbitmq的消费者插件:
https://www.workerman.net/plugin/67
插件包含生产者、消费者;同步、异步;延迟队列、普通队列;
看见了的,在研究的,已在生产环境使用了?
用了很久了
@chaz6chez,安装测试了一下,发现每创建一个队列,默认创建了一个交换机,请教一下同一类型为何不使用同一个交换机呢?对于已经创建好的队列,如何指定交换机和队列名称呢。还有就是消息消费失败重试如何操作比较好呢?
@doit FastBuilder是一个实现点对点消费模式的消费队列Builder,在这个模型里就是“N个生产者对应一个业务对应一个交换机对应一个队列对应N个消费者”;简单讲就是一个业务对应一条队列。这样的好处是所有业务是隔离的,业务对于rabbitMQ是透明的,它仅仅只做队列的分发而已。
FastBuilder默认以ClassName为名,也有默认的消费失败重试方式,通常来说有两种重试机制,队头阻塞重试,回到队尾重试;
如果要实现其他消费模式的话,可以继承Builder,自行实现即可
666,感谢分享,赞一个,有机会试用一下
另外,对于其他方面,说一下个人愚见(跟楼主这个没有太大关系)
很多时候,当遇到的确需要使用MQ的情况,中小型公司,真的不会花额外的成本去用专业的MQ,最多用redis
我知道,可能很多人会说,这样很操蛋,但事实上,有很多公司,就是这样,成本能低就低
但redis目前有一个很大的问题,目前的webman队列,是使用的普通的list结构,这个是没有ack机制的
这对于某些对消息可靠性有比较高的要求的情况,就很尴尬
我们目前使用webman的redis队列插件,都是用在对消息可靠性要求不是特别高的场景,比如通知等
我有个想法,基于redis stream 写一套类似的消息队列插件
这样就可以真正满足实际需求了
有时间一定写一个出来,相信这个可能会更加实用
大佬,require_once ("./Lib_global.php");这个是什么文件,用到了吗,能发一下吗?
一些预定义的变量,设置PHP运行环境等
<?php
define('START_TIME', microtime(true));
define('START_MEM', memory_get_usage());
define('DS', DIRECTORY_SEPARATOR);
//defined('ROOT_PATH') or define('ROOT_PATH', dirname(realpath(APP_PATH)) . DS);
defined('ROOT_PATH') or define('ROOT_PATH', dirname($_SERVER['SCRIPT_FILENAME']) . DS);
defined('LOG_PATH') or define('LOG_PATH', ROOT_PATH . 'Log' . DS);
// 环境常量
define('IS_CLI', PHP_SAPI == 'cli' ? true : false);
define('IS_WIN', strpos(PHP_OS, 'WIN') !== false);
$config = array();
$global = array();
// do NOT run this script through a web browser
if (!isset($_SERVER['argv'][0]) || isset($_SERVER['REQUEST_METHOD']) || isset($_SERVER['REMOTE_ADDR'])) {
die('<br><strong>This script is only meant to run at the command line.</strong>');
}
/ let PHP run just as long as it has to /
ini_set('max_execution_time', '0');
//这个必须有
ini_set('memory_limit','320M');
//error_reporting('E_ALL');
/ this should be auto-detected, set it manually if needed /
$config["server_os"] = (strstr(PHP_OS, "WIN")) ? "win32" : "unix";
$config['root_path'] = ROOT_PATH;
$config['logPath'] = LOG_PATH;
chdir($config['root_path']);
谢谢啦,测了一下没这些也能跑,哈哈
妙
分享一个进一步优化的方法,不使用Timer驱动,使用Swoole协程方式
分享一下我写的rabbitmq客户端类库,支持定时与服务器握手,支持发生错误时进行生重连
<?php
/**
*/
//composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
class rabbitmq_client {
public $connection = "";
public $channel = "";
public $is_connected = false ;
public $exchange_name = "" ;
public $queen_name = "";
public $comsume_callback = null;
public $config_option = array(
'host' => "127.0.0.1",
'port' => "5672",
'user' => "admin",
'password' => "admin",
'exchange_name' => 'default_exchange_name',
'queen_name' => 'default_queen_name',
);
}
生产者:
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
include_once("./Lib_global.php");
include_once("./Lib_functions_rabbitmq.php");
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Swoole\Coroutine;
$worker = new Worker();
//开启进程数量
$worker->count = 2;
$processName = "test_mq_pub";
$worker->name = $processName;
$date_ymd = date("Y-m-d");
Worker::$pidFile = ROOT_PATH."var/{$processName}.pid";
Worker::$logFile = ROOT_PATH."var/{$processName}_logFile.log";
Worker::$stdoutFile = ROOT_PATH."var/{$processName}_stdout.log";
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
$redis = "";
$is_debug = false; //全局配置,是否开启调试模式
$rabbitmq_client = "";
function app_log($log){
global $workerId;
//将日志信息发送给日志服务器
$ts = round(microtime(true) - time() , 6);
@list($ts1 , $ts2) = @explode("." , $ts);
}
$worker->onWorkerStart = function() {
global $redis, $worker, $workerId , $rabbitmq_client;
};
Worker::runAll();
消费者:
chdir(dirname($_SERVER['SCRIPT_FILENAME']));
include_once __DIR__ . '/vendor/autoload.php';
include_once("./Lib_global.php");
include_once("./Lib_functions_rabbitmq.php");
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Swoole\Coroutine;
$worker = new Worker();
//开启进程数量
$worker->count = 2;
$processName = "test_mq_sub";
$worker->name = $processName;
$date_ymd = date("Y-m-d");
Worker::$pidFile = ROOT_PATH . "var/{$processName}.pid";
Worker::$logFile = ROOT_PATH . "var/{$processName}_logFile.log";
Worker::$stdoutFile = ROOT_PATH . "var/{$processName}_stdout.log";
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
$redis = "";
$is_debug = false; //全局配置,是否开启调试模式
$rabbitmq_client = "";
function app_log($log)
{
global $workerId;
//将日志信息发送给日志服务器
$ts = round(microtime(true) - time(), 6);
@list($ts1, $ts2) = @explode(".", $ts);
}
$worker->onWorkerStart = function () {
global $redis, $worker, $workerId, $rabbitmq_client;
};
Worker::runAll();
在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内的基于定时任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。
经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好使用一些。
https://www.workerman.net/a/1485