发一个基于webman自定义进程+websocket的推送功能,自带心跳,自带断线重连。消息通过订阅发布机制,使用起来非常简单方便,客户端与客户端直接可以直接发消息。可用于单聊、群聊、app消息推送、网页即使推送等。
<?php
namespace process;
use Workerman\Connection\TcpConnection;
use Workerman\Timer;
class WebsocketPush
{
/**
* 所有websocket客户端
* @var array
*/
protected $connections = [];
// Websocket连接上来时
public function onWebSocketConnect(TcpConnection $connection)
{
$connection->timer = Timer::add(25, function () use ($connection) {
$connection->send(json_encode(['type'=>'ping']));
});
}
// WebSocket发来消息时
public function onMessage(TcpConnection $connection, $data)
{
$data = json_decode($data, true);
$type = $data['type'] ?? null;
switch ($type) {
// 心跳
case 'ping':
return;
// 订阅
case 'subscribe':
$this->subscribe($connection, $data['subject']);
return;
// 发布
case 'publish':
$this->publish($data['subject'], $data['data'] ?? null);
return;
}
}
// WebSocket连接关闭时
public function onClose(TcpConnection $connection)
{
if (isset($connection->timer)) {
Timer::del($connection->timer);
}
foreach ($connection->subject??[] as $subject) {
unset($this->connections[$subject][$connection->id]);
}
}
// 订阅
protected function subscribe(TcpConnection $connection, string $subject)
{
$this->connections[$subject][$connection->id] = $connection;
if (empty($connection->subject)) {
$connection->subject = [];
}
$connection->subject[$subject] = $subject;
}
// 发布
protected function publish(string $subject, $data)
{
$connections = $this->connections[$subject] ?? [];
foreach ($connections as $connection) {
$connection->send(json_encode(['type' => 'publish', 'subject' => $subject, 'data' => $data]));
}
}
}
return [
// ...其它配置...
'websocket-push' => [
'handler' => process\WebSocketPush::class,
'listen' => 'websocket://0.0.0.0:5050',
'count => 1, // count必须为1
]
];
php start.php restart
var ws;
// 连接
function connect()
{
ws && ws.close();
ws = new WebSocket('ws://127.0.0.1:5050');
ws.onopen = function () {
// 订阅
ws.send(JSON.stringify({
type: 'subscribe',
subject: 'user-1'
}));
// 发布
ws.send(JSON.stringify({
type: 'publish',
subject: 'user-1',
data: 'hello'
}));
};
// 断线重连
ws.onclose = function () {
setTimeout(function () {
connect();
}, 1000);
};
// 收到消息
ws.onmessage = function (res) {
var data = JSON.parse(res.data);
switch (data.type) {
// 心跳消息
case 'ping':
return;
// 收到消息
case 'publish':
console.log(res.data);
}
};
}
// 定时心跳
setTimeout(function () {
ws.readyState === 1 && ws.send(JSON.stringify({
type: 'ping'
}));
}, 25000);
// 发起连接
connect();
收到打印
{"type":"publish","subject":"user-1","data":"hello"}
客户端对哪些消息感兴趣,使用
ws.send(JSON.stringify({
type: 'subscribe',
subject: 'user-1'
}));
订阅,例如当前用于对于自己的消息感兴趣,则订阅subject=user-1
,对于群id为20的消息感兴趣,则可订阅例如subject=group-20
。subject数据格式自己定,发布与订阅一致就行
客户端要对哪个主题发布数据,例如向群20发布消息
ws.send(JSON.stringify({
type: 'publish',
subject: 'group-20',
data: 'hello'
}));
订阅类似群聊吗
订阅群聊私聊都可
老大 回复好快啊