workerman实现客户端订阅第三方,订阅数据取自外部怎么实现,外部数据变动自动订阅而不是重启服务
代码实现已经可以正常订阅并转发给系统内部启动的gateway进程
namespace app\pushserver\controller;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
use GatewayWorker\Lib\Gateway;
use app\common\model\Currency;
class Huobisub{
private $worker;
//订阅K线数据
private $subscribed;
//订阅k线时序
private $kline_periods;
public function __construct(){
$this->worker = new Worker();
$this->worker->count = 10;
$this->worker->name = 'Huobi Websocket';
$this->worker->onWorkerStart = function($worker){
$klinesub = Currency::getKlineSub();
$this->subscribed = $klinesub['subscribed'];
$this->kline_periods = $klinesub['kline_periods'];
// ssl需要访问443端口
$con = new AsyncTcpConnection('ws://api.exmple.com:443/linear-swap-ws');
// 设置以ssl加密方式访问,使之成为wss
$con->transport = 'ssl';
$con->onConnect = function(AsyncTcpConnection $con) use ($worker) {
echo "進程啓動:".$worker->id. PHP_EOL;
foreach($this->subscribed as $topic=>$subinfo){
$data = ['sub' => $topic, 'id' => $topic];
$con->send(json_encode($data));
}
};
$con->onMessage = function(AsyncTcpConnection $con, $data) use ($worker) {
//echo "进程答复:".$worker->id. PHP_EOL;
$data = gzdecode($data);
Gateway::sendToGroup($data['ch'],$data);
};
$con->onClose = function ($con) {
// 如果连接断开,则在1秒后重连
echo 'work exited,reconnecting......' . PHP_EOL;
$con->reConnect(1);
};
$con->connect();
};
//运行所有Worker;
Worker::runAll();
}
}