GatewayWorker做物联网循环采集数据问题

cxk

问题描述:​​

我正在使用GatewayWorker开发一个物联网平台,主要功能是作为TCP服务端与DTU设备(客户端)建立连接并进行通信。现在遇到一个需求:

需求场景​​

每个设备连接后,服务端需要​​定时循环下发一组指令​​(例如MODBUS协议采集指令)。

​​下发逻辑​​:必须​​逐条发送​​,即发送一条指令 → 等待设备返回响应 → 再发送下一条,直到当前设备的所有指令发送完成 → 等待下一次循环开始。

​​循环要求​​:整个过程需要周期性重复(例如每5分钟一轮)。

当前顾虑​​

如果直接用GatewayWorker的定时器(如Timer::add()),每一轮采集需要等待N次指令交互(发送+响应),​​耗时较长​​(可能几秒到十几秒)。

​​担心阻塞问题​​:
如果​​新设备尝试连接​​,是否会因为Worker进程被占用而无法及时处理?
高并发场景下(例如100个设备),这种串行化设计是否会导致性能瓶颈?

希望有经验的朋友能分享优化思路,感谢!

109 1 3
1个回答

xiaobai

你设备上线后,绑定了uid以后,就是谁发谁的,接受的时候直接处理就行了吧

  • cxk 1天前

    这是谷歌给的回答,应该是完美回答了,给后来的朋友参考下:
    您好!您提出的这个问题非常经典,是物联网平台中常见的需求和顾虑。直接在单个定时器回调中完成一整个设备的所有指令的串行收发,确实会因为同步等待响应而阻塞当前 Worker 进程,影响该进程处理其他连接的请求和新连接。

    下面我为您详细分析并提供解决方案:

    理解 GatewayWorker 的非阻塞特性

    GatewayWorker (基于 Workerman) 的核心优势在于其事件驱动和非阻塞 I/O 模型。每个 Worker 进程通过 epoll (Linux) 或类似机制高效地处理成千上万的并发连接。关键在于,任何一个回调函数(如 onMessage, onConnect, Timer::add 的回调)都不应该执行长时间的同步阻塞操作。

    您顾虑的阻塞问题分析

    1. Worker 进程被占用​:
      • 如果您在 Timer::add 的回调中,针对一个设备,执行 "发送指令1 -> (同步等待) -> 收到响应1 -> 发送指令2 -> (同步等待) -> 收到响应2 ..." 这样的逻辑,那么在这个设备完成一轮采集的整个过程中(可能几秒到十几秒),执行该回调的那个 Worker 进程将无法处理任何其他事情,包括其他已连接设备的消息、新设备的连接请求等。
      • 后果​:如果只有一个 Worker 进程,整个服务都会卡住。如果有多个 Worker 进程,那么被占用的那个 Worker 进程会卡住,其他 Worker 进程可以继续工作,但新连接可能会被分配到卡住的 Worker 上,导致连接超时。
    2. 高并发性能瓶颈​:
      • 假设有 4 个 Worker 进程,100 个设备。如果每个设备采集耗时 10 秒。
      • 理想情况(非阻塞)​:4 个 Worker 可以并发处理这 100 个设备的指令收发和响应。
      • 阻塞情况​:如果采用上述同步等待逻辑,一个 Worker 在同一时间只能处理一个设备的完整采集流程。那么 4 个 Worker 最多同时处理 4 个设备。其他 96 个设备就需要排队等待,导致采集延迟非常大,甚至无法按时完成。

    解决方案:异步化指令收发

    核心思想是将“发送 -> 等待响应 -> 发送下一条”这个过程异步化。每一次“发送指令”后,​不等待响应​,而是记录下当前设备的状态(例如,期望收到的响应类型,下一条要发送的指令等),然后当前回调函数立即结束,让 Worker 进程可以去处理其他事件。当设备响应到达时,在 onMessage 事件中根据之前记录的状态,发送下一条指令。

    具体实现步骤 (在 Events.php 中)

    假设我们为每个连接维护一个状态,表明它当前处于指令序列的哪一步。

    <?php
    use Workerman\Worker;
    use Workerman\Timer;
    use GatewayWorker\Lib\Gateway;
    
    class Events
    {
        /**
         * 业务进程启动时触发
         * @param \Workerman\Worker $businessWorker
         */
        public static function onWorkerStart($businessWorker)
        {
            // 全局定时器,例如每5分钟触发一次,用于启动所有设备的采集周期
            Timer::add(300, function() use ($businessWorker) { // 300秒 = 5分钟
                // 获取所有在线的客户端 client_id 列表
                // 注意:如果设备非常多,一次性获取所有 client_id 可能有性能问题,
                // 可以考虑分批处理或从持久化存储中获取活跃设备列表
                $all_clients = Gateway::getAllClientSessions(); // 或者 Gateway::getAllClientInfo();
    
                foreach ($all_clients as $client_id => $session_data) {
                    // 这里可以根据 $session_data 判断设备类型或是否需要采集
                    // 例如,可以约定设备连接后在 session 中设置一个标识
                    // if (isset($session_data['needs_collection']) && $session_data['needs_collection']) {
    
                    // 为了演示,我们假设所有连接都需要采集
                    // 并且我们使用 connection 对象来存储状态 (更推荐)
                    // 或者使用 Gateway::setSession($client_id, [...]) 更新会话
    
                    $connection = $businessWorker->connections[$client_id] ?? null; // 获取连接对象
                    if ($connection) {
                        // 初始化或重置该设备的采集状态
                        $connection->current_command_index = 0; // 当前指令索引
                        $connection->is_collecting = true;      // 标记正在采集中
                        // (可选) 清除可能存在的旧的单指令超时定时器
                        if (isset($connection->command_timeout_timer_id)) {
                            Timer::del($connection->command_timeout_timer_id);
                            unset($connection->command_timeout_timer_id);
                        }
    
                        echo "Starting collection for client: {$client_id}\n";
                        self::sendNextCommand($client_id, $connection);
                    }
                    // }
                }
            });
        }
    
        /**
         * 当客户端连接时触发
         * @param string $client_id 连接id
         */
        public static function onConnect($client_id)
        {
            echo "Client connected: {$client_id}\n";
            // 可以在这里初始化设备的一些信息,例如待采集的指令列表
            // Gateway::setSession($client_id, [
            //     'device_id' => 'DTU_XYZ', // 假设设备会上报ID,或根据client_id查询
            //     'commands' => [
            //         "MODBUS_CMD_1_HEX", // 指令内容,根据您的协议定义
            //         "MODBUS_CMD_2_HEX",
            //         "MODBUS_CMD_3_HEX"
            //     ],
            //     'current_command_index' => 0,
            //     'is_collecting' => false // 初始状态为非采集中
            // ]);
    
            // 或者直接在 connection 对象上设置 (推荐,更灵活)
            $connection = Gateway::getConnectionByClientId($client_id);
            if ($connection) {
                // 假设这是从数据库或配置文件加载的指令
                $connection->commands = self::getDeviceCommands($client_id);
                $connection->current_command_index = 0;
                $connection->is_collecting = false;
                // 可以考虑连接成功后立即开始一次采集,或者等待全局定时器
                // self::sendNextCommand($client_id, $connection);
            }
        }
    
        /**
         * 获取设备的指令列表 (示例)
         * @param string $client_id
         * @return array
         */
        public static function getDeviceCommands($client_id)
        {
            // 这里应该是根据设备ID或类型从数据库/配置加载实际指令
            // 示例指令 (请替换为您的真实指令,可能是二进制数据)
            return [
                "READ_REG_1", // 指令1的字符串表示或二进制数据
                "READ_REG_2",
                "READ_REG_3",
            ];
        }
    
        /**
         * 发送下一条指令的辅助函数
         * @param string $client_id
         * @param \Workerman\Connection\TcpConnection $connection (可选,如果外部已获取)
         */
        public static function sendNextCommand($client_id, $connection = null)
        {
            if (!$connection) {
                $connection = Gateway::getConnectionByClientId($client_id);
            }
    
            if (!$connection || !isset($connection->commands) || !$connection->is_collecting) {
                echo "Client {$client_id} not found or not ready for collection.\n";
                if ($connection) $connection->is_collecting = false;
                return;
            }
    
            $commands = $connection->commands;
            $index = $connection->current_command_index;
    
            if ($index < count($commands)) {
                $command_to_send = $commands[$index];
                echo "Client {$client_id}: Sending command #{$index}: {$command_to_send}\n";
                Gateway::sendToClient($client_id, $command_to_send); // 发送指令
    
                // 记录已发送,并更新索引以备下次使用 (在收到响应后)
                // $connection->current_command_index++; // 注意:这里递增也可以放到 onMessage 中处理成功后
    
                // 为当前指令设置一个超时定时器 (例如10秒)
                // 如果10秒内没有收到响应,则认为这条指令超时
                if (isset($connection->command_timeout_timer_id)) {
                    Timer::del($connection->command_timeout_timer_id); // 清除上一个指令的超时
                }
                $connection->command_timeout_timer_id = Timer::add(10, function() use ($client_id, $connection, $index) {
                    echo "Client {$client_id}: Timeout for command #{$index}\n";
                    // 超时处理逻辑:
                    // 1. 记录错误
                    // 2. 决定是重试当前指令、跳过当前指令,还是中止本轮采集
                    // 这里我们选择中止本轮采集,等待下个大周期
                    $connection->is_collecting = false;
                    unset($connection->command_timeout_timer_id); // 清除自身
                    // 可以选择发送下一条指令 (跳过)
                    // $connection->current_command_index++;
                    // self::sendNextCommand($client_id, $connection);
                }, null, false); // false 表示只执行一次
    
            } else {
                // 所有指令已发送完成
                echo "Client {$client_id}: All commands sent for this cycle.\n";
                $connection->is_collecting = false; // 标记本轮采集结束
                $connection->current_command_index = 0; // 重置索引,为下一个大周期做准备
                if (isset($connection->command_timeout_timer_id)) {
                    Timer::del($connection->command_timeout_timer_id); // 清除可能存在的最后一条指令的超时
                    unset($connection->command_timeout_timer_id);
                }
            }
        }
    
        /**
         * 当客户端发来消息时触发
         * @param string $client_id 连接id
         * @param mixed $message 完整的消息数据(注意:需要处理粘包分包问题)
         */
        public static function onMessage($client_id, $message)
        {
            $connection = Gateway::getConnectionByClientId($client_id);
            if (!$connection || !$connection->is_collecting) {
                // 非采集期间收到的消息,或连接已不存在
                echo "Client {$client_id}: Received message while not actively collecting or connection lost: {$message}\n";
                return;
            }
    
            echo "Client {$client_id}: Received response: {$message}\n";
    
            // 1. 清除当前指令的超时定时器
            if (isset($connection->command_timeout_timer_id)) {
                Timer::del($connection->command_timeout_timer_id);
                unset($connection->command_timeout_timer_id);
            }
    
            // 2. 解析响应 $message (判断是否是期望的响应,是否成功等)
            //    这部分逻辑高度依赖于您的通信协议 (如MODBUS)
            //    例如: isValidResponse($message, $connection->commands[$connection->current_command_index])
    
            // 3. 如果响应有效且是当前指令的响应
            // (为简化,此处假设任何收到的消息都是对上一条指令的有效响应)
            $connection->current_command_index++; // 移动到下一条指令
    
            // 4. 发送下一条指令
            self::sendNextCommand($client_id, $connection);
        }
    
        /**
         * 当用户断开连接时触发
         * @param string $client_id 连接id
         */
        public static function onClose($client_id)
        {
            echo "Client closed: {$client_id}\n";
            // 清理工作,例如清除与该连接相关的定时器
            $connection = Gateway::getConnectionByClientId($client_id); // onClose 时 $connection 对象可能已被清理
                                                                      // 但我们这里是为了尝试获取其上的自定义属性
            if ($connection && isset($connection->command_timeout_timer_id)) {
                Timer::del($connection->command_timeout_timer_id);
                // unset($connection->command_timeout_timer_id); // $connection 对象会被销毁,属性自然消失
            }
            // Gateway::setSession($client_id, null); // 清理会话(如果用了 session)
        }
    }

    代码逻辑说明:

    1. onWorkerStart​:
      • 设置一个全局周期性定时器(例如每5分钟)。
      • 此定时器到期后,遍历所有连接的客户端。
      • 为每个需要采集的客户端初始化/重置采集状态(如 current_command_index = 0, is_collecting = true)。
      • 调用 sendNextCommand() 开始发送该设备的第一条指令。
    2. onConnect​:
      • 当设备连接时,可以为其加载指令列表 (如 $connection->commands) 并初始化状态。
      • 您可以选择在此处立即开始一次采集,或者等待全局定时器的调度。
    3. getDeviceCommands($client_id)​:
      • 这是一个辅助函数,用于根据设备标识获取其应答的指令列表。实际应用中,这可能来自数据库或配置文件。
    4. sendNextCommand($client_id, $connection)​:
      • 这是核心的指令发送控制函数。
      • 检查当前设备是否还有未发送的指令 ($connection->current_command_index < count($connection->commands)).
      • 如果有,则发送当前指令 (Gateway::sendToClient())。
      • 关键​:发送后,为这条指令设置一个单次执行的超时定时器 (Timer::add(..., ..., ..., false)). 如果在指定时间内(如10秒)没有收到响应,这个定时器会触发,您可以处理超时逻辑(记录错误、尝试重发、或中止本轮采集)。
      • 如果所有指令都发送完毕,则标记本轮采集结束 ($connection->is_collecting = false),并重置指令索引,等待下一个大的采集周期。
    5. onMessage($client_id, $message)​:
      • 当收到设备响应时:
        • 首先清除之前为该指令设置的超时定时器 (Timer::del())。
        • 解析响应数据,判断是否正确、完整。
        • 如果响应有效,将当前指令索引 ($connection->current_command_index) 加 1。
        • 调用 sendNextCommand() 继续发送序列中的下一条指令。
    6. onClose($client_id)​:
      • 设备断开连接时,清理可能存在的单指令超时定时器。

    优势:

    • 非阻塞​:Worker 进程在发送指令或等待响应时不会被阻塞。Timer::add 的回调和 onMessage 的回调都应该快速执行完毕。
    • 高并发​:每个 Worker 进程可以同时管理数千个连接的这种异步指令收发流程。
    • 可管理性​:通过连接对象的属性(如 $connection->current_command_index, $connection->is_collecting)来维护每个设备的状态,清晰明了。
    • 超时控制​:不仅有总的采集周期,还有对单条指令的超时控制,增强了健壮性。

    重要注意事项:

    • 粘包/分包​:TCP是流式协议,您需要确保在 onMessage 中正确处理消息的边界。通常在 GatewayWorker 的 BusinessWorker 中设置 $worker->protocol 来指定一个协议类处理。例如,MODBUS RTU over TCP 通常每条消息有明确的长度或结束符,您可以自定义协议来解决。
    • 指令格式​:示例中的指令是字符串。实际应用中,它们可能是二进制数据,需要相应处理。
    • 错误处理和重试​:当单条指令超时或响应错误时,您需要定义详细的错误处理逻辑,例如:记录日志、重试N次、跳过当前指令、或中止整个设备的当前采集周期。
    • 状态持久化​:如果 Worker 进程重启,内存中的连接状态(如 current_command_index)会丢失。如果需要更强的持久性,可以考虑在关键状态变化时(如每发送一条指令后)将状态同步到 Redis 或数据库,但这会增加复杂性和开销。对于大多数物联网采集场景,如果进程重启,等待下一个大的采集周期重新开始通常是可以接受的。
    • Gateway::getSession vs $connection 对象属性​:
      • Gateway::setSessionGateway::getSession 是通过 Gateway 进程周转的,会有一定的 IPC 开销。
      • 直接在 $connection 对象上设置自定义属性 (如 $connection->commands = ...;) 更高效,因为这些属性直接存在于 BusinessWorker 进程内存中。在 Events.php 的方法中,通常可以通过 $worker->connections[$client_id]Gateway::getConnectionByClientId($client_id) 来获取连接对象。

    这个异步化的方案能够很好地解决您提出的顾虑,并确保 GatewayWorker 平台在高并发物联网场景下的性能和响应能力。

×
🔝