项目需要实现延迟执行,发布消息后延迟 5~10 秒执行,
使用的 workerman/rabbitmq扩展,不要说哪个扩展能实现,已经用workerman/rabbitmq扩展写了很多多列了,想再换扩展有点费劲
$client = Client::factory([
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'heartbeat' => 60,
])->connect();
$channel = $client->channel();
$exchange_name = 'delayed_demo_exchange';
$queue_name = 'demo_delay_queue';
// 创建延迟交换机,交换机类型为 direct
$channel->exchangeDeclare($exchange_name, 'x-delayed-message', false, true, false, false, [
'x-delayed-type' => 'direct',
]);
// 创建延迟队列
$queueName = 'delayed_queue';
$channel->queueDeclare($queue_name, false, true, false, false);
$channel->queueBind($queue_name, $exchange_name);
// 发送消息
$message = 'Delayed Message at ' . time();
$channel->publish($message, ['delivery-mode' => 2,'x-delay' => 5000], '', $queue_name);
// 将消息发布到延迟交换机
$channel->publish($msg, $exchangeName, $queueName);
Bunny\Exception\ClientException: PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange
type in /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php:1280<br />
Stack trace:<br />
#0 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ClientMethods.php(1240): Bunny\AbstractClient->awaitQueueDeclareOk(1)<br />
#1 /Users/winds/Project/PHP/b_new/vendor/bunny/bunny/src/Bunny/ChannelMethods.php(111): Bunny\AbstractClient->queueDeclare(1, 'demo_delay_queu...', false, true, false, false, false, Array)<br />
#2 /Users/winds/Project/PHP/b_new/app/comm/service/pub/PubMq.php(506): Bunny\Channel->queueDeclare('demo_delay_queu...', false, true, false, false)<br />
#3 /Users/winds/Project/PHP/b_new/app/api/controller/CommonController.php(27): app\comm\service\pub\PubMq::demoPub(123456)<br />
#4 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(335): app\api\controller\CommonController->login(Object(support\Request))<br />
#5 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(358): Webman\App::Webman\{closure}(Object(support\Request))<br />
#6 /Users/winds/Project/PHP/b_new/app/middleware/GlobalMiddleware.php(15): Webman\App::Webman\{closure}(Object(support\Request))<br />
#7 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): app\middleware\GlobalMiddleware->process(Object(support\Request), Object(Closure))<br />
#8 /Users/winds/Project/PHP/b_new/vendor/webman/log/src/Middleware.php(96): Webman\App::Webman\{closure}(Object(support\Request))<br />
#9 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(351): Webman\Log\Middleware->process(Object(support\Request), Object(Closure))<br />
#10 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(662): Webman\App::Webman\{closure}(Object(support\Request))<br />
#11 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/App.php(156): Webman\App::findRoute(Object(Workerman\Connection\TcpConnection), '/api/v1/Common/...', 'POST/api/v1/Com...', Object(support\Request), 200)<br />
#12 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Connection/TcpConnection.php(749): Webman\App->onMessage(Object(Workerman\Connection\TcpConnection), Object(support\Request))<br />
#13 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Events/Select.php(400): Workerman\Connection\TcpConnection->baseRead(Resource id #349)<br />
#14 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1735): Workerman\Events\Select->run()<br />
#15 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1537): Workerman\Worker::forkOneWorkerForLinux(Object(Workerman\Worker))<br />
#16 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(1517): Workerman\Worker::forkWorkersForLinux()<br />
#17 /Users/winds/Project/PHP/b_new/vendor/workerman/workerman/src/Worker.php(585): Workerman\Worker::forkWorkers()<br />
#18 /Users/winds/Project/PHP/b_new/vendor/workerman/webman-framework/src/support/App.php(152): Workerman\Worker::runAll()<br />
#19 /Users/winds/Project/PHP/b_new/start.php(5): support\App::run()<br />
#20 {main}
系统:macos 15.2
webman版本:1.6.8
workerman/rabbitmq版本:2.0.0
延迟交换机插件启用了嘛?
启动了
消费
生产
<?php
namespace app\command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\RabbitMQ\Client;
class Delay extends Command
{
protected static $defaultName = 'delay';
protected static $defaultDescription = 'delay';
}
写了个demo 你先对照一下
直接贴您的代码运行通过,感谢大神!