workman-json-rpc调用服务时好时坏,异常recvData empty,无报错!急!!!

花儿

用workman-json-rpc做了一个微信发送大量模板消息的程序。客户端代码如下

            $address_array = array(
                'tcp://127.0.0.1:2015'
            );
            // 配置服务端列表
            RpcClient::config($address_array);
            $Weixin = RpcClient::instance('Weixin');

            $Common = new Common;
            //获取公众号消息模板ID
            $templateid = $Common->configsFromName("wxmbtzid");
            //获取是先发现模板消息还是先发送客服消息
            $usekfmessage = $Common->configsFromName("usekfmessage");

            $config = get_addon_config('third');
            $wxconfig = $config["wechat"];
            $appid = $wxconfig["app_id"];

            $access_token = getAccessToken();
            //发送消息
            foreach ($param as $k => $value){
                $Weixin->asend_sendMsg($usekfmessage,$templateid,$access_token,$value);
            }

            //接受消息,断开连接
            foreach ($param as $k => $value){
                $data[$k] = $Weixin->arecv_sendMsg($usekfmessage,$templateid,$access_token,$value);
            }

我做了发现WORKERMAN执行经常报错recvData empty。

然后在服务端查看状态

发现有一条连接断开了。这种需要怎么处理,我现在才发送了4条连接,就断开了一条。我需要的是一次性要发送1000条以上才行。请问大家有没有什么好的办法

2725 3 0
3个回答

walkor 打赏

recvData empty 说明业务执行太慢了,超时了。
如果你的业务很慢,不建议用JsonRpc,用消息队列更好一些。

  • 花儿 2020-11-06

    我的业务大概是1-2秒执行1条吧。用消息队列的话,就只能用定时任务?

  • 花儿 2020-11-06

    但是我需要一次性就发送1000条消息啊。用消息队列做不到吧

  • walkor 2020-11-06

    可以做到

  • 花儿 2020-11-06

    @1:workerman\mqtt是这个吗

  • 花儿 2020-11-06

    @1:话说客服端怎么用啊。只有一个服务端代码,没有客服端代码。感觉不会用

  • 花儿 2020-11-09

    @1:请问一下呢。如果我要一次性的发送1W条的话。还需要用什么方法呀。

  • walkor 2020-11-09

    和1000条用法没区别。
    不过最好分成100条一个任务,1万个分成100个任务。

  • 花儿 2020-11-09

    @1:我的意思是说呢,我需要发1W条,但是用发1000条的时间就能发出去1W条。我想了2个办法,1个是// 启动多少服务进程
    //$worker->count = 10;第一个办法就是发布多条订阅,然后您觉得哪一种办法可行性高一点呢。

walkor 打赏

有时候一些项目运行在apache或者php-fpm环境,无法使用workerman/redis-queue项目,可以参考如下函数实现发送

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = 'redis-queue-waiting';
    $queue_delay = 'redis-queue-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => 0,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

其中,参数$redis为redis实例。例如redis扩展用法类似如下:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
  • 暂无评论
花儿

这个是我原来的一个订阅的。
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
$client->subscribe('sendmsg', function($data){
$Weixin = new Weixin();
$starttime = time();
foreach ($data as $k => $value){
$result = $Weixin->sendMsg($value);
var_dump($result);
}
$endtime = time();
echo $endtime-$starttime;
});
};
按照上面的说法,我需要发布10条订阅吗?,代码如下
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
$client->subscribe('sendmsg', function($data){
$Weixin = new Weixin();
$starttime = time();
foreach ($data as $k => $value){
$result = $Weixin->sendMsg($value);
var_dump($result);
}
$endtime = time();
echo $endtime-$starttime;
});

$client->subscribe('sendmsg2', function($data){
    $Weixin = new Weixin();
    $starttime = time();
    foreach ($data as $k => $value){
        $result = $Weixin->sendMsg($value);
        var_dump($result);
    }
    $endtime = time();
    echo $endtime-$starttime;
});

};

  • walkor 2020-11-09

    订阅不用改动。我的意思是向队列send的数据,如果原本是send 1次,每次data里是1万条,改成send 100次,每次data里100条。这样可以利用多进程快速消费这些信息。
    设置下消费者 $worker->count = 10;,这样开启多个消费进程并行消费,比你在一个进程里消费1万条快很多。

  • 花儿 2020-11-09

    好的。明白了

年代过于久远,无法发表回答
×
🔝