这段时间本身比较忙,也很少在关注技术相关的点,上个月空了刚好有时间看看群里,结果发现大家在讨论协程以及webman/workerman的劣势-阻塞退化问题,本来说是稍稍提两下实现方向,结果一来二去直接弄了一个插件出来,经过反反复复修改,最后发布了webman-coroutine插件
workerman是标准的master/worker多进程模型,master只负责管理worker,而每个worker会启动event-loop进行事件监听,这里面包含了stream、timer等事件,所有事件公用一个event-loop,公用一套调度体系;每一个事件回调会触发注册的回调函数,整体是单线程的执行调度,也就是说如果回调函数里面有阻塞,那么会阻塞event-loop的循环,直到回调函数执行完毕才会执行下一个事件回调。
也就是说你把event-loop看作是一个队列,那么回调函数就是消费者,这个队列是一个单消费者的队列,当回调函数阻塞的时候,队列是没有其他消费者来消费回调的,这也就造成了队头阻塞问题,当队列buffer被占满时,生产者将无法投送事件到event-loop中,这会造成什么问题呢?假设我们有N个worker监听8080端口,当有消息的时候会触发一次start()
方法,而start()
方法是一个while(1){}
的死循环,那么每请求一次将占用一个worker,导致worker一直在等待start()
执行完毕才能释放控制权给event-loop,当N个任务后,所有worker将被占满,至此,workerman将无法接收8080端口的任何信息。
当然,现实环境下没有这么夸张,但是遇到一些长阻塞的方法时还是会存在并发量上不去的问题,那么在传统workerman的开发环境下怎么处理呢?开多一点worker;其实你把它看成一个消息队列就好理解,当消费能力上不去的时候,要么减少消费阻塞时长,要么就是增加消费者。webman也同理,因为webman是在事件回调函数内进行框架的加载和控制器方法的执行的。
有朋友会说,webman/workerman可以使用swoole作为底层驱动,只要安装swoole并将workerman的驱动设置为Swoole即可使用协程了;这种说法并不完全正确。
以下是workerman 4.x的swoole驱动实现:
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author Ares<aresrr#qq.com>
* @link http://www.workerman.net/
* @link https://github.com/ares333/Workerman
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
namespace Workerman\Events;
use Workerman\Worker;
use Swoole\Event;
use Swoole\Timer;
class Swoole implements EventInterface
{
protected $_timer = array();
protected $_timerOnceMap = array();
protected $mapId = 0;
protected $_fd = array();
// milisecond
public static $signalDispatchInterval = 500;
protected $_hasSignal = false;
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::add()
*/
public function add($fd, $flag, $func, $args = array())
{
switch ($flag) {
case self::EV_SIGNAL:
$res = \pcntl_signal($fd, $func, false);
if (! $this->_hasSignal && $res) {
Timer::tick(static::$signalDispatchInterval,
function () {
\pcntl_signal_dispatch();
});
$this->_hasSignal = true;
}
return $res;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$method = self::EV_TIMER === $flag ? 'tick' : 'after';
if ($this->mapId > \PHP_INT_MAX) {
$this->mapId = 0;
}
$mapId = $this->mapId++;
$t = (int)($fd * 1000);
if ($t < 1) {
$t = 1;
}
$timer_id = Timer::$method($t,
function ($timer_id = null) use ($func, $args, $mapId) {
try {
\call_user_func_array($func, (array)$args);
} catch (\Exception $e) {
Worker::stopAll(250, $e);
} catch (\Error $e) {
Worker::stopAll(250, $e);
}
// EV_TIMER_ONCE
if (! isset($timer_id)) {
// may be deleted in $func
if (\array_key_exists($mapId, $this->_timerOnceMap)) {
$timer_id = $this->_timerOnceMap[$mapId];
unset($this->_timer[$timer_id],
$this->_timerOnceMap[$mapId]);
}
}
});
if ($flag === self::EV_TIMER_ONCE) {
$this->_timerOnceMap[$mapId] = $timer_id;
$this->_timer[$timer_id] = $mapId;
} else {
$this->_timer[$timer_id] = null;
}
return $timer_id;
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int) $fd;
if (! isset($this->_fd[$fd_key])) {
if ($flag === self::EV_READ) {
$res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
$fd_type = SWOOLE_EVENT_READ;
} else {
$res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
$fd_type = SWOOLE_EVENT_WRITE;
}
if ($res) {
$this->_fd[$fd_key] = $fd_type;
}
} else {
$fd_val = $this->_fd[$fd_key];
$res = true;
if ($flag === self::EV_READ) {
if (($fd_val & SWOOLE_EVENT_READ) !== SWOOLE_EVENT_READ) {
$res = Event::set($fd, $func, null,
SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
$this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
}
} else {
if (($fd_val & SWOOLE_EVENT_WRITE) !== SWOOLE_EVENT_WRITE) {
$res = Event::set($fd, null, $func,
SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
$this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
}
}
}
return $res;
}
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::del()
*/
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_SIGNAL:
return \pcntl_signal($fd, SIG_IGN, false);
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
// already remove in EV_TIMER_ONCE callback.
if (! \array_key_exists($fd, $this->_timer)) {
return true;
}
$res = Timer::clear($fd);
if ($res) {
$mapId = $this->_timer[$fd];
if (isset($mapId)) {
unset($this->_timerOnceMap[$mapId]);
}
unset($this->_timer[$fd]);
}
return $res;
case self::EV_READ:
case self::EV_WRITE:
$fd_key = (int) $fd;
if (isset($this->_fd[$fd_key])) {
$fd_val = $this->_fd[$fd_key];
if ($flag === self::EV_READ) {
$flag_remove = ~ SWOOLE_EVENT_READ;
} else {
$flag_remove = ~ SWOOLE_EVENT_WRITE;
}
$fd_val &= $flag_remove;
if (0 === $fd_val) {
$res = Event::del($fd);
if ($res) {
unset($this->_fd[$fd_key]);
}
} else {
$res = Event::set($fd, null, null, $fd_val);
if ($res) {
$this->_fd[$fd_key] = $fd_val;
}
}
} else {
$res = true;
}
return $res;
}
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::clearAllTimer()
*/
public function clearAllTimer()
{
foreach (array_keys($this->_timer) as $v) {
Timer::clear($v);
}
$this->_timer = array();
$this->_timerOnceMap = array();
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::loop()
*/
public function loop()
{
Event::wait();
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::destroy()
*/
public function destroy()
{
Event::exit();
posix_kill(posix_getpid(), SIGINT);
}
/**
*
* {@inheritdoc}
*
* @see \Workerman\Events\EventInterface::getTimerCount()
*/
public function getTimerCount()
{
return \count($this->_timer);
}
}
我们可以看到确实正确加载了Swoole的event-loop驱动,但仅仅也只是加载了event-loop,并没有在回调的注册部分加入协程,那么就相当于仅仅只是写了一个\Co\run()
,但是没有在\Co\run()
中创建协程进行运行,那么意味着当事件的回调函数中当监听8080端口进行处理,遇到了阻塞的时候还是无法出让当前控制权给event-loop,event-loop就没办法执行下一个8080端口的事件,为什么会这样呢?因为workerman使用stream_socket_server()
对外部网络进行监听,而如下代码又会等待回调:
// Workerman\Worker 2465-2476行
public function resumeAccept()
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
if ($this->transport !== 'udp') {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
} else {
static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
}
$this->_pauseAccept = false;
}
}
那么即便swoole底层hook了系统函数,也只是将mainSocket的回调出让,但来自相同mainSocket的下一次事件是需要上一次事件完结恢复才可以继续接收的。
以上的问题会导致什么样的问题呢?
传统解决方案:多开worker
传统解决方案:自定义进程实现 或 使用外部服务
基于上述情况,我开发了webman/workerman可用的协程基建插件,webman-coroutine;
插件通过适配器模式和工厂模式的方法去兼容现目前市面上比较常见的几种协程驱动swow、swoole、php-fiber(ripple实现),将不同的底层驱动抽象适配为统一的调用方法,并且兼容非协程环境,也就意味着你用同一套代码写出来的业务可以较为平滑的切换在这些环境及非协程环境之间,且保证逻辑是正常运行。
插件为webman的开发框架重新实现了webserver,让原本不完备支持协程的框架可以完备的支持协程:
<?php
/**
* @author workbunny/Chaz6chez
* @email chaz6chez1993@outlook.com
*/
declare(strict_types=1);
namespace Workbunny\WebmanCoroutine;
use Webman\App;
use Webman\Http\Request;
use Workbunny\WebmanCoroutine\Handlers\HandlerInterface;
use Workbunny\WebmanCoroutine\Utils\Coroutine\Coroutine;
use Workbunny\WebmanCoroutine\Utils\WaitGroup\WaitGroup;
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;
/**
* 协程化web服务进程
*/
class CoroutineWebServer extends App
{
/**
* 每个连接的协程计数
*
* @var int[]
*/
protected static array $_connectionCoroutineCount = [];
/**
* 获取连接的协程计数
*
* @return int[]|int
*/
public static function getConnectionCoroutineCount(?string $connectionId = null): array|int
{
return $connectionId === null
? static::$_connectionCoroutineCount
: (static::$_connectionCoroutineCount[$connectionId] ?? 0);
}
/**
* 回收连接的协程计数
*
* @param string $connectionId
* @param bool $force
* @return void
*/
public static function unsetConnectionCoroutineCount(string $connectionId, bool $force = false): void
{
if (!$force and self::getConnectionCoroutineCount($connectionId) > 0) {
return;
}
unset(static::$_connectionCoroutineCount[$connectionId]);
}
/** @inheritdoc */
public function onWorkerStart($worker)
{
if (!\config('plugin.workbunny.webman-coroutine.app.enable', false)) {
return;
}
parent::onWorkerStart($worker);
/** @var HandlerInterface $handler */
$handler = Factory::getCurrentHandler();
$handler::initEnv();
}
/**
* 停止服务
*
* - 不用返回值和参数标定是为了兼容
*
* @param Worker|mixed $worker
* @return void
*/
public function onWorkerStop($worker, ...$params)
{
if (is_callable($call = [parent::class, 'onWorkerStop'])) {
call_user_func($call, $worker, ...$params);
}
}
/**
* 连接建立
*
* - 不用返回值和参数标定是为了兼容
*
* @param ConnectionInterface $connection
* @param mixed ...$params
* @return void
*/
public function onConnect($connection, ...$params): void
{
if (!is_object($connection)) {
return;
}
if (is_callable($call = [parent::class, 'onConnect'])) {
// 协程化创建连接
new Coroutine(function () use ($call, $connection, $params) {
call_user_func($call, $connection, ...$params);
});
}
}
/**
* 连接关闭
*
* - 不用返回值和参数标定是为了兼容
*
* @param ConnectionInterface|mixed $connection
* @param ...$params
* @return void
*/
public function onClose($connection, ...$params)
{
if (!is_object($connection)) {
return;
}
if (is_callable($call = [parent::class, 'onClose'])) {
// 协程化关闭连接
new Coroutine(function () use ($call, $connection, $params) {
call_user_func($call, $connection, ...$params);
});
}
self::unsetConnectionCoroutineCount(spl_object_hash($connection), true);
}
/**
* @link parent::onMessage()
* @param ConnectionInterface|mixed $connection
* @param Request|mixed $request
* @param ...$params
* @return null
* @link parent::onMessage()
*/
public function onMessage($connection, $request, ...$params)
{
if (!is_object($connection)) {
return null;
}
$connectionId = spl_object_hash($connection);
$params = func_get_args();
$res = null;
// 检测协程数
if (($consumerCount = \config('plugin.workbunny.webman-coroutine.app.consumer_count', 0)) > 0) {
// 等待协程回收
wait_for(function () use ($connectionId, $consumerCount) {
return self::getConnectionCoroutineCount($connectionId) <= $consumerCount;
});
}
$waitGroup = new WaitGroup();
$waitGroup->add();
// 请求消费协程
new Coroutine(function () use (&$res, $waitGroup, $params, $connectionId) {
$res = parent::onMessage(...$params);
// 计数 --
self::$_connectionCoroutineCount[$connectionId] --;
// 尝试回收
self::unsetConnectionCoroutineCount($connectionId);
// wg完成
$waitGroup->done();
});
// 计数 ++
self::$_connectionCoroutineCount[$connectionId] =
(isset(self::$_connectionCoroutineCount[$connectionId])
? self::$_connectionCoroutineCount[$connectionId] + 1
: 1);
// 等待
$waitGroup->wait();
return $res;
}
}
CoroutineWebServer是继承并代理了App的onMessage方法,将原本的方法执行回调化,并且做到了非侵入onMessage的执行逻辑,较为安全的支持了未来webman可能的升级改动。
另外对于workerman 4.x下的event驱动也做了兼容,除了增加了swow的事件驱动外,还重新实现了swoole的事件驱动:
<?php
/**
* @author workbunny/Chaz6chez
* @email chaz6chez1993@outlook.com
*/
declare(strict_types=1);
namespace Workbunny\WebmanCoroutine\Events;
use Swoole\Coroutine;
use Swoole\Event;
use Swoole\Process;
use Swoole\Timer;
use Workbunny\WebmanCoroutine\Exceptions\EventLoopException;
use Workerman\Events\EventInterface;
class SwooleEvent implements EventInterface
{
/** @var int[] All listeners for read event. */
protected array $_reads = [];
/** @var int[] All listeners for write event. */
protected array $_writes = [];
/** @var callable[] Event listeners of signal. */
protected array $_signals = [];
/** @var int[] Timer id to timer info. */
protected array $_timer = [];
/** @var int 定时器id */
protected int $_timerId = 0;
/**
* @param bool $debug 测试用
* @throws EventLoopException 如果没有启用拓展
*/
public function __construct(bool $debug = false)
{
if (!$debug and !extension_loaded('swoole')) {
throw new EventLoopException('Not support ext-swoole. ');
}
}
/** @inheritdoc */
public function add($fd, $flag, $func, $args = [])
{
switch ($flag) {
case EventInterface::EV_SIGNAL:
if (!isset($this->_signals[$fd])) {
if ($res = Process::signal($fd, $func)) {
$this->_signals[$fd] = $func;
}
return $res;
}
return false;
case EventInterface::EV_TIMER:
case EventInterface::EV_TIMER_ONCE:
$timerId = $this->_timerId++;
$this->_timer[$timerId] = Timer::after((int) ($fd * 1000), function () use ($timerId, $flag, $func) {
call_user_func($func);
if ($flag === EventInterface::EV_TIMER_ONCE) {
$this->del($timerId, $flag);
}
});
return $timerId;
case EventInterface::EV_READ:
if (\is_resource($fd)) {
if ($this->_reads[$key = (int) $fd] ?? null) {
$this->del($fd, EventInterface::EV_READ);
}
if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_READ)) {
$this->_reads[$key] = 1;
}
return (bool) $res;
}
return false;
case self::EV_WRITE:
if (\is_resource($fd)) {
if ($this->_writes[$key = (int) $fd] ?? null) {
$this->del($fd, EventInterface::EV_WRITE);
}
if ($res = Event::add($fd, $func, null, SWOOLE_EVENT_WRITE)) {
$this->_writes[$key] = 1;
}
return (bool) $res;
}
return false;
default:
return null;
}
}
/** @inheritdoc */
public function del($fd, $flag)
{
switch ($flag) {
case self::EV_SIGNAL:
if ($this->_signals[$fd] ?? null) {
if (Process::signal($fd, null)) {
unset($this->_signals[$fd]);
return true;
}
}
return false;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if ($id = $this->_timer[$fd] ?? null) {
if (Timer::clear($id)) {
unset($this->_timer[$fd]);
return true;
}
}
return false;
case self::EV_READ:
if (
\is_resource($fd) and
isset($this->_reads[$key = (int) $fd]) and
Event::isset($fd, SWOOLE_EVENT_READ)
) {
if (Event::del($fd)) {
unset($this->_reads[$key]);
return true;
}
}
return false;
case self::EV_WRITE:
if (
\is_resource($fd) and
isset($this->_writes[$key = (int) $fd]) and
Event::isset($fd, SWOOLE_EVENT_WRITE)
) {
if (Event::del($fd)) {
unset($this->_writes[$key]);
return true;
}
}
return false;
default:
return null;
}
}
/** @inheritdoc */
public function loop()
{
// 阻塞等待
Event::wait();
// 确定loop为退出状态
exit(0);
}
/** @inheritdoc */
public function destroy()
{
// 移除所有定时器
$this->clearAllTimer();
// 退出所有协程
foreach (Coroutine::listCoroutines() as $coroutine) {
Coroutine::cancel($coroutine);
}
// 退出event loop
Event::exit();
$this->_reads = $this->_writes = [];
}
/** @inheritdoc */
public function clearAllTimer()
{
foreach ($this->_timer as $id) {
Timer::clear($id);
}
$this->_timer = [];
}
/** @inheritdoc */
public function getTimerCount()
{
return count($this->_timer);
}
}
在测试workerman 5.x的过程中还找到了一些workerman的swoole驱动的bug,我进行了PR,积极参与维护,fix: all coroutines must be canceled before Event::exit #1059
其他更多特性及功能请参考插件文档,插件也支持纯workerman开发环境,webman-coroutine文档
$a = new \stdClass();
$a->id = 1;
new Coroutine(function () use ($a) {
// 一些业务逻辑
$a->id = 2;
})
new Coroutine(function () use ($a) {
// 一些业务逻辑
$a->id = 3;
})
// 等待所有协程结束
// 由于每个协程的逻辑中可能存在协程切换出让,结合对象是堆数据且引用,最后的结果不能保证是1或者2或者3
// 数组同理
echo $a->id;
$a = 1;
new Coroutine(function () use (&$a) {
// 一些业务逻辑
$a = 2;
})
new Coroutine(function () use (&$a) {
// 一些业务逻辑
$a = 3;
})
// 等待所有协程结束
// 由于每个协程的逻辑中可能存在协程切换出让,变量是引用,最后的结果不能保证是1或者2或者3
echo $a;
static array $context = [];
$a = 1;
$id1 = new Coroutine(function () use (&$id1) {
$contextA = self::$context[$id]
// 一些业务逻辑
self::$context[$id1] = 2;
})
self::$context[$id1] = $a;
$id2 = new Coroutine(function () use (&$id2) {
$contextB = self::$context[$id]
// 一些业务逻辑
self::$context[$id2] = 3;
})
self::$context[$id1] = $a;
// 等待所有协程结束
// 这里会输出1
echo $a;
// 读取上下文内容,获取协程结果, 一般这里不推荐直接上下文读取,而是通过CSP模型的channel进行传递
// 还要注意上下文的回收,避免静态数组膨胀
echo self::$context[$id1];
echo self::$context[$id2];
// 以上并不是完整的上下文实现方案,只是一个伪代码!!
PDO在发送SQL后会阻塞等待SQL的执行结果,swow和swoole在底层hook了阻塞等待的过程,进行了协程切换
以pdo的mysql举例:
// https://github.com/php/php-src/blob/master/ext/pdo_mysql/mysql_driver.c
static zend_long mysql_handle_doer(pdo_dbh_t *dbh, const zend_string *sql)
{
pdo_mysql_db_handle *H = (pdo_mysql_db_handle *)dbh->driver_data;
PDO_DBG_ENTER("mysql_handle_doer");
PDO_DBG_INF_FMT("dbh=%p", dbh);
PDO_DBG_INF_FMT("sql=%.*s", (int)ZSTR_LEN(sql), ZSTR_VAL(sql));
if (mysql_real_query(H->server, ZSTR_VAL(sql), ZSTR_LEN(sql))) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(-1);
} else {
my_ulonglong c = mysql_affected_rows(H->server);
if (c == (my_ulonglong) -1) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(H->einfo.errcode ? -1 : 0);
} else {
/* MULTI_QUERY support - eat up all unfetched result sets */
MYSQL_RES* result;
while (mysql_more_results(H->server)) {
if (mysql_next_result(H->server)) {
pdo_mysql_error(dbh);
PDO_DBG_RETURN(-1);
}
result = mysql_store_result(H->server);
if (result) {
mysql_free_result(result);
}
}
PDO_DBG_RETURN((int)c);
}
}
}
以上代码可以简单理解为以下伪代码
$requestId = $mysqlClient->send('SQL');
while (1) {
$res = $mysqlClient->get($requestId);
if ($res) {
return $res;
}
// 超时等其他机制
// 协程sleep出让
}
$mysqlClient->send('SQL');
由DB服务器接收并依次执行(来源于同一个连接的多次SQL是顺序执行),但可能存在后者的协程结果唤起了前者协程的$res = $mysqlClient->get($requestId);
,从而导致数据错乱;这里本质上是因为PDO对象是堆数据,在多个协程中是竞态的,为了避免这样的情况,有以下方案解决:
目前webman/workerman的协程实现仅仅只是入了个门,主要解决了阻塞退化问题,能够简单的实现以下场景:
但还有很多基建需要社区出谋出力添砖加瓦,比如:
当然,在此之前,你可以使用所有基于swow\swoole\ripple\revolt协程驱动开发的协程版组件,但我希望未来可以整合这些协程实现的组件,能够有一个统一的使用方式(虽然难度相当大,但也想试试);
欢迎大佬们共建,issue和PR!
文章如有错误,敬请指正。谢谢!!
目前已经实现了较为基础的Utils\Pool
工具,可用于对象池化的实现
Utils\Poo\Debugger
可用于检测待池化的对象是否存在非法和风险,抛出的异常可以自行捕获进行日志监控或者是调试,详细参考测试用例
Utils\Poo\Pool
可用于实现连接池、资源锁等,具体可参考文档建议和意见都可以提交issue
欢迎各位大佬的PR!
大佬厉害
good job
学习了
前排
赞
感谢大佬分享!!!
兔神牛皮
大佬厉害,666
协程版的数据库 本质实现是 连接对象池吗
PDO的对象与数据库客户端连接一一对应,连接对象池打破了单例连接的这种做法,所以相同的数据库会存在多个客户端连接,这个对象池主要是为了合理的去管理上下文问题
看不懂,但依然给你点赞
有不懂的地方可以在这里提问,我尽我所能解答疑问
赞
大佬玩底层,我们只能摸摸大佬的风,也想PR出点力,奈何能力不够。
使用过程中有任何需要实现的特性或者找到的bug🐞也可以积极提issue呀,代码的注释很齐全,也可以看看源码,总有机会提pr的,加油!
大师👍
666
赞赞赞
兔子大佬太强了
大佬就是大佬,底层都能改,我看底层代码都费劲,看5分钟就能睡着
老哥依旧稳定输出
先赞后看
赞👍
先赞后看👍
发现没有点赞的地方
好文~
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
👍👍👍👍👍👍
如此好的文章 应该置顶推荐
牛皮
看起来有点迷糊,那么现在的情况是由于workerman的数据库不支持连接池,所以使用这个协程组件的情况下不能进行数据库操作吗
数据库连接池暂不支持
插件内有Pool工具,通过Pool自行封装数据库或者连接相关的工具就可以了,暂时还不能直接使用webman/workerman的数据库插件
大佬,请问这个插件有生产环境使用吗?我目前正在为公司项目寻找框架,选定webman+gatewaywork,但是苦于没有协程,看到你这个插件,想用,又担心有坑
群里有大佬已经准备上生产了,我自己的项目目前还在测试环境没有上生产,这个项目目前是重心开发的,遇到任何问题都可以咨询
有群吗?
webman微信群
几群?
我都在
大佬,读了下文档,有以下几个问题不懂
1、webman 的webserver 要同时开启 CoroutineWebServer和webman自带的server吗?那样岂不是对外暴露了2个端口,webman自带的可以去掉吗?
2、操作数据库可以用hypderf/database 解决连接池问题吗?
3、上下文,用webman自带的context类保存包括对象,数组这些数据类型可以吗?还是必须要用协程id作为key区别开来
1.可以关闭webman自带的server
2.使用swoole驱动可以使用hyperf的db,但需要自己引入框架
3.一般使用无需关注上下文,跨协程处理数据建议使用channel
我想用到的上下文其实就是想保存当前这个请求的全局变量,类似fpm下的静态属性,处理完这个请求就销毁的
你思考的稍微简单了一些
怎么说?目前项目中需要的就是请求级别的变量,暂时没有跨协程的
请求+容器就跨协程啊,我建议如果想要上生产,自身对协程没有那么熟悉的话,直接用hyperf全家套
3.我理解的实际上swoole的协程都在同一个进程空间,可以共同使用进程内存资源,所以多个协程利用同一个数组保存数据是ok的,而webman自带的Context类就是根据协程id作为数组key,区分不同的协程资源的,适用的场景就是不需要协程间协作的场景;
而使用swoole的channel适用于需要协程间协作的场景,比如,一次请求下要记录到数据库、记录日志,并将记录的结果响应给调用方,这种情况下解决方案就是:
1.父协程创建协程A和协程B分别处理这两件事
2.父协程阻塞当前协程,等待协程A和协程B的执行结果
3.父协程得到了结果,并响应给调用放。
上述的情况使用共享变量的情况下就很难处理,想要阻塞父协程,不使用内置的waitGroup的情况下,就必须在父协程写个循环体,并且为了不阻塞进程,还需要写个IO,即sleep ,while(true){ sleep(1)}
而使用channel就很方便,直接在父协程pop两次,协程A完成后push(),协程B完成后push(),伪代码:
$chan = new Channel();
go(function(){
//
$chan->push()
})
go(function(){
//
$chan->push()
})
$chan->pop()
$chan->pop()
return response("ok");
workerman如果使用协程,协程也在一个进程空间,数据引用在协程之间是不隔离的,比如&的数据,比如对象,比如资源类型,比如数组,只要是引用类型的数据都存在协程间的竞争状态,这样的竞争状态会导致数据可能被污染,为了达到数据不被污染的效果,除了对数据加锁外,还可以通过channel进行有序传递,也就是当一个数据正在被一个协程消费的时候,其他的协程是没有从通道内获取到数据的,直到获取到数据的协程消费完毕,变相的其实也是一种锁的机制,在pop不到数据的时候协程会自动出让控制权;不论是上下文还是通道还是sync锁,都是一种竞争数据的并发安全操作。
先赞后看,养成习惯。