FlowerMQ 一个基于Workerman和Redis实现的消息队列,一个小小工具,用来给主项目解耦的,也支持延迟队列,失败尝试这些。
composer create-project mrtwenty/flower
git clone
composer install
xtrim maxlen mq ~ 长度
。xtrim minid mq ~ 消息id
,需要 redis server 6.2.0 以上。windows下仅限于开发,不适合做生产环境使用,启动需要开三个命令行窗口,执行 start、pending、delay命令
flower配备了一个客户端,方便在别的项目中使用:
composer install mrtwenty/flower-client
使用方式:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
//$mq需要与服务端三个配置信息相同
$mq = [
'name' => 'mq',
'delay_name' => 'mq_delay',
'fail_list' => 'mq_fail_list'
];
$client = new Client($redis, $mq);
//立即执行
$res = $client->add(['test' => 'data']);
var_dump($res);
//延迟消息
$res = $client->add(['test' => 'data'], 3);
var_dump($res);
可以安装此依赖包,当然也可以根据自己需要用别的包
composer require workerman/mysql
配置信息可以在.env里面写入:
[mysql]
host = 127.0.0.1
username = root
password = 123456
database = test
port = 3306
代码实现:
<?php
declare(strict_types=1);
namespace app;
use app\library\BaseInterface;
/**
* 消费类
*/
class Run implements BaseInterface
{
protected $db = null;
public function getDb()
{
if (is_null($this->db)) {
$config = config('mysql');
$host = $config['host'];
$port = $config['port'];
$user = $config['username'];
$password = $config['password'];
$database = $config['database'];
$this->db = new \Workerman\MySQL\Connection($host, $port, $user, $password, $database);
}
return $this->db;
}
/**
* 消费方法,如何消费,取决用户自己
*
* @param mixed $data
* @param mixed $id
* @return bool 返回true就会执行ack确认消息已消费
*/
public function consumer($data, $id): bool
{
$db = $this->getDb();
$info = $db->row("SELECT * FROM `short_url` WHERE id=3");
print_r($info);
return true;
}
/**
* 超过尝试的次数,就会写入失败队列里面,并调用此方法,可以用此方法通知运维
*
* @return void
*/
public function fail($data, $id)
{
print_r($data);
print_r($id);
}
}
由于是守护进程,为了避免php业务代码bug隐藏的内存泄露,可以在消费者执行完一定数量的时候重启进程。具体实现请查看workerman手册。
你这里的
$fail_num
,消息被读取次数永远是1app\consumer\Run的consumer 返回false, 消息变成已读取未ack,就会被xpending读取到,并重试,不行就继续xClaim 转移消息,次数就会加1
是运行环境问题,还是redis或者php版本问题?
我这里试了可以。
我直接 composer create-project mrtwenty/flower 安装
app\consumer\Run的consumer返回false,你说的那个地方,我加了调试信息
[$msg_id, $consumer, $over_time, $fail_num] = $msg;
echo "fail_num", $fail_num, "\n";
你
$fail_num
这个变量没有打印,打印的话一直是1这个变量的值是xpending得到的,请看我的评论。
echo "fail_num", $fail_num, "\n"; 我变量有打印的,你看那个截图,输出都是
fail_num1、fail_num2、fail_num3,后面的数字就是。
fail_num
这个你消息被读取的次数,你这里并没有读取(XreadGroup),就直接取(Xpending),就不会变的不写代码,用redis 命令执行下流程。
又排查了一下,你这需要6.0才支持的,而不是5.0就支持
再次验证,Redis Server 版本需要 5.0.4 及最新版本
https://github.com/redis/redis/commit/f72f4ea311d31f7ce209218a96afb97490971d39
那这样子就需要redis5.0.4以上了,谢谢你的测试,回头我写在说明里
嗯嗯!搞了两天终于搞明白为啥了,哈哈!
有一个很严重的问题,在你修剪消息队列的时候,如果消息还没消费,直接会修剪掉到没消费到的消息,现象就是某个区间的消息全部丢失,我只是看了一下代码,没验证我的想法。
他这只会修剪XPENDING 消息
不是哦,你仔细看看代码,测试一下,把GC评率调成100%,队列最大长度改短,消息数量超过队列长度,一定会丢失消息
不是, 最简单的浮现方式,你把最大队列长度改为10,GC返回true,推送1000条消息进去, 你看消费的消息有多少条,最多几十条,不确定具体多少条是因为xtrim加了true参数
你是说 stream 队列长度?
嗯, 修剪只能修剪stream ,哪有修剪XPENDING 一说?
嗯,你说的对,我应该在项目里面提醒一下使用这个项目的人,这是redis stream xtrim 本身的设计,如果想要长久的保存,可以将gc设置成永远不会执行,这样就不会执行xtrim命令了,或者设置队列长度足够长,xtrim 执行了也不会裁剪到,这取决于开发者的应用场景,我自己用的时候,都是为了能够及时触发消息,基本都是即发即消费。
其实裁剪也是有必要的,不然久了内存会爆炸,或者被redis清空,在裁剪的时候应该裁剪到所有分组消费者 最小已消费的消息位置,不应该是固定裁剪。
嗯,你这就更细腻的操作了,但是redis 好像并没有提供相关的命令,xtrim本身是先裁剪掉旧的消息。
额,我刚刚去看了下redis6.2的英文手册,好像支持新的方式,这是个好消息。
Redis version >= 6.2.0: Added the
MINID
trimming strategy and theLIMIT
option.我新增了一种gc机制,大佬帮忙看看 ,minid