thinkphp8.0用workerman根据用户userid主动推送给多个客户端(包含一对一聊天)

haomc

效果图片:

前端测试代码(按F12):
测试链接:http://test.com/?userid=1000

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket Example</title>
</head>
<body>
    <p>用户:<input type="text" id="userToId" value="1000"></p>
    <p>内容:<input type="text" id="msg"></p>
    <p><button id="sendMessageButton">发送消息</button></p>

    <script>
        function getUrlParams() {
            const url = window.location.search
          const paramsRegex = /[?&]+([^=&]+)=([^&]*)/gi;
          const params = {};
          let match;
          while (match = paramsRegex.exec(url)) {
            params[match[1]] = match[2];
          }
          return params;
        }
        // 创建 WebSocket 连接
        const ws = new WebSocket('ws://localhost:8481');
        let data = getUrlParams()
        // 连接打开时触发
        ws.onopen = function() {
            console.log('已连接到服务器.',data.userid);
            ws.send(JSON.stringify({
                type: 'bind',
                userId: data.userid,
            }));
            // 间隔30秒通知一次服务器我还在线
            setInterval(() => {
                ws.send(JSON.stringify({
                    type: 'ping'
                }));
            },50000);
        };

        // 收到服务器消息时触发
        ws.onmessage = function(event) {
            console.log(event.data);
            // 可以在这里添加更多的处理逻辑,比如更新页面内容
            // alert('Received from server: ' + event.data);
        };

        // 连接关闭时触发
        ws.onclose = function() {
            console.log('已断开与服务器的连接');
        };

        // 连接出错时触发
        ws.onerror = function(error) {
            console.error('WebSocket Error:', error);
        };

        // 获取按钮元素
        const sendMessageButton = document.getElementById('sendMessageButton');
        const msg = document.getElementById('msg');
        const userToId = document.getElementById('userToId');

        // 为按钮添加点击事件监听器
        sendMessageButton.addEventListener('click', function() {
            // 检查 WebSocket 连接是否已打开
            if (ws.readyState === WebSocket.OPEN) {
                // 发送消息给服务器'发送:' + msg.value
                ws.send(JSON.stringify({
                    type: 'text',
                    userToId: userToId.value,
                    message: msg.value,
                }));
                // console.log('Message sent to server: Hello Server!');
            } else {
                console.error('WebSocket未打开。就绪状态:', ws.readyState);
            }
        });

    </script>
</body>
</html>

后端发送代码:
测试链接:http://test.com/tests/t
```php
public function t()
    {
        // 建立socket连接到内部推送端口
        $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
        // 推送的数据,包含uid字段,表示是给这个uid推送
        $data = [
            'type' => 'text',
            'userToId' => '1000',
            'message' => '服务器端TTT的通知消息'.rand(10,99)
        ];
        // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
        fwrite($client, json_encode($data)."\n");
        // 读取推送结果
        echo fread($client, 8192);

        return 11;
    }

运行:php think websocket:server

```php

<?php
//文件路径: app\command\WebSocketServer.php
namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\Output;
use Workerman\Connection\TcpConnection;
use Workerman\Timer;
use Workerman\Worker;
/*
 * 配置信息 路径: \config\console.php
 * return [
    // 指令定义
    'commands' => [       
        'websocket:server' => app\command\WebSocketServer::class,
    ],
];

 */
class WebSocketServer extends Command
{
    protected function configure(): void
    {
        $this->setName('websocket:server')->setDescription('Start WebSocket Server');
    }

    protected function execute(Input $input, Output $output): void
    {
        // 心跳间隔55秒
        define('HEARTBEAT_TIME', 55);
        $worker = new Worker('websocket://0.0.0.0:8481');
        $worker->count = 1;
        $worker->connectionList = [];
        // 进程启动后设置一个每10秒运行一次的定时器
        $worker->onWorkerStart = function ($worker)  use ($output){
            Timer::add(10, function()use($worker){
                $time_now = time();
                foreach($worker->connections as $connection) {
                    // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                    if (empty($connection->lastMessageTime)) {
                        $connection->lastMessageTime = $time_now;
                        continue;
                    }
                    // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                    if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                        $connection->send('您长时间未请求服务器,链接已经断开');
                        $connection->close();
                    }
                }
            });

            // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
            $inner_text_worker = new Worker('Text://0.0.0.0:5678');
            $inner_text_worker->onMessage = function($connection, $buffer) use ($worker)
            {
                // $data数组格式,里面有uid,表示向那个uid的页面推送数据
                $data = json_decode($buffer, true);
                //聊天
                if ($data['type'] == 'text')
                {
                    if (isset($worker->connectionList[$data['userToId']]))
                    {
                        $conns = $worker->connectionList[$data['userToId']];
                        foreach ($conns as $conn){
                            $conn->send('用户'.$data['userToId'].':'.$data['message']);
                        }
                        $connection->send('我:成功了');
                    }else{
                        $connection->send('我:对方下线了哈');
                    }
                }
            };
            $inner_text_worker->listen();

//            $output->writeln('用户onWorkerStart'  );
        };

        $worker->onConnect = function(TcpConnection $connection) use ($output, $worker)
        {
            // 指令输出
            $output->writeln('用户:'.$worker->id.'-'.$connection->id.'链接成功');
        };

        $worker->onMessage = function ($connection, $data) use ($output, $worker) {
            // 给connection临时设置一个lastMessageTime属性,用来记录上次收到消息的时间
            $connection->lastMessageTime = time();

            $data = json_decode($data, true);
            //绑定用户ID
            if ($data['type'] == 'bind' && !isset($connection->userId))
            {
                $connection->userId = $data['userId'];
                $worker->connectionList[$connection->userId][$connection->id] = $connection;
            }

            //聊天
            if ($data['type'] == 'text')
            {
                if (isset($worker->connectionList[$data['userToId']]))
                {
                    $conns = $worker->connectionList[$data['userToId']];
                    foreach ($conns as $conn){
                        $conn->send('用户'.$data['userToId'].':'.$data['message']);
                    }
                }else{
                    $connection->send('我:对方下线了');
                }
            }

//            // 指令输出
//            $output->writeln('用户:'.$connection->id.',客户端接收到数据:'.json_encode($worker->connections)  );
            //向客户端自己发送数据
            if (!empty($data['message'])){
                $connection->send('我:'.$data['message']);
            }
        };
        // 客户端连接断开时,断开对应的链接连接
        $worker->onClose = function(TcpConnection $connection) use ($output, $worker)
        {

            if(isset($worker->connectionList[$connection->userId]))
            {
                // 连接断开时删除映射
                if (count($worker->connectionList[$connection->userId]) == 1){
                    unset($worker->connectionList[$connection->userId]);
                }else{
                    unset($worker->connectionList[$connection->userId][$connection->id]);
                }
            }
            // 指令输出
            $output->writeln('用户:'.$connection->id.'已经断开链接。'.$connection->userId);
        };

        Worker::runAll();
    }
}
166 0 0
0个评论

haomc

220
积分
0
获赞数
0
粉丝数
4天前 加入
×
🔝