国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

隊(duì)列中使用redis 出現(xiàn)錯(cuò)誤

Chuckle

問(wèn)題描述

目前需求是 第三方接口有請(qǐng)求限制
接口請(qǐng)求頻率限制:200次/秒。
消息條數(shù)限制:12000條/分鐘。按接收消息數(shù)量計(jì)算條數(shù),若一次發(fā)送給500個(gè)用戶,計(jì)作500條。每次最多500用戶
目前使用方案是動(dòng)態(tài)雙維度限流方案,
當(dāng)投遞數(shù)據(jù)交小時(shí) 是不會(huì)出現(xiàn)錯(cuò)誤的,較大就會(huì)
目前是4個(gè)進(jìn)程

初步懷疑是不是一直投遞,然后隊(duì)列消費(fèi)不滿足規(guī)則后,又投遞,導(dǎo)致redis響應(yīng)體過(guò)大超出緩存區(qū)導(dǎo)致的

程序代碼

模擬投遞

// 每次500用戶)
        // 預(yù)期結(jié)果:
        // - 前24次請(qǐng)求成功(24×500=12000)
        // - 后續(xù)請(qǐng)求自動(dòng)延遲到下一分鐘
        // - 每秒請(qǐng)求數(shù)不超過(guò)200次
        for ($i = 1; $i <= 100; $i++) {
            $users = range(1, 500); // 每次500用戶
            SmartBatchProducer::push($users, $i);
        }

投遞代碼

<?php

namespace mineout\timTool;

class SmartBatchProducer
{
    const MINUTE_LIMIT = 12000;  // 每分鐘最大消息數(shù)
    const SECOND_LIMIT = 200;    // 每秒最大請(qǐng)求數(shù)
    const MAX_BATCH_SIZE = 500;  // 每批最大用戶數(shù)

    public static function push(array $userIds, string $message)
    {
        $optimalBatch = self::calculateOptimalBatch();
        $batches = array_chunk($userIds, min($optimalBatch, self::MAX_BATCH_SIZE));
        foreach ($batches as $key => $batch) {
            \Webman\RedisQueue\Redis::connection('batchPush')->send('queue_tim_batch_push', [
                'users' => $batch,
                'message' => $message . '-' . $key,
            ]);
        }
    }

    private static function calculateOptimalBatch()
    {
        $currentMinute = (int)(time() / 60);
        $used = self::getMinRedis($currentMinute);
        $remaining = 12000 - $used;
        $elapsed = time() % 60;
        $timeLeft = max(60 - $elapsed, 1); // 防除零

        // QPS限制下的最大批量(每批500用戶消耗24次請(qǐng)求配額)
        $maxByQps = floor(200 * $timeLeft * 500 / 12000);

        // 配額限制下的最大批量
        $maxByQuota = floor($remaining * 500 / (200 * $timeLeft));

        // 動(dòng)態(tài)調(diào)整系數(shù)(根據(jù)時(shí)間衰減)
        $decayFactor = 1 - ($elapsed / 60) * 0.5; // 時(shí)間越晚越保守
        $finalBatch = min(500, max(10, min($maxByQps, $maxByQuota) * $decayFactor));

        return (int)$finalBatch;
    }

    private static function getMinRedis($currentMinute)
    {
        return \Webman\RedisQueue\Redis::connection('batchPush')->get("im:min:{$currentMinute}") ?: 0;
    }
}

隊(duì)列消費(fèi)者

<?php

namespace app\queue\timBatchPush;

use support\Log;
use Webman\RedisQueue\Consumer;

/**
 * 創(chuàng)建批量推送隊(duì)列
 * @package app\queue\timGroup
 */
class TimBatchPush implements Consumer
{
    public $queue = 'queue_tim_batch_push';

    public $connection = 'batchPush';

    const MINUTE_LIMIT = 12000;  // 每分鐘最大消息數(shù)
    const SECOND_LIMIT = 200;    // 每秒最大請(qǐng)求數(shù)
    const MAX_BATCH_SIZE = 500;  // 每批最大用戶數(shù)

