分享一个基于workerman的rabbitmq客户端生产者、消费者(基于Event实现事件驱动)

zgh419566

之前我写过一个基于定时器+swoole的rabbitmq生产者和消费者(https://www.workerman.net/q/8688)。
在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内基于定时器的任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。

经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好使用一些。

Lib_calss_rabbitmq.php

<?php
/*
 * 20230316 增加 rabbitmq_publish_v3
 * 20230320 使用event事件进行驱动
 *
 */

//composer require php-amqplib/php-amqplib

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Swoole\Coroutine;
use Workerman\Events\EventInterface;

/**
 * RabbitMQ producer/consumer for Workerman built on php-amqplib.
 *
 * Instead of blocking in a wait loop, the consumer registers the AMQP socket
 * with Workerman's event loop and drains one frame per readable event, so
 * timers and other connections in the same worker keep running.
 *
 * Only the most recent subscription started through comsume() /
 * comsume_swoole_go() (and the most recent QoS setting) is remembered and
 * restored after a reconnect.
 */
class rabbitmq_client {
    /** Active AMQPStreamConnection ("" until the first successful connect). */
    public $connection = "";
    /** Channel opened on the active connection ("" until connected). */
    public $channel = "";
    /** True while the connection is believed to be usable. */
    public $is_connected = false;

    /** When true, connect() logs a message on success. */
    public $is_debug = false;

    /** Default exchange used by publish(). */
    public $exchange_name = "";
    /** Default queue name; also used as the binding / routing key. */
    public $queen_name = "";
    /** Callable invoked by php-amqplib for every delivered message. */
    public $comsume_callback = null;

    /** Raw stream resource of the active connection, watched by the event loop. */
    public $socket = null;
    /** Connection settings; every key can be overridden through the constructor. */
    public $config_option = array(
        'host' => "127.0.0.1",
        'port' => "5672",
        'user' => "admin",
        'password' => "admin",
        'exchange_name' => 'default_exchange_name',
        'queen_name' => 'default_queen_name',
    );

    /** Queue of the active subscription, or null when not consuming. */
    private $consume_queue = null;
    /** Whether the subscription is issued inside a Swoole coroutine. */
    private $consume_in_coroutine = false;
    /** array(prefetch_size, prefetch_count) to re-apply after reconnecting, or null. */
    private $consume_qos = null;
    /** Socket currently registered with the event loop, or null. */
    private $watched_socket = null;
    /** Re-entrancy guard: true while connect() is running. */
    private $is_connecting = false;

    /**
     * Connects to RabbitMQ (retrying once per second until it succeeds) and
     * schedules a heartbeat frame every 55 seconds.
     *
     * @param array $option Any subset of: host, port, user, password,
     *                      exchange_name, queen_name. Keys that are missing
     *                      keep the defaults declared in $config_option.
     */
    public function __construct($option = array()){
        if (!is_array($option)) {
            $option = array();
        }
        // Overlay caller values on the defaults instead of reading possibly
        // undefined indexes.
        foreach (array_keys($this->config_option) as $key) {
            if (isset($option[$key])) {
                $this->config_option[$key] = $option[$key];
            }
        }
        $this->exchange_name = $this->config_option['exchange_name'];
        $this->queen_name = $this->config_option['queen_name'];

        // Block until the initial connection is established.
        while( $this->is_connected == false ){
            if($this->connect() == true){
                break;
            }
            $this->app_log("rabbitmq server connect failed");
            sleep(1);
        }

        // The connection negotiates a 60 s heartbeat; send ours slightly
        // more often than that.
        Timer::add( 55 , function (){
            $this->write_heartbeat();
        });
    }

    /**
     * Writes a log line through logToScreen() when that helper is defined,
     * otherwise echoes it.
     *
     * The prefix is the sub-second fraction of the current time, not a full
     * timestamp.
     *
     * @param string $log Message text.
     * @return void
     */
    public function app_log($log){
        $ts = round(microtime(true) - time() , 6);
        $parts = explode("." , (string)$ts);
        $ts1 = $parts[0];
        // A whole-number value has no fractional part.
        $ts2 = isset($parts[1]) ? $parts[1] : "";

        $logData = "{$ts1}.{$ts2} {$log}";

        if(function_exists("logToScreen") == true){
            logToScreen($logData , true);
        }else{
            echo $logData."\n";
        }
    }

    /**
     * Turns debug logging on or off.
     *
     * @param bool $is_debug
     * @return void
     */
    public function setDebug($is_debug = true){
        $this->is_debug = $is_debug;
    }

    /**
     * Opens a new connection and channel, declares the exchange and queue,
     * binds them, and restores QoS and the consumer subscription if one was
     * active before.
     *
     * @return bool True when the connection is usable afterwards.
     */
    public function connect(){
        // A failure while restoring the consumer calls reconnect(); without
        // this guard that would recurse into connect() again.
        if ($this->is_connecting) {
            return false;
        }
        $this->is_connecting = true;

        try{
            // The socket about to be replaced must not stay in the event loop.
            $this->unwatch_socket();

            $this->connection = new AMQPStreamConnection(
                $this->config_option['host'] ,
                $this->config_option['port'] ,
                $this->config_option['user'] ,
                $this->config_option['password'] ,
                '/' ,           // vhost
                false ,         // insist
                'AMQPLAIN' ,    // login method
                null,           // login response
                'en_US' ,       // locale
                3.0 ,           // connection timeout (s)
                3.0 ,           // read/write timeout (s)
                null ,          // stream context
                true ,          // TCP keepalive
                60              // heartbeat (s)
            );

            $this->socket = $this->connection->getIo()->getSocket();

            if( $this->connection->isConnected() != true ){
                $this->is_connected = false;
                return false;
            }

            $this->channel = $this->connection->channel();

            // Durable topic exchange.
            $this->channel->exchange_declare( $this->exchange_name , 'topic', false, true, false);

            // Durable queue.
            $this->channel->queue_declare( $this->queen_name , false, true, false, false);

            // The queue name doubles as the binding key.
            $this->channel->queue_bind($this->queen_name , $this->exchange_name , $this->queen_name );

            $this->is_connected = true;

            if($this->is_debug == true){
                $this->app_log("rabbitmq connected");
            }

            // QoS and subscriptions belong to the channel, so a new channel
            // starts without them.
            if ($this->consume_qos !== null) {
                $this->channel->basic_qos( $this->consume_qos[0] , $this->consume_qos[1] , false);
            }
            if ($this->consume_queue !== null) {
                $this->subscribe();
            }

            return $this->is_connected == true;
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
            return false;
        } finally {
            $this->is_connecting = false;
        }
    }

    /**
     * Reconnects when the client is marked as disconnected.
     *
     * @return bool True only when a new connection was established.
     */
    public function reconnect(){
        if( $this->is_connected == false ){
            return $this->connect() == true;
        }
        return false;
    }

    /**
     * Sends an AMQP heartbeat frame, or tries to reconnect when the client
     * is disconnected or the write fails.
     *
     * @return void
     */
    public function write_heartbeat(){
        if($this->is_connected == true){
            try{
                // Heartbeat frame: type 8, channel 0, empty payload, frame-end 0xCE.
                $pkt = new AMQPWriter();
                $pkt->write_octet(8);
                $pkt->write_short(0);
                $pkt->write_long(0);
                $pkt->write_octet(0xCE);
                $this->connection->write($pkt->getvalue());
            }catch (Exception $e) {
                $this->app_log("error catched :".$e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
        }else{
            $this->reconnect();
        }
    }

    /**
     * Publishes a text message.
     *
     * If the publish fails, the client reconnects and retries exactly once.
     *
     * @param string $data                Message body.
     * @param bool   $is_persistent       Persistent delivery mode when true.
     * @param string $exchange_name_input Exchange; defaults to $this->exchange_name.
     * @param string $queen_name_input    Routing key; defaults to $this->queen_name,
     *                                    which is the key the queue is bound with.
     * @return bool True when basic_publish() completed without an exception.
     */
    public function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
        $delivery_mod = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        if($is_persistent == false){
            $delivery_mod = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;
        }

        $exchange_name = $this->pick_name($exchange_name_input , $this->exchange_name);
        // Must fall back to the queue name: connect() binds the queue with
        // the queue name, not the exchange name.
        $queen_name = $this->pick_name($queen_name_input , $this->queen_name);

        $rabbit_msg = new AMQPMessage($data , ['content_type'=>'text/plain','delivery_mode'=>$delivery_mod]);

        try{
            $this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
            return true;
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
        }

        // One retry on a fresh connection so a broken link does not silently
        // drop the message.
        if ($this->reconnect() != true) {
            return false;
        }
        try{
            $this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
            return true;
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
            return false;
        }
    }

    /**
     * Applies a prefetch limit to the channel and remembers it so it can be
     * re-applied after a reconnect.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @return void
     */
    public function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
        $this->consume_qos = array($prefetch_size , $prefetch_count);
        $this->channel->basic_qos( $prefetch_size , $prefetch_count ,false);
    }

    /**
     * Starts consuming, issuing the subscription inside a Swoole coroutine.
     *
     * Requires the Swoole extension and Workerman's Swoole event loop.
     *
     * @param string $queen_name_input Queue to consume; defaults to $this->queen_name.
     * @return false|null False when no callback is set, otherwise null.
     */
    public function comsume_swoole_go( $queen_name_input = ""){
        return $this->start_consuming($queen_name_input , true);
    }

    /**
     * Starts consuming without coroutines.
     *
     * @param string $queen_name_input Queue to consume; defaults to $this->queen_name.
     * @return false|null False when no callback is set, otherwise null.
     */
    public function comsume( $queen_name_input = ""){
        return $this->start_consuming($queen_name_input , false);
    }

    /**
     * Event-loop callback: processes at most one pending frame.
     *
     * Must stay public because the event loop invokes it.
     *
     * @return void
     */
    public function channel_wait(){
        try {
            if( $this->channel->is_consuming() ) {
                // Non-blocking wait with a 1 ms timeout.
                $this->channel->wait(null , true , 0.001);
            }
        } catch (Exception $e) {
            $this->app_log("error catched when consuming:" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
    }

    /** Returns $input when it is a non-empty string, otherwise $default. */
    private function pick_name($input , $default){
        return strlen((string)$input) > 0 ? $input : $default;
    }

    /** Records the requested subscription, then subscribes now or via a reconnect. */
    private function start_consuming($queen_name_input , $in_coroutine){
        if( !$this->comsume_callback ){
            $this->app_log("function comsume_callback must be set");
            return false;
        }

        $this->consume_queue = $this->pick_name($queen_name_input , $this->queen_name);
        $this->consume_in_coroutine = $in_coroutine;

        if($this->is_connected == true) {
            $this->subscribe();
        }else{
            // connect() subscribes once the link is back.
            $this->reconnect();
        }
        return null;
    }

    /** Issues basic_consume for the recorded queue and registers the socket with the event loop. */
    private function subscribe(){
        $queen_name = $this->consume_queue;
        $failed = false;

        $register = function () use ($queen_name , &$failed) {
            try {
                $this->channel->basic_consume( $queen_name , '' , false , false , false , false , $this->comsume_callback);
            } catch (Exception $e) {
                $failed = true;
                $this->app_log("error catched :" . $e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
        };

        if ($this->consume_in_coroutine) {
            go($register);
        } else {
            $register();
        }

        // On failure, reconnect() has either re-subscribed already or left
        // the client disconnected; neither case needs a listener here.
        if ($failed) {
            return;
        }

        // Wake channel_wait() whenever the broker sends data.
        Worker::$globalEvent->add($this->socket, EventInterface::EV_READ, array($this, 'channel_wait'));
        $this->watched_socket = $this->socket;

        // One initial pass is needed to start processing.
        $this->channel_wait();
    }

    /** Removes the previously registered socket from the event loop, if any. */
    private function unwatch_socket(){
        if ($this->watched_socket === null) {
            return;
        }
        $stale = $this->watched_socket;
        $this->watched_socket = null;
        Worker::$globalEvent->del($stale, EventInterface::EV_READ);
    }
}

/**
 * Publishes a persistent message through the given client.
 *
 * An empty queue or exchange argument falls back to the client's own
 * queen_name / exchange_name property.
 *
 * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
 * @param string $data                Message body.
 * @param string $queen_name_input    Routing key override.
 * @param string $exchange_name_input Exchange override.
 * @return void
 */
function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
    $target_exchange = strlen($exchange_name_input) > 0
        ? $exchange_name_input
        : $rabbitmq_client->exchange_name;

    $target_queue = strlen($queen_name_input) > 0
        ? $queen_name_input
        : $rabbitmq_client->queen_name;

    // Always published as persistent.
    $rabbitmq_client->publish($data , true , $target_exchange , $target_queue);
}

使用方法:

<?php
    // Build the connection settings and create the client. The constructor
    // blocks until the broker is reachable.
    $rabbitmq_config_option = array(
        'host' => RABBITMQ_SERVER_IP,
        'port' => RABBITMQ_SERVER_PORT,
        'user' => RABBITMQ_USERNAME,
        'password' => RABBITMQ_PASSWORD,
        'exchange_name' => "upstream_exchange",
        'queen_name' => "upstream_queen",
    );
    $rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);

    // Producer helper: publishes a persistent message, optionally overriding
    // the queue (routing key) and/or the exchange.
    // The library file declares a function with this name too, so declare it
    // here only when it is missing; a second unconditional declaration is a
    // fatal "Cannot redeclare" error.
    if (!function_exists('rabbitmq_publish_v3')) {
        /**
         * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
         * @param string $data                Message body.
         * @param string $queen_name_input    Routing key override; empty uses the client's queen_name.
         * @param string $exchange_name_input Exchange override; empty uses the client's exchange_name.
         * @return void
         */
        function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
            $exchange_name = "";
            if(strlen($exchange_name_input) > 0){
                $exchange_name = $exchange_name_input;
            }else{
                $exchange_name = $rabbitmq_client->exchange_name;
            }

            $queen_name = "";
            if(strlen($queen_name_input) > 0){
                $queen_name = $queen_name_input;
            }else{
                $queen_name = $rabbitmq_client->queen_name;
            }

            $rabbitmq_client->publish($data , true , $exchange_name , $queen_name);
        }
    }

   // Consumer: install the message callback, then start consuming.
   // $db and $workerId come from the surrounding worker scope.
   $rabbitmq_client->comsume_callback = function ($msg) use ($db, $workerId) {
       $message_body = $msg->body;
       $data_arr = json_decode($message_body, true);

       $channel = $msg->delivery_info['channel'];
       $delivery_tag = $msg->delivery_info['delivery_tag'];

       // Acknowledge only well-formed payloads. A malformed one is rejected
       // without requeue so it is neither redelivered forever nor left
       // unacknowledged.
       // NOTE(review): confirm discarding is the desired policy; use a
       // dead-letter exchange if malformed messages must be kept.
       if (!is_array($data_arr)) {
           $channel->basic_reject($delivery_tag, false);
           echo "[{$workerId}] Rejected malformed message\n";
           return;
       }

       $channel->basic_ack($delivery_tag);
       echo "[{$workerId}] Received ", $message_body, "\n";
   };

   $rabbitmq_client->comsume_swoole_go();    // coroutine mode, requires the Swoole extension
   //$rabbitmq_client->comsume();        // plain mode

特别说明,如果需要使用协程方式,需要安装swoole,并且在项目启动文件前面加上:
use Swoole\Coroutine;
// Run Workerman on the Swoole event loop.
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
// Hook sleep() so it does not block the worker.
Coroutine::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
// Turn off Swoole's coroutine deadlock check.
Coroutine::set(['enable_deadlock_check' => false]);

1723 2 11
2个评论

owenzhang

感谢分享

  • 暂无评论
dkou

感谢分享

  • 暂无评论

zgh419566

230
积分
0
获赞数
0
粉丝数
2022-04-28 加入
×
🔝