指定客户端发送到客户端 出现错误 数据不能发送到指定客户端
<?php
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Protocols\Websocket;
require_once '/mnt/d/linux/Autoloader.php';
// websocket服务
$worker = new Worker ( 'websocket://0.0.0.0:8882' );
$worker->onWorkerStart = function($worker){
};
$worker->count = 1;
// 新增加一个属性,用来保存uid到connection的映射(uid是用户id或者客户端唯一标识)
$worker->uidConnections = array();
$worker->lastuid = 999999999;
$worker->onClose = function($connection)
{
global $worker;
if(isset($connection->uid))
{
// 连接断开时删除映射
unset($worker->uidConnections);
}
};
$worker->onMessage = function ($ws_connection, $message){
global $worker;
$msg = json_decode ( $message, true );
$mi_id=$msg;
if($mi_id==true){
if(!isset($ws_connection->uid))
{
// 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
$ws_connection->uid = $mi_id;
/* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
* 实现针对特定uid推送数据
*/
//echo "uid=".$ws_connection->uid."\r\n";
$worker->uidConnections = $ws_connection;
//print_r($worker->uidConnections);
//return $ws_connection->send($data);
}else{
echo "888";
}
}
//print_r($msg);
// 与远程task服务建立异步链接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
/* if(!$task_connection){
$task_connection = new AsyncTcpConnection ( 'tcp://101.201.104.58:6668' );
} */
global $task_connection ;
if(!$task_connection){
$task_connection = new AsyncTcpConnection ( 'tcp://127.0.0.1:6668' );
}
//echo "<pre>";
//print_r($task_connection);
$task_connection->websocketType = Websocket::BINARY_TYPE_ARRAYBUFFER;
if($msg=="stop"){
$md_id=$msg;
// 停止
$arr = "{"
. "\n\t\"type\":\"stop\"," .
"\n\t\"md_id\":\"$md_id\"" .
"\n}";
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// 发送数据
$res = $task_connection->send ( $bin_data );
}
// 发送获取测试数据请求
if ($msg == "getmetadata") {
$Connid = $msg ;
$idx = $msg ;
$num = $msg ;
$meta_id=$msg;
//echo "meta_id="+$meta_id;
$arr = "{" . "\n\t\"type\":\"getmetadata\"," . "\n\t\"mi_id\":$Connid," . "\n\t\"idx\":$idx," . "\n\t\"meta_id\":$meta_id," . "\n\t\"n_rcrd\":$num" . "\n}";
$bin_body = $arr; // json_encode($arr);
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
//$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . pack ( "c", "0" );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// 发送数据
$res = $task_connection->send ( $bin_data );
}
//每秒钟取一次数据
if ($msg == "sample") {
$Connid = $msg ;
$n_smpl = $msg ;
//$num = $msg ;
$meta_id=$msg;
//echo "meta_id="+$meta_id;
$arr = "{" . "\n\t\"type\":\"sample\"," . "\n\t\"mi_id\":$Connid," . "\n\t\"ml\":," . "\n\t\"n_smpl\":$n_smpl" . "\n}";
//echo "<pre>";
//print_r($arr);
$bin_body = $arr; // json_encode($arr);
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
//$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . pack ( "c", "0" );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// 发送数据
$res = $task_connection->send ( $bin_data );
}
// 发送登陆数据
if ($msg == "setparam") {
// 任务及参数数据
$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"report\":1," . "\n\t\"report_ml\":" . "\n}";
//echo $arr;
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body .$d;
//echo bin2hex($bin_data);
// 发送数据
$res = $task_connection->send ( $bin_data );
}
if($msg=="getmdinfo"){
// 登陆
$md_id=$msg;
//print_r($md_id);
$arr = "{" . "\n\t\"type\":\"login\"," . "\n\t\"oprt_id\":1000". "\n}";
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
$str="";
$md_id=explode(",", $md_id);
for($i=0;$i<count($md_id);$i++){
if($i==count($md_id)-1){
$str.="\"".$md_id."\"";
}else{
$str.="\"".$md_id."\",";
}
}
//echo $str;
//获取设备状态
$arr = "{" . "\n\t\"type\":\"getmdinfo\"," . "\n\t\"md_id\":". "\n}";
//echo $arr;
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
// 发送数据
$res = $task_connection->send ( $bin_data );
}
/**
*获取统计值
*/
if($msg=="getmiinfo"){
$mi_id=$msg;
$ml=$msg;
$start_begin=$msg;
$start_num=$msg;
$rng_min=$msg;
$rng_max=$msg;
if($start_begin&&$rng_max){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"stat_begin\":".$start_begin."," .
"\n\t\"stat_num\":".$start_num."," .
"\n\t\"rng_min\":".$rng_min."," .
"\n\t\"rng_max\":".$rng_max."," .
"\n\t\"ml\":"
. "\n}";
//echo $arr;
}else{
if($rng_max&&$rng_min){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"rng_min\":".$rng_min."," .
"\n\t\"rng_max\":".$rng_max."," .
"\n\t\"ml\":"
. "\n}";
//echo $arr;
}else if($start_begin&&$start_num){
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"stat_begin\":".$start_begin."," .
"\n\t\"stat_num\":".$start_num."," .
"\n\t\"ml\":"
. "\n}";
}else{
$arr = "{" .
"\n\t\"type\":\"getmiinfo\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"ml\":"
. "\n}";
}
//echo $arr;
}
//echo $arr."\r\n";
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
}
if($msg=="setmetadata"){
$idx=$msg;
$value=$msg;
$meta_id=$msg;
$mi_id=$msg;
$arr = "{" .
"\n\t\"type\":\"setmetadata\"," .
"\n\t\"mi_id\":".$mi_id."," .
"\n\t\"idx\":".$idx.",".
"\n\t\"meta_id\":".$meta_id.",".
"\n\t\"value\":".$value
. "\n}";
//echo $arr;
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
}
/**
* 开始
*/
if ($msg == "start") {
$si=$msg;
$md_id=$msg;
$ml=$msg;
//修改参数
//$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"operator_id\":1000," . "\n\t\"report\":1," . "\n\t\"report_ml\":" . "\n}";
/* $arr = "{" .
"\n\t\"type\":\"setparam\"," .
"\n\t\"report\":1," .
"\n\t\"report_ml\":"
. "\n}"; */
$arr = "{" . "\n\t\"type\":\"setparam\"," . "\n\t\"report\":0," . "\n\t\"report_ml\":" . "\n}";
//echo $arr;
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body . $d;
//echo bin2hex($bin_data);
$res = $task_connection->send ( $bin_data );
// start00000003000000010102030405060708
$arr = "{" . "\n\t\"type\":\"start\"," . "\n\t\"md_id\":\"".$md_id."\"," . "\n\t\"si\":".$si."," . "\n\t\"max_idx\": 1048576," . "\n\t\"ml\":" . "\n}";
//echo "start=".$arr;
// 构造包体
$bin_body = $arr;
// 包体长度
$body_len = strlen ( $bin_body );
//$body_len = ($body_len + 4 - 1) / 4 * 4;
$body_len = $body_len + 8;
$d="";
if($body_len%4!=0){
$a=$body_len%4;
$b=4-$a;
for($i=0;$i<$b;$i++){
$d.=pack("c","0");
$body_len++;
}
}
// 构造头部
$bin_head = pack ( "c*", "8" );
$bin_head .= pack ( "c", "0" );
$bin_head .= pack ( "S", $body_len );
$bin_data = $bin_head . pack ( "cccc", 189, 172, 36, 19 ) . $bin_body .$d;
//echo bin2hex($bin_data);
// 发送数据
$res = $task_connection->send ( $bin_data );
}
//file_put_contents ( "test1.txt", $arr . "\r\n", FILE_APPEND );
// 异步获得结果
$task_connection->onMessage = function ($task_connection, $task_result) use($ws_connection) {
global $worker;
//file_put_contents ( "test3.txt", $task_result . "\r\n", FILE_APPEND );
$aa = unpack ( 'C*', substr ( $task_result, 0, 2 ) );
$bb = unpack ( 'S', substr ( $task_result, 2, 2 ) );
if ($aa == "7") {
$cc = unpack ( 'S', substr ( $task_result, 8, 2 ) );
$dd = unpack ( 'S', substr ( $task_result, 16, 2 ) );
if ($dd > 0) {
$nmd = $dd ;
//echo "meta_num=".$nmd."\r\n";
$indexcount = 0;//inde的和
$list=array();
for($zz = 0; $zz < $nmd; $zz ++) {
$idx = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 2, 2 ) );
$meta_id = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 4, 2 ) );
$n_rcrd = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 6, 2 ) );
$teststr = "";
$metalist = null;
if ($n_rcrd > 0) {
$index = $n_rcrd ;
for($kk = 0; $kk < $index; $kk ++) {
/* echo "aaa=";
echo 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4;
echo "\r\n"; */
$tmpid = unpack ( 'f', substr ( $task_result, 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4, 4 ) );
if($meta_id==1){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==2){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==41){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==3){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==4){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==6){
//ccc=floatnum(arr_,4);
//ccc=arr_;
//ccc=aaa.toFixed(4);
$tmpid=number_format($tmpid,4,".","");
}else if($meta_id==17){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==18){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==20){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else {
$tmpid=number_format($tmpid,2,".","");
}
//$metalist = $tmpid ;
$metalist =$tmpid;
}
$indexcount = $indexcount + $n_rcrd ;
//echo "count=" . $indexcount . "\r\n";
//echo "index=" . $n_rcrd . "\r\n";
$list = $metalist;
$teststr = json_encode ( $list );
}
$list = $meta_id ;
$list = $idx ;
//file_put_contents ( "test3.txt", " type:" . $aa . "\r\n" . " flags:" . $aa . "\r\n" . " plen:" . $bb . "\r\n" . " mi_id:" . $cc . "\r\n" . " n_mb:" . $dd . "\r\n" . " idx:" . $idx . "\r\n" . " meta_id:" . $meta_id . "\r\n" . " n_rcrd:" . $n_rcrd . "\r\n" . " Meta:" . $teststr . "\r\n", FILE_APPEND );
}
$arr=array("type"=>"getmetadata_reply","data"=>$list);
//echo "<pre>";
//print_r($arr);
$s=json_encode($arr);
$mid=$cc;
//echo "mid=".$mid."\r\n";
$ws_connection=$worker->uidConnections;
/* if($s==null){
$s=1;
} */
$ws_connection->send ( $s );
}
} else if($aa == "11"){
$cc = unpack ( 'S', substr ( $task_result, 8, 2 ) );
$dd = unpack ( 'S', substr ( $task_result, 16, 2 ) );
if ($dd > 0) {
$nmd = $dd ;
//echo "meta_num=".$nmd."\r\n";
$indexcount = 0;//inde的和
$list=array();
for($zz = 0; $zz < $nmd; $zz ++) {
$idx = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 2, 2 ) );
$meta_id = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 4, 2 ) );
$n_rcrd = unpack ( 'S', substr ( $task_result, 20 + 8 * $zz + 6, 2 ) );
$teststr = "";
$metalist = null;
if ($n_rcrd > 0) {
$index = $n_rcrd ;
for($kk = 0; $kk < $index; $kk ++) {
/* echo "aaa=";
echo 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4;
echo "\r\n"; */
$tmpid = unpack ( 'f', substr ( $task_result, 20 + 8 _ $nmd + $indexcount _ 4 + 4 + $kk * 4, 4 ) );
//$metalist = $tmpid ;
//$tmpid=number_format($tmpid,5,".","");
if($meta_id==1){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==2){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==41){
$tmpid=number_format($tmpid,1,".","");
}else if($meta_id==3){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==4){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==6){
//ccc=floatnum(arr_,4);
//ccc=arr_;
//ccc=aaa.toFixed(4);
$tmpid=number_format($tmpid,4,".","");
}else if($meta_id==17){
$tmpid=number_format($tmpid,3,".","");
}else if($meta_id==18){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==20){
$tmpid=number_format($tmpid,2,".","");
}else if($meta_id==5){
$tmpid=number_format($tmpid,2,".","");
}else {
$tmpid=number_format($tmpid,2,".","");
}
//echo $meta_id."\r\n";
$metalist =$tmpid;
}
$indexcount = $indexcount + $n_rcrd ;
//echo "count=" . $indexcount . "\r\n";
//echo "index=" . $n_rcrd . "\r\n";
$list = $metalist;
$teststr = json_encode ( $list );
}
$list = $meta_id ;
$list = $idx ;
//file_put_contents ( "test3.txt", " type:" . $aa . "\r\n" . " flags:" . $aa . "\r\n" . " plen:" . $bb . "\r\n" . " mi_id:" . $cc . "\r\n" . " n_mb:" . $dd . "\r\n" . " idx:" . $idx . "\r\n" . " meta_id:" . $meta_id . "\r\n" . " n_rcrd:" . $n_rcrd . "\r\n" . " Meta:" . $teststr . "\r\n", FILE_APPEND );
}
$arr=array("type"=>"sample_reply","data"=>$list);
//echo "<pre>";
//print_r($arr);
$s=json_encode($arr);
//$s=json_encode($arr);
//echo "<pre>";
//print_r($arr);
/* if($s==null){
$s=1;
} */
if(!isset($ws_connection->uid)){
$ws_connection->send ( $s );
}else{
$ws_connection=$worker->uidConnections;
$ws_connection->send ( $s );
}
}
}else {
$a = substr ( $task_result, 8, strrpos ( $task_result, '}' ) - 7 );
//echo $a."\r\n";
$s = str_replace ( "\n\t", "", $a );
$s = str_replace ( " ", "", $s );
//$s = str_replace ( "\r", "", $s );
//$s = str_replace ( "}]}]", "}]", $s );
$s = str_replace ( "}]\r\n}]", "}]", $s );
$s = str_replace ( "}\r\n}", "}", $s );
$s1 = json_decode ( $s ,true);
$s = json_decode ( $s);
if($s1=="start_reply"){
}
$s = json_encode ( $s );
//echo $s."\r\n";
//$ws_connection=$worker->uidConnections;
$ws_connection->send ( $s);
}
// 结果
// var_dump ( $task_result );
// var_dump ( $s );
// 获得结果后记得关闭异步链
// $task_connection->close();
// 通知对应的websocket客户端任务完成
// if($s)
/* $data = $s;
if ($aa == "7") {
}else{
} */
};
// 执行异步链接
$task_connection->connect ();
};
Worker::runAll ();
?>