    // Lua腳本(原子化雙維度檢查)
    const LUA_SCRIPT = <<<LUA
--[[
  騰訊IM雙維度動(dòng)態(tài)限流腳本(生產(chǎn)級(jí)優(yōu)化版)
  優(yōu)化點(diǎn):
  1. 動(dòng)態(tài)計(jì)算有效分片范圍,避免無(wú)效查詢
  2. 確保dynamic_limit最小值為1
  3. 精確計(jì)算最早可用時(shí)間
  4. 增強(qiáng)邊界條件處理
--]]

-- KEYS[1]: 分鐘級(jí)計(jì)數(shù)器鍵前綴(示例:im:min)
-- KEYS[2]: 秒級(jí)分片鍵前綴(示例:im:win)
-- ARGV[1]: 當(dāng)前時(shí)間戳(毫秒)
-- ARGV[2]: 本批次用戶數(shù)量
-- ARGV[3]: 最大消息數(shù)(12000)
-- ARGV[4]: 最大請(qǐng)求次數(shù)(200)
-- ARGV[5]: 時(shí)間窗口(毫秒,60000)

local WINDOW_SPLIT = 200 -- 分片粒度200ms
local current_time = tonumber(ARGV[1])
local window_start = current_time - ARGV[5]
local MAX_PARTS = math.ceil(ARGV[5]/WINDOW_SPLIT) -- 計(jì)算總窗口分片數(shù)

-- ===== 分鐘級(jí)檢查 =====
local current_minute = math.floor(current_time / 60000)
local min_key = KEYS[1]..":"..current_minute
local min_count = tonumber(redis.call('GET', min_key) or 0)
local remaining = tonumber(ARGV[3]) - min_count

if remaining <= 0 then
    return {0, 'minute', 
        60 - (current_time % 60000)/1000, -- 剩余時(shí)間(秒)
        0,  -- 當(dāng)前窗口請(qǐng)求數(shù)
        0   -- 動(dòng)態(tài)限制值
    }
end

-- ===== 動(dòng)態(tài)窗口計(jì)算 =====
local current_part = math.floor(current_time / WINDOW_SPLIT)
local start_part = math.floor(window_start / WINDOW_SPLIT)

-- 生成有效分片范圍(僅包含時(shí)間窗口內(nèi)的分片)
local active_parts = {}
for i=0, MAX_PARTS-1 do
    local part_id = current_part - i
    if part_id * WINDOW_SPLIT >= window_start then
        table.insert(active_parts, part_id)
    else
        break -- 超出窗口范圍的分片無(wú)需處理
    end
end

-- 構(gòu)建分片鍵集合
local keys = {}
for _, part_id in ipairs(active_parts) do
    table.insert(keys, KEYS[2]..":"..part_id)       -- 計(jì)數(shù)鍵
    table.insert(keys, KEYS[2]..":ts:"..part_id)    -- 時(shí)間戳鍵
end

-- 批量獲取分片數(shù)據(jù)
local responses = {}
if #keys > 0 then
    responses = redis.call('MGET', unpack(keys))
end

-- 統(tǒng)計(jì)有效請(qǐng)求數(shù)
local total_reqs = 0
local oldest_valid_ts = current_time
for i=1, #responses, 2 do
    local count = tonumber(responses[i]) or 0
    local ts = tonumber(responses[i+1]) or 0

    -- 精確時(shí)間窗口校驗(yàn)
    if ts >= window_start then
        total_reqs = total_reqs + count
        if ts < oldest_valid_ts then
            oldest_valid_ts = ts -- 記錄最早有效分片時(shí)間
        end
    end
end

-- ===== 動(dòng)態(tài)速率計(jì)算 =====
local time_elapsed = (current_time % 60000) / 1000
local time_left = math.max(60 - time_elapsed, 0.1)
local raw_limit = remaining / (time_left * tonumber(ARGV[2]))
local dynamic_limit = math.max(1, math.min(
    tonumber(ARGV[4]),
    math.floor(raw_limit + 0.5) -- 四舍五入且最小值1
))

-- ===== 限流判斷 =====
if total_reqs >= dynamic_limit then
    local retry_after = (oldest_valid_ts + ARGV[5] - current_time) / 1000
    retry_after = math.max(retry_after, 0.1) -- 最小延遲0.1秒
    return {0, 'second', 
        retry_after,     -- 精確重試時(shí)間(秒)
        total_reqs,      -- 當(dāng)前窗口請(qǐng)求數(shù)
        dynamic_limit    -- 動(dòng)態(tài)限制值
    }
end

-- ===== 通過(guò)檢查,更新數(shù)據(jù) =====
-- 更新當(dāng)前分片
local current_win_key = KEYS[2]..":"..current_part
redis.call('INCRBY', current_win_key, 1)
redis.call('PEXPIRE', current_win_key, ARGV[5] + 2000)

-- 記錄分片時(shí)間戳
local ts_key = KEYS[2]..":ts:"..current_part
redis.call('SET', ts_key, current_time, 'PX', ARGV[5] + 2000)

-- 更新分鐘級(jí)計(jì)數(shù)
redis.call('INCRBY', min_key, tonumber(ARGV[2]))
redis.call('PEXPIRE', min_key, 61000)

return {1, 
    dynamic_limit, 
    math.floor(remaining / time_left), 
    total_reqs + 1, 
    #active_parts
}
LUA;

