目前需求是 第三方接口有請(qǐng)求限制
接口請(qǐng)求頻率限制:200次/秒。
消息條數(shù)限制:12000條/分鐘。按接收消息數(shù)量計(jì)算條數(shù),若一次發(fā)送給500個(gè)用戶,計(jì)作500條。每次最多500用戶
目前使用方案是動(dòng)態(tài)雙維度限流方案,
當(dāng)投遞數(shù)據(jù)交小時(shí) 是不會(huì)出現(xiàn)錯(cuò)誤的,較大就會(huì)
目前是4個(gè)進(jìn)程
初步懷疑是不是一直投遞,然后隊(duì)列消費(fèi)不滿足規(guī)則后,又投遞,導(dǎo)致redis響應(yīng)體過(guò)大超出緩存區(qū)導(dǎo)致的
模擬投遞
// 每次500用戶)
// 預(yù)期結(jié)果:
// - 前24次請(qǐng)求成功(24×500=12000)
// - 后續(xù)請(qǐng)求自動(dòng)延遲到下一分鐘
// - 每秒請(qǐng)求數(shù)不超過(guò)200次
for ($i = 1; $i <= 100; $i++) {
$users = range(1, 500); // 每次500用戶
SmartBatchProducer::push($users, $i);
}
投遞代碼
<?php
namespace mineout\timTool;
class SmartBatchProducer
{
const MINUTE_LIMIT = 12000; // 每分鐘最大消息數(shù)
const SECOND_LIMIT = 200; // 每秒最大請(qǐng)求數(shù)
const MAX_BATCH_SIZE = 500; // 每批最大用戶數(shù)
public static function push(array $userIds, string $message)
{
$optimalBatch = self::calculateOptimalBatch();
$batches = array_chunk($userIds, min($optimalBatch, self::MAX_BATCH_SIZE));
foreach ($batches as $key => $batch) {
\Webman\RedisQueue\Redis::connection('batchPush')->send('queue_tim_batch_push', [
'users' => $batch,
'message' => $message . '-' . $key,
]);
}
}
private static function calculateOptimalBatch()
{
$currentMinute = (int)(time() / 60);
$used = self::getMinRedis($currentMinute);
$remaining = 12000 - $used;
$elapsed = time() % 60;
$timeLeft = max(60 - $elapsed, 1); // 防除零
// QPS限制下的最大批量(每批500用戶消耗24次請(qǐng)求配額)
$maxByQps = floor(200 * $timeLeft * 500 / 12000);
// 配額限制下的最大批量
$maxByQuota = floor($remaining * 500 / (200 * $timeLeft));
// 動(dòng)態(tài)調(diào)整系數(shù)(根據(jù)時(shí)間衰減)
$decayFactor = 1 - ($elapsed / 60) * 0.5; // 時(shí)間越晚越保守
$finalBatch = min(500, max(10, min($maxByQps, $maxByQuota) * $decayFactor));
return (int)$finalBatch;
}
private static function getMinRedis($currentMinute)
{
return \Webman\RedisQueue\Redis::connection('batchPush')->get("im:min:{$currentMinute}") ?: 0;
}
}
隊(duì)列消費(fèi)者
<?php
namespace app\queue\timBatchPush;
use support\Log;
use Webman\RedisQueue\Consumer;
/**
* 創(chuàng)建批量推送隊(duì)列
* @package app\queue\timGroup
*/
class TimBatchPush implements Consumer
{
public $queue = 'queue_tim_batch_push';
public $connection = 'batchPush';
const MINUTE_LIMIT = 12000; // 每分鐘最大消息數(shù)
const SECOND_LIMIT = 200; // 每秒最大請(qǐng)求數(shù)
const MAX_BATCH_SIZE = 500; // 每批最大用戶數(shù)
// Lua腳本(原子化雙維度檢查)
const LUA_SCRIPT = <<<LUA
--[[
騰訊IM雙維度動(dòng)態(tài)限流腳本(生產(chǎn)級(jí)優(yōu)化版)
優(yōu)化點(diǎn):
1. 動(dòng)態(tài)計(jì)算有效分片范圍,避免無(wú)效查詢
2. 確保dynamic_limit最小值為1
3. 精確計(jì)算最早可用時(shí)間
4. 增強(qiáng)邊界條件處理
--]]
-- KEYS[1]: 分鐘級(jí)計(jì)數(shù)器鍵前綴(示例:im:min)
-- KEYS[2]: 秒級(jí)分片鍵前綴(示例:im:win)
-- ARGV[1]: 當(dāng)前時(shí)間戳(毫秒)
-- ARGV[2]: 本批次用戶數(shù)量
-- ARGV[3]: 最大消息數(shù)(12000)
-- ARGV[4]: 最大請(qǐng)求次數(shù)(200)
-- ARGV[5]: 時(shí)間窗口(毫秒,60000)
local WINDOW_SPLIT = 200 -- 分片粒度200ms
local current_time = tonumber(ARGV[1])
local window_start = current_time - ARGV[5]
local MAX_PARTS = math.ceil(ARGV[5]/WINDOW_SPLIT) -- 計(jì)算總窗口分片數(shù)
-- ===== 分鐘級(jí)檢查 =====
local current_minute = math.floor(current_time / 60000)
local min_key = KEYS[1]..":"..current_minute
local min_count = tonumber(redis.call('GET', min_key) or 0)
local remaining = tonumber(ARGV[3]) - min_count
if remaining <= 0 then
return {0, 'minute',
60 - (current_time % 60000)/1000, -- 剩余時(shí)間(秒)
0, -- 當(dāng)前窗口請(qǐng)求數(shù)
0 -- 動(dòng)態(tài)限制值
}
end
-- ===== 動(dòng)態(tài)窗口計(jì)算 =====
local current_part = math.floor(current_time / WINDOW_SPLIT)
local start_part = math.floor(window_start / WINDOW_SPLIT)
-- 生成有效分片范圍(僅包含時(shí)間窗口內(nèi)的分片)
local active_parts = {}
for i=0, MAX_PARTS-1 do
local part_id = current_part - i
if part_id * WINDOW_SPLIT >= window_start then
table.insert(active_parts, part_id)
else
break -- 超出窗口范圍的分片無(wú)需處理
end
end
-- 構(gòu)建分片鍵集合
local keys = {}
for _, part_id in ipairs(active_parts) do
table.insert(keys, KEYS[2]..":"..part_id) -- 計(jì)數(shù)鍵
table.insert(keys, KEYS[2]..":ts:"..part_id) -- 時(shí)間戳鍵
end
-- 批量獲取分片數(shù)據(jù)
local responses = {}
if #keys > 0 then
responses = redis.call('MGET', unpack(keys))
end
-- 統(tǒng)計(jì)有效請(qǐng)求數(shù)
local total_reqs = 0
local oldest_valid_ts = current_time
for i=1, #responses, 2 do
local count = tonumber(responses[i]) or 0
local ts = tonumber(responses[i+1]) or 0
-- 精確時(shí)間窗口校驗(yàn)
if ts >= window_start then
total_reqs = total_reqs + count
if ts < oldest_valid_ts then
oldest_valid_ts = ts -- 記錄最早有效分片時(shí)間
end
end
end
-- ===== 動(dòng)態(tài)速率計(jì)算 =====
local time_elapsed = (current_time % 60000) / 1000
local time_left = math.max(60 - time_elapsed, 0.1)
local raw_limit = remaining / (time_left * tonumber(ARGV[2]))
local dynamic_limit = math.max(1, math.min(
tonumber(ARGV[4]),
math.floor(raw_limit + 0.5) -- 四舍五入且最小值1
))
-- ===== 限流判斷 =====
if total_reqs >= dynamic_limit then
local retry_after = (oldest_valid_ts + ARGV[5] - current_time) / 1000
retry_after = math.max(retry_after, 0.1) -- 最小延遲0.1秒
return {0, 'second',
retry_after, -- 精確重試時(shí)間(秒)
total_reqs, -- 當(dāng)前窗口請(qǐng)求數(shù)
dynamic_limit -- 動(dòng)態(tài)限制值
}
end
-- ===== 通過(guò)檢查,更新數(shù)據(jù) =====
-- 更新當(dāng)前分片
local current_win_key = KEYS[2]..":"..current_part
redis.call('INCRBY', current_win_key, 1)
redis.call('PEXPIRE', current_win_key, ARGV[5] + 2000)
-- 記錄分片時(shí)間戳
local ts_key = KEYS[2]..":ts:"..current_part
redis.call('SET', ts_key, current_time, 'PX', ARGV[5] + 2000)
-- 更新分鐘級(jí)計(jì)數(shù)
redis.call('INCRBY', min_key, tonumber(ARGV[2]))
redis.call('PEXPIRE', min_key, 61000)
return {1,
dynamic_limit,
math.floor(remaining / time_left),
total_reqs + 1,
#active_parts
}
LUA;
public function consume($data)
{
$logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
// 參數(shù)驗(yàn)證
if (empty($data['users']) || !is_array($data['users'])) {
throw new \InvalidArgumentException("無(wú)效的用戶數(shù)據(jù)");
}
$users = $data['users'];
$userCount = count($users);
$currentTimestamp = microtime(true);
// 執(zhí)行原子檢查
$result = $this->getRedis()->eval(
self::LUA_SCRIPT,
[
$this->minuteKey(), // KEYS[1]
$this->secondKey(), // KEYS[2]
$currentTimestamp, // ARGV[1]
$userCount, // ARGV[2]
self::MINUTE_LIMIT, // ARGV[3]
self::SECOND_LIMIT, // ARGV[4]
60000,
],
2
);
if ($result[0] === 1) {
$this->sendToTencentIM($users, $data['message']);
$logs .= "[INFO]\t 發(fā)送成功, allowed_qps=> {$result[1]} 次/秒" . PHP_EOL;
$this->writeLog($logs);
} else {
$this->handleRateLimit($data, $result, $logs);
}
}
private function getRedis()
{
return \Webman\RedisQueue\Redis::connection('batchPush');
}
private function minuteKey(): string
{
return 'im:min:' . (int)(time() / 60);
}
private function secondKey(): string
{
$currentSecond = (int)time();
$shard = crc32((string)$currentSecond) % 16;
return "im:sec:{$currentSecond}:{$shard}";
}
private function sendToTencentIM(array $users, string $message)
{
// 實(shí)際請(qǐng)求代碼
}
private function handleRateLimit($data, array $result, $logs)
{
$retryData = $data;
// 計(jì)算精確延遲
$delay = match ($result[1]) {
'minute' => 60 - (time() % 60) + 1,
'second' => max(ceil($result[2] - microtime(true)), 0.1),
default => 1
};
// 添加隨機(jī)抖動(dòng)(±5%)
$delay *= mt_rand(950, 1050) / 1000;
$delaySeconds = ceil($delay);
\Webman\RedisQueue\Redis::connection('batchPush')->send($this->queue, $retryData, $delaySeconds);
$allowed_qps = $result[3] ?? 0;
$logs .= "[WARNING]\t 觸發(fā)限流, type=> {$result[1]}, allowed_qps=> {$allowed_qps} ,delay=> {$delaySeconds}" . PHP_EOL;
$this->writeLog($logs);
}
public function onConsumeFailure(\Throwable $e, $package)
{
$data = $package['data'];
$attempts = $package['attempts'];
$logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
if ($attempts == 5) {
$logs .= "[ERROR]\t 發(fā)送失敗, 超過(guò)最大重試次數(shù),不在重試" . PHP_EOL;
$logs .= "[INFO]\t 錯(cuò)誤信息:" . $e->getMessage() . PHP_EOL;
$this->writeLog($logs);
} else {
$delay = $package['max_attempts'] * ($attempts + 1);
$logs .= "[ERROR]\t 發(fā)送失敗,{$delay}秒后重試" . PHP_EOL;
$logs .= "[INFO]\t 錯(cuò)誤信息:" . $e->getMessage() . PHP_EOL;
$this->writeLog($logs);
}
}
private function writeLog($log)
{
Log::channel('tim_batch_push')->log('info', $log);
}
}
ValueError: strpos(): Argument #3 ($offset) must be contained in argument #1 ($haystack) in /webman-v2/vendor/workerman/redis/src/Protocols/Rephp:53
RuntimeException: Protocol Workerman\Redis\Protocols\Redis Error package. package_length=-1 in webman-v2/vendor/workerman/workerman/Connection/TcpConnection.php:724
Stack trace:
系統(tǒng):macos
vendor/webman/redis-queue/src/Redis.php
加三行代碼
if (strlen($buffer) < $pos + 2) {
return 0;
}
試下
測(cè)試了一下,是沒(méi)有在報(bào)錯(cuò)了, 位置是在vendor/workman/redis/src/Protocols/Redis, 同時(shí)在改文件下面的decode方法內(nèi)添加改代碼?144行處,``` case '':
if(0 === strpos($buffer, '-1')) {
return [$type, null];
}
$pos = \strpos($buffer, "\r\n");
$value = [];
$count = (int)substr($buffer, 1, $pos - 1);
while ($count --) {
if (strlen($buffer) < $pos + 2) {
return 0;
}