错误提示:
error package. package_length=1195725856
使用workerman与TP5.0的整合方法,分别建立了webscoket服务与Channel的服务端、http服务,启动客户端连接。
两个问题:
1、Channel客户端与服务端连接失败;
2、error package. package_length=1195725856。
原计划尝试将这个功能嵌入到tp5.0里面
https://www.kancloud.cn/walkor/workerman/315202
个人修改后代码,(刚入门菜鸟,轻喷。)
贴上原代码:
tp5.0 :文件路径1:vendor/topthink/think-worker/src Worker.php
namespace think\worker;
use Workerman\Worker;
use Channel\Client;
use Channel\Server as Channelserver;
/**
* Worker控制器扩展类
* Channel相关文件:Client.php和.Server.php,安装在tp根目录:extend/Channel下
*/
abstract class Server
{
/**
* -----------------------------------------------------------------------------------------------------------
*
*/
protected $worker;
protected $socket = '';
protected $protocol = 'http';
protected $host = '0.0.0.0';
protected $port = '2346';
protected $processes = 4; //进程个数
protected $name = "ApiWebSocket"; //进程名称
protected $daemonize = false; //表示是否以daemon(守护进程)方式运行
protected $stdoutFile = LOG_PATH."workman_putout.log"; //所有的打印输出全部保存在日志文件workman_putout.log文件中
protected $logFile = LOG_PATH."workman_start_stop.log"; //此文件记录了workerman自身相关的日志,包括启动、停止等
//Channel服务扩展
protected $channelServer = false; //是否开启Channel服务,默认false,不开启服务
protected $channelHost = '0.0.0.0'; //是否开启Channel服务,默认false,不开启服务
protected $channelPort = 2206; //是否开启Channel服务,默认false,不开启服务
/**
* 架构函数
* @access public
*/
public function __construct()
{
//是否启动Channel服务,默认不开启
if($this->channelServer){
$channel_server = new Channelserver($this->channelHost, $this->channelPort);
}
// 实例化 Websocket 服务
$this->worker = new Worker($this->socket ?: $this->protocol . '://' . $this->host . ':' . $this->port);
// 设置进程数
$this->worker->count = $this->processes;
$this->worker->name = $this->name;
// 初始化
$this->init();
// 设置回调
foreach ( as $event) {
if (method_exists($this, $event)) {
$this->worker->$event = ;
}
}
// Run worker
Worker::runAll();
}
protected function init()
{
//输出日志文件
if (!file_exists($this->stdoutFile)) { // 如果不存在则创建
file_put_contents($this->stdoutFile, '');
// 检测是否有权限操作
if (!is_writeable($this->stdoutFile)) chmod($this->stdoutFile, 0777); // 如果无权限,则修改为0777最大权限
};
//开启、关闭日志文件
if (!file_exists($this->logFile)) { // 如果不存在则创建
file_put_contents($this->logFile, '');
// 检测是否有权限操作
if (!is_writeable($this->logFile)) chmod($this->logFile, 0777); // 如果无权限,则修改为0777最大权限
};
Worker::$daemonize = $this->daemonize;
Worker::$stdoutFile = $this->stdoutFile;
Worker::$logFile = $this->logFile;
}
}
文件2:websocket 服务端 启动Channel服务端 路径:application/push/controller/Worker.php
<?php
namespace app\push\controller;
use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;
class Worker extends Server
{
protected $socket = 'websocket://api.3-gd.com:2886';
protected $processes = 2;
protected $name = "pusher"; //进程名称
protected $channelServer = true; //是否开启Channel服务,默认false,不开启服务
protected $channelHost = 'api.3-gd.com'; //是否开启Channel服务,默认false,不开启服务
protected $channelPort = 2207; //是否开启Channel服务,默认false,不开启服务
/**
* 数据传输内容格式要求
* a、必须包含用户ID、用户信息、请求路径或请求路径、请求或传输的参数、返回数据的处理方法
* b、参数全部为json字符串,接收后再处理为数组对象或json对象
* 接收数据后,此页面仅做分发和接收处理,其余数据由对应的模块功能进行处理
*
* 返回数据内容
* "send_msg" :返回的提示主要信息
* "send_desc":返回的主要描述信息
* "type" :返回的数据处理方式,如"message":一般提示信息、"alarm":重要提示信息、"data":接口返回数据
* "fun" :建议的数据处理方法及函数
*
*/
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
// 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
$connection->lastMessageTime = time();
// 处理接收到的数据
$input_info = json_decode( $data, true );
// 判断接收的数据是否为合法数据
if(!$input_info||!$input_info||!$input_info||!$input_info||!$input_info){
$arr = ;
$connection->send( json_encode($arr) );
}
// 将判断后的数据交给对应的控制器去处理
else{
//选择控制器并获取操作结果
$user_id = $input_info;
$user_info = $input_info;
$user_mvc = $input_info;
$user_data = $input_info;
$user_fun = $input_info;
//将数据交给其他模型处理
$do_action = model('push/PushComm');
$res = $do_action->sendDataToMVC($user_id,$user_info,$user_mvc,$user_data);
$arr=;
//将结果返回给客户端
$connection->send( json_encode($arr) );
}
}
/**
* 当连接建立时触发的回调函数
* @param $connection
*/
public function onConnect($connection)
{
}
/**
* 当连接断开时触发的回调函数
* @param $connection
*/
public function onClose($connection)
{
}
/**
* 当客户端的连接上发生错误时触发
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
/**
* 每个进程启动
* @param $worker
*/
public function onWorkerStart($worker)
{
// 心跳间隔60秒
define('HEARTBEAT_TIME', 60);
// 链接保持提醒10秒
define('ALARM_TIME', 10);
Timer::add(1, function()use($worker){
$time_now = time();
foreach($worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
//$connection->send(time());
// 链接保持提醒时间
if ($time_now - $connection->lastMessageTime == HEARTBEAT_TIME - ALARM_TIME) {
//发送命令消息,提示应该重新发送消息,保持链接
$arr = ;
$connection->send( json_encode($arr) );
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
$connection->close();
}
}
});
//连接Channel服务
// Channel客户端连接到Channel服务端
Client::connect( $this->channelHost, $this->channelPort);
// 以自己的进程id为事件名称
$event_name = $worker->id;
// 订阅worker->id事件并注册事件处理函数
Client::on($event_name, function($event_data)use($worker){
$to_connection_id = $event_data;
$message = $event_data;
if(!isset($worker->connections))
{
echo "connection not exists\n";
return;
}
$to_connection = $worker->connections;
$to_connection->send($message);
});
// 订阅广播事件
$event_name = '广播';
// 收到广播事件后向当前进程内所有客户端连接发送广播数据
Client::on($event_name, function($event_data)use($worker){
$message = $event_data;
foreach($worker->connections as $connection)
{
$connection->send($message);
}
});
}
}
文件3:http 服务端 启动Channel客户端 路径:application/push/controller/Workerserver.php
<?php
namespace app\push\controller;
use think\worker\Server;
use Workerman\Lib\Timer;
use Channel\Client;
use Channel\Server as Channelserver;
/**
* 基于Worker的多进程(分布式集群)推送系统
* 可以实现服务端后台任务向用户推送数据
* 后台用户定期执行某操作等
* 实现双向通讯
* -------------------------------特别说明---------------------------------
* Channel相关文件:Client.php和.Server.php,安装在tp根目录:extend/Channel下
*
* 用来处理http请求,向任意客户端推送数据,需要传workerID和connectionID
*
* 依赖WorkMan
*/
class Workerserver extends Server
{
protected $socket = 'http://api.3-gd.com:2887';
protected $processes = 1;
protected $name = "publisher"; //进程名称
protected $channelServer = false; //是否开启Channel服务,默认false,不开启服务
protected $channelHost = 'api.3-gd.com'; //是否开启Channel服务,默认false,不开启服务
protected $channelPort = 2207; //是否开启Channel服务,默认false,不开启服务
// 用来处理http请求,向任意客户端推送数据,需要传workerID和connectionID
/**
* 每个进程启动
* @param $worker
*/
public function onWorkerStart($worker)
{
//连接Channel服务
// Channel客户端连接到Channel服务端
Client::connect( $this->channelHost, $this->channelPort);
}
/**
* 收到信息
* @param $connection
* @param $data
*/
public function onMessage($connection, $data)
{
$connection->send('ok');
if(empty($_GET)) return;
// 是向某个worker进程中某个连接推送数据
if(isset($_GET) && isset($_GET))
{
$event_name = $_GET;
$to_connection_id = $_GET;
$content = $_GET;
Client::publish($event_name, array(
'to_connection_id' => $to_connection_id,
'content' => $content
));
}
// 是全局广播数据
else
{
$event_name = '广播';
$content = $_GET;
Client::publish($event_name, array(
'content' => $content
));
}
}
/**
* 当客户端的连接上发生错误时触发
* @param $connection
* @param $code
* @param $msg
*/
public function onError($connection, $code, $msg)
{
echo "error $code $msg\n";
}
}
求各位大神指点一下,谢谢
error package. package_length=1195725856 是有程序连错端口了,客户端协议与服务端协议不一致,解析出来的包长为1195725856。
手册中demo是ok的,你的具体哪里连错了得自己看了,代码太多了。
谢谢,我再查查。