请问如何做到接收http请求,推送mqtt消息?

vipbressanon

我简化了下代码,大致如下:


define('MAX_REQUEST', 5000);

$worker = new Worker('http://0.0.0.0:端口');

$worker->onWorkerStart = function($worker)
{
    global $mqtt;

    $mqtt = new Workerman\Mqtt\Client($url, $options);
    $mqtt->onConnect = function($mqtt) {

    };
    $mqtt->connect();
};

$worker->onMessage = function(TcpConnection $connection, $args)
{
    global $mqtt;
    // 已经处理请求数
    static $request_count = 0;

    $data = $args->post();

    // 此处报错
    $mqtt->publish($topic, json_encode($data));

    $connection->send('Status Code: 200 OK');

    // 如果请求数达到5000
    if(++$request_count >= MAX_REQUEST)
    {
        $mqtt->close();
        Worker::stopAll();
    }
};

Worker::runAll();

业务大概是:需要一个http服务端用来接收http的请求,拿到数据经过处理以后,在用mqtt进行推送。服务端启动时不会报错,但是当进程达到设定好的5000次执行关闭并重新开启的时候,由于mqtt还没连接上,就收到http请求并且执行mqtt->publish()了,导致mqtt报错,No connection to broker,请问大佬,有什么方法可以避免这个问题吗?

1393 1 2
1个回答

walkor 打赏

收到5000请求就退出这个逻辑感觉没什么必要。

如果想保留现有5000请求退出逻辑,就要做一个内存缓存,当mqtt没连接成功时,收到的请求放到内存缓存起来。当mqtt连接上后将缓存的数据发给mqtt。

伪代码类似

define('MAX_REQUEST', 5000);

$worker = new Worker('http://0.0.0.0:端口');

$queue = [];
$mqttConnected = false;

$worker->onWorkerStart = function($worker)
{
    global $mqtt;

    $mqtt = new Workerman\Mqtt\Client($url, $options);
    $mqtt->onConnect = function($mqtt) {
        global $mqttConnected, $queue;
        $mqttConnected = true;
        foreach ($queue as $topic => $data) {
            $mqtt->publish($topic, $data);
        }
        $queue = [];
    };
    $mqtt->connect();
};

$worker->onMessage = function(TcpConnection $connection, $args)
{
    global $mqtt, $mqttConnected, $queue;
    // 已经处理请求数
    static $request_count = 0;

    $data = $args->post();

    // 此处报错
    if ($mqttConnected) {
        $mqtt->publish($topic, json_encode($data));
    } else {
        $queue[$topic][] = json_encode($data);
    }

    $connection->send('Status Code: 200 OK');

    // 如果请求数达到5000
    if(++$request_count >= MAX_REQUEST)
    {
        $mqtt->close();
        Worker::stopAll();
    }
};

Worker::runAll();
  • 小W 2023-01-30

    我觉得$mqttConnected === false时,将 $connection->send放到$mqtt->onConnect 或者onError/onClose里,new Workerman\Mqtt\Client加上超时时间。这样基本上可以做到http响应和mqtt服务响应同步。

  • 小W 2023-01-30

    $connection->send内容也是根据mqtt状态不同而不同

  • vipbressanon 2023-04-24

    大佬,我后来把5000请求去掉了,最近项目运行时遇到个问题,还是mqtt的,debug如下:
    -> Send PINGREQ package
    -- Error: Connection closed4
    -- Connection closed
    -- Reconnect after 1 seconds
    -- Error: No connection to broker5
    -- Tcp connection established
    是心跳没收到回复然后把链接关掉了?但是重连以后,又报错没连上broker,漏发消息了,还请大佬帮忙分析解决下,感谢

  • walkor 2023-04-24

    可能是服务端没回应心跳连接断开了。publish有失败回调,失败的消息放到全局数组里,等$mqtt->onConnect触发时再依次发送。

  • vipbressanon 2023-04-25

    感谢大佬~

年代过于久远,无法发表回答
×
🔝