消息转发设计,如何尽最大可能支持高并发?

plantation

问题描述

使用GatewayWorker同时做客户端和服务端。

做客户端使用异步请求一个ws连接,不断接受消息,将其发送到Channel通道中。

做服务端订阅Channel通道,将消息转发给普通用户连接。

我的问题不是如何实现,而是如何在服务器配置有限的情况下,尽最大可能提升并发量。

服务器配置是4核4G高性能云服务器,目前不使用阶梯式递增压测,最高并发可维持在1w左右。

超过1w,就会exit_process进程,杀掉一些连接。

我想知道的是,请问大家还有什么方案来实现吗?
我目前还没有加上数据处理部分,我想加上数据处理部分并发又会下降不少。
需要实时转发的数据处理部分是否可以通过task来实现?
使用协程可以来处理不是需要实时转发的数据,比如把接收到的数据存储到数据库这些阻塞操作?

为此你搜索到了哪些方案及不适用的原因

开启4个Gateway进程,12个Business进程,1个Channel进程,1个Register进程。

目前有四个方案:
第一个是单一订阅频道&直接sendToAll消息。
第二个是在onConnect中加入一个连接分组策略,多个订阅频道&sendToGroup消息。
第三个是在onWorkerStart加入一个进程分组策略,即异步连接只产生在一个进程A中,通过心跳来判断进程A是否一直正常处理,不正常时打开第二个进程B来处理消息,如此反复。其他同第二个方案一样。
第四个是把客户端和服务端进行分布式部署。异步请求以及发送消息给通道的操作 与 用户连接订阅通道消息并发送消息给用户 完全分开部署。

第一个方案代码如下:

public static function onWorkerStart($businessWorker)
{
    // 异步建立连接
    if ($businessWorker->id === 0) {
        // 连接通道
        Channel\Client::connect('0.0.0.0', 2206);
        // 创建异步连接
        $connection = new AsyncTcpConnection('ws://101.82.23.49:9765');
        // 设置ssl
        $connection->transport = 'ssl';
        // 设置成功连接时
        $connection->onConnect = function ($con) {
            // 发送消息
            $con->send('{"op": "subscribe","args":[]}');
        };
        // 设置成功接收到消息时
        $connection->onMessage = function ($connection, $data)  {
            // 转发消息给通道接受
            Channel\Client::publish('data_channel', $data);
        };
        // 执行异步连接
        $connection->connect()
    }
    // 订阅心跳信号
    Channel\Client::on('data_channel', function($data) {
        Gateway::sendToAll($data);
    });
}

第二个方案代码如下:

use \Workerman\Redis\Connection as RedisConnection;
public static function onConnect($client_id)
{
    $redis = new RedisConnection('localhost', 6379);
    $group_ans = $redis->get('group_ans') ?: 0;
    $increment = $redis->get('increment') ?: 1;

    Label:
    if ($group_ans > self::MAX_GROUP - 1) {
        // group
        $group_ans = 0;
        // 增量
        $increment++;
        $redis->set('increment', $increment);
    }
    if (Gateway::getClientCountByGroup('group_' . $group_ans) > (int)(self::MIN_USERS * $increment)) {
        // group
        $group_ans++;
        $redis->set('group_ans', $group_ans);
        // callback
        goto Label;
    }
    Gateway::joinGroup($client_id, 'group_' . $group_ans);
}

第三个方案代码如下:

use \Workerman\Redis\Connection as RedisConnection;
public static function onWorkerStart($businessWorker)
{
    // 连接Redis
    self::$redis = new Redis();
    self::$redis->connect('localhost', 6379);

    // 连接通道
    Channel\Client::connect('0.0.0.0', 2206);

    // 如果是0号进程,作为主进程
    if ($businessWorker->id === 0) {
        // 获取数据的代码...
        self::getData($businessWorker);
    }

    // 如果是备份进程
    if ($businessWorker->id >= 1 && $businessWorker->id <= self::MAX_GROUP - 1) {
        // 设置
        self::$last_heartbeat_time[$businessWorker->id] = time();
        // 订阅心跳信号
        Channel\Client::on('heart_channel', function($data) use ($businessWorker) {
            self::$last_heartbeat_time[$businessWorker->id] = time();
        });
        // 定期检查心跳信号
        Timer::add(3, function() use ($businessWorker) {
            // 如果1秒钟没有收到上一个进程的心跳信号,并且成功获取了锁
            if (time() - self::$last_heartbeat_time[$businessWorker->id] > 1 && self::$redis->set('lock', 1, ['nx', 'ex' => 3])) {
                // 开始获取数据的代码...
                self::getData($businessWorker);
            }
        });
    }

    // 订阅通道数据
    Channel\Client::on('data_channel', function($data) use ($businessWorker) {
        // 发送
        Gateway::sendToGroup('group_' . $businessWorker->id, $data);
    });
}
1041 2 1
2个回答

zh7314

需要更改系统策略,比如超过一定用户链接数之后,使用读取缓存的方式获取消息,例如:使用redis缓存一下积累的消息,使用http方式获取一下,当然还有其他方式,如果只是单纯的去转发消息,很容易引发消息风暴,导致带宽跑满

  • 暂无评论
liziyu

有这么多热心大佬,WK一定会越来越好,用的都有信心。

  • 暂无评论
年代过于久远,无法发表回答
×
🔝