    public function consume($data)
    {
        $logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
        // 參數(shù)驗(yàn)證
        if (empty($data['users']) || !is_array($data['users'])) {
            throw new \InvalidArgumentException("無(wú)效的用戶數(shù)據(jù)");
        }

        $users = $data['users'];
        $userCount = count($users);
        $currentTimestamp = microtime(true);

        // 執(zhí)行原子檢查
        $result = $this->getRedis()->eval(
            self::LUA_SCRIPT,
            [
                $this->minuteKey(),      // KEYS[1]
                $this->secondKey(),      // KEYS[2]
                $currentTimestamp,       // ARGV[1]
                $userCount,              // ARGV[2]
                self::MINUTE_LIMIT,      // ARGV[3]
                self::SECOND_LIMIT,       // ARGV[4]
                60000,
            ],
            2
        );

        if ($result[0] === 1) {
            $this->sendToTencentIM($users, $data['message']);
            $logs .= "[INFO]\t 發(fā)送成功, allowed_qps=> {$result[1]} 次/秒" . PHP_EOL;
            $this->writeLog($logs);
        } else {
            $this->handleRateLimit($data, $result, $logs);
        }

    }

    private function getRedis()
    {
        return \Webman\RedisQueue\Redis::connection('batchPush');
    }

    private function minuteKey(): string
    {
        return 'im:min:' . (int)(time() / 60);
    }

    private function secondKey(): string
    {
        $currentSecond = (int)time();
        $shard = crc32((string)$currentSecond) % 16;
        return "im:sec:{$currentSecond}:{$shard}";
    }

    private function sendToTencentIM(array $users, string $message)
    {
        // 實(shí)際請(qǐng)求代碼

    }

    private function handleRateLimit($data, array $result, $logs)
    {
        $retryData = $data;

        // 計(jì)算精確延遲
        $delay = match ($result[1]) {
            'minute' => 60 - (time() % 60) + 1,
            'second' => max(ceil($result[2] - microtime(true)), 0.1),
            default => 1
        };

        // 添加隨機(jī)抖動(dòng)(±5%)
        $delay *= mt_rand(950, 1050) / 1000;
        $delaySeconds = ceil($delay);

        \Webman\RedisQueue\Redis::connection('batchPush')->send($this->queue, $retryData, $delaySeconds);
        $allowed_qps = $result[3] ?? 0;
        $logs .= "[WARNING]\t 觸發(fā)限流, type=> {$result[1]}, allowed_qps=> {$allowed_qps} ,delay=> {$delaySeconds}" . PHP_EOL;
        $this->writeLog($logs);

    }

    public function onConsumeFailure(\Throwable $e, $package)
    {
        $data = $package['data'];
        $attempts = $package['attempts'];
        $logs = PHP_EOL . "[DATA]\t" . var_export($data['message'], true) . PHP_EOL;
        if ($attempts == 5) {
            $logs .= "[ERROR]\t 發(fā)送失敗, 超過(guò)最大重試次數(shù),不在重試" . PHP_EOL;
            $logs .= "[INFO]\t 錯(cuò)誤信息:" . $e->getMessage() . PHP_EOL;
            $this->writeLog($logs);
        } else {
            $delay = $package['max_attempts'] * ($attempts + 1);
            $logs .= "[ERROR]\t 發(fā)送失敗,{$delay}秒后重試" . PHP_EOL;
            $logs .= "[INFO]\t 錯(cuò)誤信息:" . $e->getMessage() . PHP_EOL;
            $this->writeLog($logs);
        }
    }

    private function writeLog($log)
    {
        Log::channel('tim_batch_push')->log('info', $log);
    }
}

報(bào)錯(cuò)信息

ValueError: strpos(): Argument #3 ($offset) must be contained in argument #1 ($haystack) in /webman-v2/vendor/workerman/redis/src/Protocols/Rephp:53

RuntimeException: Protocol Workerman\Redis\Protocols\Redis Error package. package_length=-1 in webman-v2/vendor/workerman/workerman/Connection/TcpConnection.php:724
Stack trace:

截圖報(bào)錯(cuò)信息里報(bào)錯(cuò)文件相關(guān)代碼

截圖

操作系統(tǒng)及workerman/webman等框架組件具體版本

系統(tǒng):macos
截圖

393 1 0
1個(gè)回答

walkor 打賞

vendor/webman/redis-queue/src/Redis.php
截圖
加三行代碼

if (strlen($buffer) < $pos + 2) {
    return 0;
}

試下

  • Chuckle 2025-04-07

    測(cè)試了一下,是沒(méi)有在報(bào)錯(cuò)了, 位置是在vendor/workman/redis/src/Protocols/Redis, 同時(shí)在改文件下面的decode方法內(nèi)添加改代碼?144行處,``` case '':
    if(0 === strpos($buffer, '
    -1')) {
    return [$type, null];
    }
    $pos = \strpos($buffer, "\r\n");
    $value = [];
    $count = (int)substr($buffer, 1, $pos - 1);
    while ($count --) {
    if (strlen($buffer) < $pos + 2) {
    return 0;
    }

  • walkor 2025-04-07

    好的,更新到 2.0.5

    composer require workerman/redis ^2.0.5 
??