workerman实现tcp客户端中间件,该用什么样的方法提供服务?

xiaopi

问题描述

现有第三方服务,实现了TCP服务端,称为服务A。基本用法就是通过创建TCP客户端连接服务A,对服务A发送各种指令,获得响应,以及订阅服务A的各种事件,服务A会主动推送事件通知过来。
我想用wokerman的做一个中间层,负责与服务A通信,并以wokerman为核心提供一些服务,共内部的其他程序使用。流程为本地其他项目————调用workerman中间层————调用服务A,其中wokerman中间层负责将服务A的响应结果返回给本地其他项目

我想知道wokerman中间层提供何种服务才能简便供本地其他项目调用?
已知与服务A进行TCP交互,TCP服务端会响应的数据格式为两种,一种是同步命令型,一种是异步订阅型
同步命令型:指的是TCP客户端发出命令指令,TCP服务端会阻塞的返回指令的执行结果,与http协议一样 请求/响应
异步订阅型:指的是TCP客户端发出订阅指令,TCP服务端会立即返回订阅结果,并定时向TCP客户端发送订阅的数据,直到客户端收集了足够的数据,主动取消订阅或者断开连接为止。

初步实现wokerman中间层的想法是:
1.中间层搭建一个http服务端,用来接收本地其他项目的url请求(指令)
2.中间层接收到url发送过来的指令后,判断是否为同步命令型指令,如果是,则建立TCP客户端与服务A交互,并同步等待服务A的响应结果,并将响应结果作为response响应给http客户端;如果是异步订阅型指令,则要求http客户端发送的参数中必须包含回调url,以便中间件接收到服务A订阅数据后,通过该回调响应给http客户端。
3.为了解决workerman中间层服务A交互时的TCP边界问题,使用workerman的特性,定制了与服务A的通讯协议(封装了解包/发包协议)定制协议,中间件与服务A交互的方式如下:

$con = new \Workerman\Connection\AsyncTcpConnection('TestNL://14.103.39.10:3315');
    $con->onConnect = function ($con) {
        // 发送验证
        $con->send("xxxx");
    };
    $con->onMessage = function ($con, $data) {
        // 获取解包/合包后的服务A响应的数据
        var_data($data)
    };

那么问题就出现了,我使用workerman的定制协议,必须要用AsyncTcpConnection建立TCP客户端,而且我又想提供http服务端及时的把数据响应给http客户端,这在workerman或者webman中根本不允许的啊,AsyncTcpConnection是异步的,我不能阻塞http的worker进程用来等待AsyncTcpConnection的message的响应啊。请问应该如何解决这种情况呢?

为此你搜索到了哪些方案及不适用的原因

这种需求应该很常见,请问大佬们如何实现呢?

502 3 4
3个回答

walkor 打赏

问题描述得很详细,非常赞,一看就有回答的欲望。
workerman做这样的业务非常擅长,以下是示例代码

安装 workerman/http-client

为了异步执行http回调,要装下 workerman/http-client

composer require workerman/http-client

服务端代码

以下是服务端代码示例 start.php

<?php

use Workerman\Connection\TcpConnection;
use Workerman\Http\Client;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:2345');

// 保存订阅主题->回调url的数组
global $subjects;
$subjects = [];

// 进程启动时建立一个到A服务的连接
$worker->onWorkerStart = function() {
    $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
    $con->transport = 'ssl'; // tcp协议不需要设置
    $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
        $con->send('hello');
    };
    // 收到A服务的订阅数据后转发给订阅者
    $con->onMessage = function(AsyncTcpConnection $con, $subject) {
        global $subjects;
        foreach ($subjects[$subject] ?? [] as $callbackUrl) {
            $httpClient = new Client();
            $httpClient->post($callbackUrl, ['subject' => $subject]);
            echo "callback $callbackUrl\n";
        }
    };
    // 为了模拟A主动推送订阅消息,这里定时向A服务发送数据,A会返回hello
    \Workerman\Timer::add(10, function () use ($con) {
        $con->send('hello');
    });
    $con->connect();
};

$worker->onMessage = function (TcpConnection $httpConnection, Request $request) {
    $callbackUrl = $request->get('callback');
    // 订阅主题
    $subject = $request->get('subject');
    // 没有回调url或者订阅主题认为是同步指令
    if (!$callbackUrl || !$subject) {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp协议不需要设置
        $con->onWebSocketConnect = function (AsyncTcpConnection $con) {
            $con->send('hello');
        };
        $con->onMessage = function (AsyncTcpConnection $con, $data) use ($httpConnection) {
            $httpConnection->send($data);
            $con->close();
        };
        $con->connect();
        return;
    }
    // 记录主题和回调之间的关系
    global $subjects;
    if (!isset($subjects[$subject])) {
        $subjects[$subject] = [];
    }
    if (in_array($callbackUrl, $subjects[$subject])) {
        $httpConnection->send('already subscribed');
        return;
    }
    $subjects[$subject][] = $callbackUrl;
    $httpConnection->send('subscribed');
};

Worker::runAll();

启动

php start.php start

测试

同步请求url类似 http://127.0.0.1:2345
订阅测试url类似 http://127.0.0.1:2345/?subject=hello&callback=<url地址>

说明

为了方便测试,上面代码用的wss协议测试,你需要改成自己的协议

  • xiaopi 2024-04-28

    感谢老大,思路太棒了,在worker进程开启的时候就建立一个tcp长连接负责分发A服务的响应,然后http请求时使用短连接,查完就关闭连接。稍微改一下直接可以用了。接下来我ab压测下单进程的性能,以及是否可以多进程,或者移植到webman上。
    按我现在想的,应该可以直接开启多进程增大并发,因为每个进程开启的时候就建立tcp客户端连接,然后http客户端发送订阅指令时,无论subject分发到哪个worker进程去A服务上订阅都无所谓。

    可能要改的是就是进程开启时的定时器,定时将存储的订阅指令发送到TCP服务端中
    类似下面代码

    $worker->onWorkerStart = function() {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp协议不需要设置
        $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
            $con->send('hello');
        };
        // 收到A服务的订阅数据后转发给订阅者
        $con->onMessage = function(AsyncTcpConnection $con, $subject) {
            global $subjects;
            foreach ($subjects[$subject] ?? [] as $callbackUrl) {
                $httpClient = new Client();
                $httpClient->post($callbackUrl, ['subject' => $subject]);
                echo "callback $callbackUrl\n";
            }
        };
        //  定时获取全局变量中待发送的订阅指令,发送后在onMessage中监听订阅响应
        \Workerman\Timer::add(10, function () use ($con) {
            global $subjects;
            foreach ($subjects as $subject) {
                $con->send('订阅指令');
            }
            if (empty($subjects)) {
                $con->send('发送心跳');
            }
        });
        $con->connect();
    };
walkor 打赏

多进程时要加一句 $worker->reusePort = true;

$worker = new Worker('http://0.0.0.0:2345');
$worker->reusePort = true;
JackDx

学习了。

  • 暂无评论
×
🔝