队列中使用redis 出现错误

Chuckle

问题描述

目前需求是 第三方接口有请求限制
接口请求频率限制:200次/秒。
消息条数限制:12000条/分钟。按接收消息数量计算条数,若一次发送给500个用户,计作500条。每次最多500用户
目前使用方案是动态双维度限流方案,
当投递数据交小时 是不会出现错误的,较大就会
目前是4个进程

初步怀疑是不是一直投递,然后队列消费不满足规则后,又投递,导致redis响应体过大超出缓存区导致的

程序代码

模拟投递

// 每次500用户)
        // 预期结果:
        // - 前24次请求成功(24×500=12000)
        // - 后续请求自动延迟到下一分钟
        // - 每秒请求数不超过200次
        for ($i = 1; $i <= 100; $i++) {
            $users = range(1, 500); // 每次500用户
            SmartBatchProducer::push($users, $i);
        }

投递代码

<?php

namespace mineout\timTool;

class SmartBatchProducer
{
    const MINUTE_LIMIT = 12000;  // 每分钟最大消息数
    const SECOND_LIMIT = 200;    // 每秒最大请求数
    const MAX_BATCH_SIZE = 500;  // 每批最大用户数

    public static function push(array $userIds, string $message)
    {
        $optimalBatch = self::calculateOptimalBatch();
        $batches = array_chunk($userIds, min($optimalBatch, self::MAX_BATCH_SIZE));
        foreach ($batches as $key => $batch) {
            \Webman\RedisQueue\Redis::connection('batchPush')->send('queue_tim_batch_push', [
                'users' => $batch,
                'message' => $message . '-' . $key,
            ]);
        }
    }

    private static function calculateOptimalBatch()
    {
        $currentMinute = (int)(time() / 60);
        $used = self::getMinRedis($currentMinute);
        $remaining = 12000 - $used;
        $elapsed = time() % 60;
        $timeLeft = max(60 - $elapsed, 1); // 防除零

        // QPS限制下的最大批量(每批500用户消耗24次请求配额)
        $maxByQps = floor(200 * $timeLeft * 500 / 12000);

        // 配额限制下的最大批量
        $maxByQuota = floor($remaining * 500 / (200 * $timeLeft));

        // 动态调整系数(根据时间衰减)
        $decayFactor = 1 - ($elapsed / 60) * 0.5; // 时间越晚越保守
        $finalBatch = min(500, max(10, min($maxByQps, $maxByQuota) * $decayFactor));

        return (int)$finalBatch;
    }

    private static function getMinRedis($currentMinute)
    {
        return \Webman\RedisQueue\Redis::connection('batchPush')->get("im:min:{$currentMinute}") ?: 0;
    }
}

队列消费者

<?php

namespace app\queue\timBatchPush;

use support\Log;
use Webman\RedisQueue\Consumer;

/**
 * 创建批量推送队列
 * @package app\queue\timGroup
 */
class TimBatchPush implements Consumer
{
    public $queue = 'queue_tim_batch_push';

    public $connection = 'batchPush';

    const MINUTE_LIMIT = 12000;  // 每分钟最大消息数
    const SECOND_LIMIT = 200;    // 每秒最大请求数
    const MAX_BATCH_SIZE = 500;  // 每批最大用户数

