https://www.workerman.net/doc/gateway-worker/principle.html
下面是进一步的代码片段说明
* 设置消息回调
*
* @param \Workerman\Connection\ConnectionInterface $connection
* @param string $buffer
* @return void
*/
public function onMessage($connection, $buffer)
{
// 删除定时器
Timer::del($connection->timeout_timerid);
$data = @json_decode($buffer, true);
if (empty($data['event'])) {
$error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://doc2.workerman.net/register-auth-timeout.html";
Worker::log($error);
return $connection->close($error);
}
$event = $data['event'];
$secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';
// 开始验证
switch ($event) {
// 是 gateway 连接
case 'gateway_connect':
if (empty($data['address'])) {
echo "address not found\n";
return $connection->close();
}
if ($secret_key !== $this->secretKey) {
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
return $connection->close();
}
$this->_gatewayConnections[$connection->id] = $data['address'];
$this->broadcastAddresses();
break;
// 是 worker 连接
case 'worker_connect':
if ($secret_key !== $this->secretKey) {
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
return $connection->close();
}
$this->_workerConnections[$connection->id] = $connection;
$this->broadcastAddresses($connection);
break;
case 'ping':
break;
default:
Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://doc2.workerman.net/register-auth-timeout.html");
$connection->close();
}
}
/**
* 向 BusinessWorker 广播 gateway 内部通讯地址
*
* @param \Workerman\Connection\ConnectionInterface $connection
*/
public function broadcastAddresses($connection = null)
{
$data = array(
'event' => 'broadcast_addresses',
'addresses' => array_unique(array_values($this->_gatewayConnections)),
);
$buffer = json_encode($data);
if ($connection) {
$connection->send($buffer);
return;
}
foreach ($this->_workerConnections as $con) {
$con->send($buffer);
}
}
/**
* 构造函数
*
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name, $context_option = array())
{
parent::__construct($socket_name, $context_option);
$this->_gatewayPort = substr(strrchr($socket_name,':'),1);
$this->router = array("\\GatewayWorker\\Gateway", 'routerBind');
$backtrace = debug_backtrace();
$this->_autoloadRootPath = dirname($backtrace[0]['file']);
}
/**
* client_id 与 worker 绑定
*
* @param array $worker_connections
* @param TcpConnection $client_connection
* @param int $cmd
* @param mixed $buffer
* @return TcpConnection
*/
public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
{
if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) {
$client_connection->businessworker_address = array_rand($worker_connections);
}
return $worker_connections[$client_connection->businessworker_address];
}
/**
* 发送数据给 worker 进程
*
* @param int $cmd
* @param TcpConnection $connection
* @param mixed $body
* @return bool
*/
protected function sendToWorker($cmd, $connection, $body = '')
{
$gateway_data = $connection->gatewayHeader;
$gateway_data['cmd'] = $cmd;
$gateway_data['body'] = $body;
$gateway_data['ext_data'] = $connection->session;
if ($this->_workerConnections) {
// 调用路由函数,选择一个worker把请求转发给它
/** @var TcpConnection $worker_connection */
$worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
if (false === $worker_connection->send($gateway_data)) {
$msg = "SendBufferToWorker fail. May be the send buffer are overflow. See http://doc2.workerman.net/send-buffer-overflow.html";
static::log($msg);
return false;
}
} // 没有可用的 worker
else {
// gateway 启动后 1-2 秒内 SendBufferToWorker fail 是正常现象,因为与 worker 的连接还没建立起来,
// 所以不记录日志,只是关闭连接
$time_diff = 2;
if (time() - $this->_startTime >= $time_diff) {
$msg = 'SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://doc2.workerman.net/send-buffer-to-worker-fail.html';
static::log($msg);
}
$connection->destroy();
return false;
}
return true;
}
/**
* 当 worker 发来数据时
*
* @param TcpConnection $connection
* @param mixed $data
* @throws \Exception
*
* @return void
*/
public function onWorkerMessage($connection, $data)
{
$cmd = $data['cmd'];
if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
$connection->close();
return;
}
switch ($cmd) {
// BusinessWorker连接Gateway
case GatewayProtocol::CMD_WORKER_CONNECT:
$worker_info = json_decode($data['body'], true);
if ($worker_info['secret_key'] !== $this->secretKey) {
self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($this->secretKey));
$connection->close();
return;
}
$key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
// 在一台服务器上businessWorker->name不能相同
if (isset($this->_workerConnections[$key])) {
self::log("Gateway: Worker->name conflict. Key:{$key}");
$connection->close();
return;
}
$connection->key = $key;
$this->_workerConnections[$key] = $connection;
$connection->authorized = true;
if ($this->onBusinessWorkerConnected) {
call_user_func($this->onBusinessWorkerConnected, $connection);
}
return;
.......... 省略其他判断
default :
$err_msg = "gateway inner pack err cmd=$cmd";
echo $err_msg;
}
}
/**
* 当 Gateway 启动的时候触发的回调函数
*
* @return void
*/
public function onWorkerStart()
{
// 分配一个内部通讯端口
$this->lanPort = $this->startPort + $this->id;
// 如果有设置心跳,则定时执行
if ($this->pingInterval > 0) {
$timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval;
Timer::add($timer_interval, array($this, 'ping'));
}
// 如果BusinessWorker ip不是127.0.0.1,则需要加gateway到BusinessWorker的心跳
if ($this->lanIp !== '127.0.0.1') {
Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
}
if (!class_exists('\Protocols\GatewayProtocol')) {
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
}
//如为公网IP监听,直接换成0.0.0.0 ,否则用内网IP
$listen_ip=filter_var($this->lanIp,FILTER_VALIDATE_IP,FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE)?'0.0.0.0':$this->lanIp;
// 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$listen_ip}:{$this->lanPort}");
$this->_innerTcpWorker->reusePort = false;
$this->_innerTcpWorker->listen();
$this->_innerTcpWorker->name = 'GatewayInnerWorker';
// 重新设置自动加载根目录
Autoloader::setRootPath($this->_autoloadRootPath);
// 设置内部监听的相关回调
$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
$this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
// 注册 gateway 的内部通讯地址,worker 去连这个地址,以便 gateway 与 worker 之间建立起 TCP 长连接
$this->registerAddress();
if ($this->_onWorkerStart) {
call_user_func($this->_onWorkerStart, $this);
}
}
/**
* 存储当前 Gateway 的内部通信地址
*
* @return bool
*/
public function registerAddress()
{
$address = $this->lanIp . ':' . $this->lanPort;
foreach ($this->registerAddress as $register_address) {
$register_connection = new AsyncTcpConnection("text://{$register_address}");
$secret_key = $this->secretKey;
$register_connection->onConnect = function($register_connection) use ($address, $secret_key, $register_address){
$register_connection->send('{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}');
// 如果Register服务器不在本地服务器,则需要保持心跳
if (strpos($register_address, '127.0.0.1') !== 0) {
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
$register_connection->send('{"event":"ping"}');
});
}
};
$register_connection->onClose = function ($register_connection) {
if(!empty($register_connection->ping_timer)) {
Timer::del($register_connection->ping_timer);
}
$register_connection->reconnect(1);
};
$register_connection->connect();
}
}
/**
* 连接服务注册中心
*
* @return void
*/
public function connectToRegister()
{
foreach ($this->registerAddress as $register_address) {
$register_connection = new AsyncTcpConnection("text://{$register_address}");
$secret_key = $this->secretKey;
$register_connection->onConnect = function () use ($register_connection, $secret_key, $register_address) {
$register_connection->send('{"event":"worker_connect","secret_key":"' . $secret_key . '"}');
// 如果Register服务器不在本地服务器,则需要保持心跳
if (strpos($register_address, '127.0.0.1') !== 0) {
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
$register_connection->send('{"event":"ping"}');
});
}
};
$register_connection->onClose = function ($register_connection) {
if(!empty($register_connection->ping_timer)) {
Timer::del($register_connection->ping_timer);
}
$register_connection->reconnect(1);
};
$register_connection->onMessage = array($this, 'onRegisterConnectionMessage');
$register_connection->connect();
}
}
/**
* 当注册中心发来消息时
*
* @return void
*/
public function onRegisterConnectionMessage($register_connection, $data)
{
$data = json_decode($data, true);
if (!isset($data['event'])) {
echo "Received bad data from Register\n";
return;
}
$event = $data['event'];
switch ($event) {
case 'broadcast_addresses':
if (!is_array($data['addresses'])) {
echo "Received bad data from Register. Addresses empty\n";
return;
}
$addresses = $data['addresses'];
$this->_gatewayAddresses = array();
// 保存gateway列表,准备连接
foreach ($addresses as $addr) {
$this->_gatewayAddresses[$addr] = $addr;
}
$this->checkGatewayConnections($addresses);
break;
default:
echo "Receive bad event:$event from Register.\n";
}
}
/**
* 检查 gateway 的通信端口是否都已经连
* 如果有未连接的端口,则尝试连接
*
* @param array $addresses_list
*/
public function checkGatewayConnections($addresses_list)
{
if (empty($addresses_list)) {
return;
}
foreach ($addresses_list as $addr) {
if (!isset($this->_waitingConnectGatewayAddresses[$addr])) {
$this->tryToConnectGateway($addr);
}
}
}
/**
* 尝试连接 Gateway 内部通讯地址
*
* @param string $addr
*/
public function tryToConnectGateway($addr)
{
if (!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddresses[$addr]) && isset($this->_gatewayAddresses[$addr])) {
$gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
$gateway_connection->remoteAddress = $addr;
$gateway_connection->onConnect = array($this, 'onConnectGateway');
$gateway_connection->onMessage = array($this, 'onGatewayMessage');
$gateway_connection->onClose = array($this, 'onGatewayClose');
$gateway_connection->onError = array($this, 'onGatewayError');
$gateway_connection->maxSendBufferSize = $this->sendToGatewayBufferSize;
if (TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize) {
$gateway_connection->maxSendBufferSize = 50 * 1024 * 1024;
}
$gateway_data = GatewayProtocol::$empty;
$gateway_data['cmd'] = GatewayProtocol::CMD_WORKER_CONNECT;
$gateway_data['body'] = json_encode(array(
'worker_key' =>"{$this->name}:{$this->id}",
'secret_key' => $this->secretKey,
));
$gateway_connection->send($gateway_data);
$gateway_connection->connect();
$this->_connectingGatewayAddresses[$addr] = $addr;
}
unset($this->_waitingConnectGatewayAddresses[$addr]);
}
/**
* 当进程启动时一些初始化工作
*
* @return void
*/
protected function onWorkerStart()
{
if (function_exists('opcache_reset')) {
opcache_reset();
}
if (!class_exists('\Protocols\GatewayProtocol')) {
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
}
if (!is_array($this->registerAddress)) {
$this->registerAddress = array($this->registerAddress);
}
$this->connectToRegister();
\GatewayWorker\Lib\Gateway::setBusinessWorker($this);
\GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;
if ($this->_onWorkerStart) {
call_user_func($this->_onWorkerStart, $this);
}
if (is_callable($this->eventHandler . '::onWorkerStart')) {
call_user_func($this->eventHandler . '::onWorkerStart', $this);
}
if (function_exists('pcntl_signal')) {
// 业务超时信号处理
pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);
} else {
$this->processTimeout = 0;
}
// 设置回调 -- 主要的业务代码,注意是静态方法, eventHandler 可以自定义类
if (is_callable($this->eventHandler . '::onConnect')) {
$this->_eventOnConnect = $this->eventHandler . '::onConnect';
}
if (is_callable($this->eventHandler . '::onMessage')) {
$this->_eventOnMessage = $this->eventHandler . '::onMessage';
} else {
echo "Waring: {$this->eventHandler}::onMessage is not callable\n";
}
if (is_callable($this->eventHandler . '::onClose')) {
$this->_eventOnClose = $this->eventHandler . '::onClose';
}
if (is_callable($this->eventHandler . '::onWebSocketConnect')) {
$this->_eventOnWebSocketConnect = $this->eventHandler . '::onWebSocketConnect';
}
}
感谢分享