给rabbitmq发送数据,前台运行完全没问题,但是一旦 -d 到后台就不能发送出去了。mqtt不受影响,都正常

dorke

问题描述

给rabbitmq发送数据,前台运行完全没问题,但是一旦 -d 到后台就不能发送出去了。

程序代码或配置

<?php

use \Workerman\Worker;
use \Workerman\Mqtt\Client;
use \Workerman\Connection\AsyncTcpConnection;

require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
// require_once __DIR__ . '/../common/thinkorm.php';
// use Applications\App\WeightedCalGPS;
// use Applications\App\WiFiLocationEstimator;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
global $exchangeName;
global $my_routing_key;
//创建连接和channel

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'tonghao', 'th@123.456','tonghao');
$channel = $connection->channel();
$queueName = 'StaffPositioning.DT';
$my_routing_key = 'StaffPositioning.PYDW';
//创建交换机
$exchangeName = 'StaffPositioning_Broadcast';
$channel->exchange_declare($exchangeName, 'topic',false,true,false);
//消息队列
$channel->queue_bind($queueName, $exchangeName, $my_routing_key);

//worker数据
$worker = new Worker();
$worker->count = 1;

$worker->onWorkerStart = function ($worker)  use ($channel) {

    $mqtt_options = ['username' => 'admin', 'password' => 'public', 'debug' => false];
    $mqtt = new Workerman\Mqtt\Client('127.0.01:1883', $mqtt_options);    
    $mqtt->onConnect = function () use ($mqtt) {
        var_dump("work_mqtt ..........");
        $mqtt->subscribe('/test/+');
    };
    // 消息 定位工牌1-6是人员定位,7-10是卡车定位
    $mqtt->onMessage = function ($topic, $message)  use ($channel) {
        //echo $message;
        global $exchangeName;
        global $my_routing_key;        
        var_dump($message);
        var_dump('==========dddddddd=============');
        $data = json_decode($message,true);
        print_r($data);
        echo date("Y-m-d H:i:s ");
        $event = $data['event'];
        if($event == 'status'){
            $dev_id = $data['deviceId'];
            $type = $data['data']['type'];
            $time = $data['time'];
            $longitude = $latitude = '';
            $owner = isset($data['data']['owner'])?$data['data']['owner']:'';
            $height = isset($data['data']['height'])?$data['data']['height']:'';
            if(($owner == 'person') && ($type == 'gps') && ($data['data']['longitude']!='NULL') && ($data['data']['latitude']!='NULL')){
                $longitude = $data['data']['longitude'];
                $latitude = $data['data']['latitude'];
                //echo $longitude.'===='.$latitude;
            }elseif($type == 'lbs'){
                /*$ble_arr = [];
                $lbs_arr = $data['data']['ble'];
                if($lbs_arr){
                    foreach ($lbs_arr as $k=>$v){
                        $ble_arr[$v['id']]=$v;
                    }
                }
                //检查周围ble数据
                $ble_local = ib_lon_lat($ble_arr);
                if($ble_local){
                    //信号强度
                    $local = wifi_rssi($ble_local);
                    $longitude = $local['Longitude'];
                    $latitude = $local['Latitude'];
                }*/
            }else{
                return false;
            }
            // 创建消息////////////////////////
            if(($latitude && $longitude)&&($owner == 'person')){
                $messageData = [
                    'cardID'=>$dev_id,
                    'x'=>['latitude'=>$latitude,'longitude'=>$longitude,'height'=>$height,'update_time'=>$time]
                ];
                print_r($messageData);
                $message = new AMQPMessage(json_encode($messageData));
                // 发送消息到队列
                $channel->basic_publish($message, $exchangeName, $my_routing_key);
                echo "消息已发送到队列 :".$exchangeName;
            }
            /////////////////////////////////////////end
        }
    };
    // 关闭
    $mqtt->onClose = function () use ($mqtt) {
        var_dump("mqtt close");
        sleep(1);
        $mqtt->connect();
    };
    $mqtt->connect();

};
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

重现问题的步骤

1、mq上建立相应的路由和 queue

2、start 运行没有问题,mq可以收到数据

3、start -d mq就完全没有数据了

操作系统环境及workerman/webman等具体版本

centos7.9

Workerman[test_rabbitmq.php] start in DAEMON mode
------------ WORKERMAN --------------------
Workerman version:4.1.11          PHP version:7.4.33           
----------- WORKERS --------------
proto   user            worker          listen          processes    status           
tcp     root            none            none            1             [OK]            
-------------------------------------
Input "php test_rabbitmq.php stop" to stop. Start success.
532 1 0
1个回答

walkor 打赏

文档有说,不能在脚本里直接初始化连接类的资源,包括数据库、redis等。
所以

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'tonghao', 'th@123.456','tonghao');

应该放在onWorkerStart里初始化

  • dorke 2023-11-19

    收到,一针见血,测试没有问题,确实这个问题,文档还是看的少

年代过于久远,无法发表回答
×
🔝