    // Lua脚本(原子化双维度检查)
    const LUA_SCRIPT = <<<LUA
--[[
  腾讯IM双维度动态限流脚本(生产级优化版)
  优化点:
  1. 动态计算有效分片范围,避免无效查询
  2. 确保dynamic_limit最小值为1
  3. 精确计算最早可用时间
  4. 增强边界条件处理
--]]

-- KEYS[1]: 分钟级计数器键前缀(示例:im:min)
-- KEYS[2]: 秒级分片键前缀(示例:im:win)
-- ARGV[1]: 当前时间戳(毫秒)
-- ARGV[2]: 本批次用户数量
-- ARGV[3]: 最大消息数(12000)
-- ARGV[4]: 最大请求次数(200)
-- ARGV[5]: 时间窗口(毫秒,60000)

local WINDOW_SPLIT = 200 -- 分片粒度200ms
local current_time = tonumber(ARGV[1])
local window_start = current_time - ARGV[5]
local MAX_PARTS = math.ceil(ARGV[5]/WINDOW_SPLIT) -- 计算总窗口分片数

-- ===== 分钟级检查 =====
local current_minute = math.floor(current_time / 60000)
local min_key = KEYS[1]..":"..current_minute
local min_count = tonumber(redis.call('GET', min_key) or 0)
local remaining = tonumber(ARGV[3]) - min_count

if remaining <= 0 then
    return {0, 'minute', 
        60 - (current_time % 60000)/1000, -- 剩余时间(秒)
        0,  -- 当前窗口请求数
        0   -- 动态限制值
    }
end

-- ===== 动态窗口计算 =====
local current_part = math.floor(current_time / WINDOW_SPLIT)
local start_part = math.floor(window_start / WINDOW_SPLIT)

-- 生成有效分片范围(仅包含时间窗口内的分片)
local active_parts = {}
for i=0, MAX_PARTS-1 do
    local part_id = current_part - i
    if part_id * WINDOW_SPLIT >= window_start then
        table.insert(active_parts, part_id)
    else
        break -- 超出窗口范围的分片无需处理
    end
end

-- 构建分片键集合
local keys = {}
for _, part_id in ipairs(active_parts) do
    table.insert(keys, KEYS[2]..":"..part_id)       -- 计数键
    table.insert(keys, KEYS[2]..":ts:"..part_id)    -- 时间戳键
end

-- 批量获取分片数据
local responses = {}
if #keys > 0 then
    responses = redis.call('MGET', unpack(keys))
end

-- 统计有效请求数
local total_reqs = 0
local oldest_valid_ts = current_time
for i=1, #responses, 2 do
    local count = tonumber(responses[i]) or 0
    local ts = tonumber(responses[i+1]) or 0

    -- 精确时间窗口校验
    if ts >= window_start then
        total_reqs = total_reqs + count
        if ts < oldest_valid_ts then
            oldest_valid_ts = ts -- 记录最早有效分片时间
        end
    end
end

-- ===== 动态速率计算 =====
local time_elapsed = (current_time % 60000) / 1000
local time_left = math.max(60 - time_elapsed, 0.1)
local raw_limit = remaining / (time_left * tonumber(ARGV[2]))
local dynamic_limit = math.max(1, math.min(
    tonumber(ARGV[4]),
    math.floor(raw_limit + 0.5) -- 四舍五入且最小值1
))

-- ===== 限流判断 =====
if total_reqs >= dynamic_limit then
    local retry_after = (oldest_valid_ts + ARGV[5] - current_time) / 1000
    retry_after = math.max(retry_after, 0.1) -- 最小延迟0.1秒
    return {0, 'second', 
        retry_after,     -- 精确重试时间(秒)
        total_reqs,      -- 当前窗口请求数
        dynamic_limit    -- 动态限制值
    }
end

-- ===== 通过检查,更新数据 =====
-- 更新当前分片
local current_win_key = KEYS[2]..":"..current_part
redis.call('INCRBY', current_win_key, 1)
redis.call('PEXPIRE', current_win_key, ARGV[5] + 2000)

-- 记录分片时间戳
local ts_key = KEYS[2]..":ts:"..current_part
redis.call('SET', ts_key, current_time, 'PX', ARGV[5] + 2000)

-- 更新分钟级计数
redis.call('INCRBY', min_key, tonumber(ARGV[2]))
redis.call('PEXPIRE', min_key, 61000)

return {1, 
    dynamic_limit, 
    math.floor(remaining / time_left), 
    total_reqs + 1, 
    #active_parts
}
LUA;

