做个备忘录
注意:此机制会在每个进程中各建立一个 MQTT 连接,即连接数等于进程数。
// bootstrap/Mqtt.php
<?php
namespace bootstrap;
use Webman\Bootstrap;
/**
 * Webman bootstrap that opens one MQTT client per process and exposes a
 * static publish() helper.
 *
 * Messages published while the client is not connected are buffered in
 * memory and flushed in order the next time the client connects.
 */
class Mqtt implements Bootstrap
{
    /** @var \Workerman\Mqtt\Client|null Client owned by the current process. */
    protected static $mqtt = null;

    /** @var bool True between the client's onConnect and onClose callbacks. */
    protected static $connected = false;

    /**
     * Pending [topic, message] pairs waiting for a connection.
     *
     * Kept on this class rather than as an undeclared property on the client
     * object: creating dynamic properties is deprecated since PHP 8.2.
     *
     * @var array<int, array{0: mixed, 1: mixed}>
     */
    protected static $waitQueue = [];

    /**
     * Creates the MQTT client for this process and starts connecting.
     *
     * @param mixed $worker The worker being bootstrapped. Webman is expected
     *                      to pass null when bootstraps run for console
     *                      commands — confirm against the webman version in use.
     * @return void
     */
    public static function start($worker)
    {
        // No worker means no event loop to drive the client (e.g. `php webman list`),
        // so opening a connection there only produces errors.
        if (!$worker) {
            return;
        }

        $mqtt = new \Workerman\Mqtt\Client('mqtt://0.0.0.1:1883', [
            'username' => 'xxx',
            'password' => 'xxx',
            'debug' => true
        ]);

        // Register callbacks before connect() so no event can be missed.
        $mqtt->onConnect = function ($mqtt) {
            self::$connected = true;

            // Detach the queue first so that anything queued while flushing
            // is not lost or sent twice.
            $pending = self::$waitQueue;
            self::$waitQueue = [];
            foreach ($pending as $item) {
                $mqtt->publish($item[0], $item[1]);
            }
        };
        $mqtt->onClose = function () {
            // Fall back to queueing until the client connects again.
            self::$connected = false;
        };

        self::$mqtt = $mqtt;
        $mqtt->connect();
    }

    /**
     * Publishes a message, or queues it when the client is not connected.
     *
     * @param mixed $t Topic to publish to.
     * @param mixed $m Message payload.
     * @return void
     */
    public static function publish($t, $m)
    {
        // Also covers the case where start() never created a client.
        if (self::$mqtt === null || self::$connected === false) {
            self::$waitQueue[] = [$t, $m];
            return;
        }
        self::$mqtt->publish($t, $m);
    }
}
// config/bootstrap.php
...
return [
...
// Register the MQTT bootstrap class so webman invokes its start() method at startup.
bootstrap\Mqtt::class,
];
推送使用方式
use bootstrap\Mqtt;
// Sends right away when the client is connected; otherwise the message is queued until it connects.
Mqtt::publish($topic, $content);
// process/MqttClient.php
<?php
namespace process;
/**
 * Custom process that holds a single MQTT client and publishes messages
 * received over its listening socket.
 *
 * Each incoming payload is expected to be a JSON object with a "topic"
 * string and a scalar "content" value.
 */
class MqttClient
{
    /** @var \Workerman\Mqtt\Client|null Client owned by this process. */
    protected static $mqtt = null;

    /** @var bool True between the client's onConnect and onClose callbacks. */
    protected static $connected = false;

    /**
     * Creates the MQTT client and starts connecting when the process starts.
     *
     * @return void
     */
    public function onWorkerStart()
    {
        $mqtt = new \Workerman\Mqtt\Client('mqtt://0.0.0.1:1883', [
            'username' => 'xxx',
            'password' => 'xxx',
            'debug' => true
        ]);

        // Register callbacks before connect() so no event can be missed.
        $mqtt->onConnect = function ($mqtt) {
            self::$connected = true;
        };
        $mqtt->onClose = function () {
            // Stop publishing until the client connects again.
            self::$connected = false;
        };

        // Example of periodic publishing from inside the process:
        // \Workerman\Timer::add(2, function () use ($mqtt) {
        //     $mqtt->publish('workerman', 'hello workerman mqtt');
        // });

        self::$mqtt = $mqtt;
        $mqtt->connect();
    }

    /**
     * Publishes the topic/content described by an incoming payload.
     *
     * Payloads that arrive while the client is disconnected, or that are not
     * valid JSON of the expected shape, are not published. In every case the
     * raw payload is sent back to the caller and the connection is closed.
     *
     * @param mixed  $connection Connection the payload arrived on.
     * @param string $data       Raw payload, expected to be JSON.
     * @return void
     */
    public function onMessage($connection, string $data)
    {
        if (self::$connected) {
            // The payload comes from the network, so validate it before use
            // instead of indexing a possibly-null decode result.
            $arr = json_decode($data, true);
            if (
                is_array($arr)
                && isset($arr['topic'], $arr['content'])
                && is_string($arr['topic'])
                && $arr['topic'] !== ''
                && is_scalar($arr['content'])
            ) {
                self::$mqtt->publish($arr['topic'], (string) $arr['content']);
            }
        }
        $connection->close($data);
    }
}
// config/process.php
// Custom process entry: process\MqttClient handles connections accepted on the listen address.
'mqttclient' => [
'handler' => process\MqttClient::class,
'listen' => 'tcp://0.0.0.0:8789',
'count' => 1, // number of processes
],
推送使用方式
// Connect to the mqttclient process on the loopback address. 0.0.0.0 is a
// bind address and is not a portable destination for outgoing connections.
$client = stream_socket_client('tcp://127.0.0.1:8789', $errno, $errstr);
if ($client === false) {
    // Fail loudly instead of passing `false` to the send call below.
    throw new \RuntimeException("Unable to connect to the mqttclient process: {$errstr} ({$errno})");
}
$data = [
    'topic' => $topic,
    'content' => $content,
];
stream_socket_sendto($client, json_encode($data));
// Release the socket explicitly so long-running callers do not accumulate open connections.
fclose($client);
good good
+1
这个MQTT服务端要自己创建吗?
emqx docker运行就行
“利用webman启动时自动运行机制”使用这个方法,执行命令行命令出错了。
有人遇到过吗?
php webman list 不对吧?