    public function consume($data)
    {
        $logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
        // 参数验证
        if (empty($data['users']) || !is_array($data['users'])) {
            throw new \InvalidArgumentException("无效的用户数据");
        }

        $users = $data['users'];
        $userCount = count($users);
        $currentTimestamp = microtime(true);

        // 执行原子检查
        $result = $this->getRedis()->eval(
            self::LUA_SCRIPT,
            [
                $this->minuteKey(),      // KEYS[1]
                $this->secondKey(),      // KEYS[2]
                $currentTimestamp,       // ARGV[1]
                $userCount,              // ARGV[2]
                self::MINUTE_LIMIT,      // ARGV[3]
                self::SECOND_LIMIT,       // ARGV[4]
                60000,
            ],
            2
        );

        if ($result[0] === 1) {
            $this->sendToTencentIM($users, $data['message']);
            $logs .= "[INFO]\t 发送成功, allowed_qps=> {$result[1]} 次/秒" . PHP_EOL;
            $this->writeLog($logs);
        } else {
            $this->handleRateLimit($data, $result, $logs);
        }

    }

    private function getRedis()
    {
        return \Webman\RedisQueue\Redis::connection('batchPush');
    }

    private function minuteKey(): string
    {
        return 'im:min:' . (int)(time() / 60);
    }

    private function secondKey(): string
    {
        $currentSecond = (int)time();
        $shard = crc32((string)$currentSecond) % 16;
        return "im:sec:{$currentSecond}:{$shard}";
    }

    private function sendToTencentIM(array $users, string $message)
    {
        // 实际请求代码

    }

    private function handleRateLimit($data, array $result, $logs)
    {
        $retryData = $data;

        // 计算精确延迟
        $delay = match ($result[1]) {
            'minute' => 60 - (time() % 60) + 1,
            'second' => max(ceil($result[2] - microtime(true)), 0.1),
            default => 1
        };

        // 添加随机抖动(±5%)
        $delay *= mt_rand(950, 1050) / 1000;
        $delaySeconds = ceil($delay);

        \Webman\RedisQueue\Redis::connection('batchPush')->send($this->queue, $retryData, $delaySeconds);
        $allowed_qps = $result[3] ?? 0;
        $logs .= "[WARNING]\t 触发限流, type=> {$result[1]}, allowed_qps=> {$allowed_qps} ,delay=> {$delaySeconds}" . PHP_EOL;
        $this->writeLog($logs);

    }

    public function onConsumeFailure(\Throwable $e, $package)
    {
        $data = $package['data'];
        $attempts = $package['attempts'];
        $logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
        if ($attempts == 5) {
            $logs .= "[ERROR]\t 发送失败, 超过最大重试次数,不在重试" . PHP_EOL;
            $logs .= "[INFO]\t 错误信息:" . $e->getMessage() . PHP_EOL;
            $this->writeLog($logs);
        } else {
            $delay = $package['max_attempts'] * ($attempts + 1);
            $logs .= "[ERROR]\t 发送失败,{$delay}秒后重试" . PHP_EOL;
            $logs .= "[INFO]\t 错误信息:" . $e->getMessage() . PHP_EOL;
            $this->writeLog($logs);
        }
    }

    private function writeLog($log)
    {
        Log::channel('tim_batch_push')->log('info', $log);
    }
}

报错信息

ValueError: strpos(): Argument #3 ($offset) must be contained in argument #1 ($haystack) in /webman-v2/vendor/workerman/redis/src/Protocols/Rephp:53

RuntimeException: Protocol Workerman\Redis\Protocols\Redis Error package. package_length=-1 in webman-v2/vendor/workerman/workerman/Connection/TcpConnection.php:724
Stack trace:

截图报错信息里报错文件相关代码

截图

操作系统及workerman/webman等框架组件具体版本

系统:macos
截图

93 1 0
1个回答

walkor 打赏

vendor/webman/redis-queue/src/Redis.php
截图
加三行代码

if (strlen($buffer) < $pos + 2) {
    return 0;
}

试下

  • 暂无评论
×
🔝