国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

FlowerMQ 基于workerman和redis實(shí)現(xiàn)的消息隊(duì)列

dazhaozhao

FlowerMQ

FlowerMQ 一個(gè)基于Workerman和Redis實(shí)現(xiàn)的消息隊(duì)列,一個(gè)小小工具,用來(lái)給主項(xiàng)目解耦的,也支持延遲隊(duì)列,失敗嘗試這些。

運(yùn)行依賴

  • php7.2
  • Redis5.0.4上,因?yàn)橛玫絉edis Stream
  • pecl依賴,redis擴(kuò)展
  • composer依賴,workerman/workerman 4.0以上

安裝

composer安裝

composer create-project mrtwenty/flower

下載安裝

  1. 下載或者 git clone
  2. 項(xiàng)目根目錄執(zhí)行命令,composer install

原理說(shuō)明

  1. workerman實(shí)現(xiàn)消費(fèi)端,開(kāi)多個(gè)進(jìn)程,在 onWorkerStart 函數(shù)里面,阻塞讀取,阻塞間隔5秒后,就重新阻塞,因?yàn)槭亲枞?,所以用了一個(gè)字符串key來(lái)處理停止的問(wèn)題,每次阻塞5秒,就斷開(kāi),以便判斷是否需要終止程序。
  2. 可以隨時(shí)停止消費(fèi)端,因?yàn)榭蛻舳税l(fā)送的消息都會(huì)存放redis stream 隊(duì)列里面。
  3. 一個(gè)pending進(jìn)程,每隔1秒檢查是否有未ack的消息,并嘗試消費(fèi)掉
  4. 一個(gè)delay進(jìn)程,負(fù)責(zé)處理延遲消息,利用redis的zset有序集合存儲(chǔ),起一個(gè)定時(shí)器,定時(shí)獲取可以執(zhí)行的消息,寫(xiě)入消費(fèi)端
  5. 遵循約定大于配置的方式,直接用默認(rèn)的即可。
  6. 默認(rèn)配置是app目錄下的config目錄,如果需要更改配置項(xiàng),可以在項(xiàng)目根目錄下,提供一個(gè).env的配置文件,替換掉
  7. 回收裁剪機(jī)制: 有三種模式,默認(rèn) no ,不做裁剪
    1. no,不做裁剪,所有消息保留。
    2. maxlen, 最大長(zhǎng)度回收,概率性觸發(fā) xtrim maxlen mq ~ 長(zhǎng)度 。
    3. minid, 最小已讀消息回收,概率行觸發(fā),xtrim minid mq ~ 消息id ,需要 redis server 6.2.0 以上。

可用命令

win

windows下僅限于開(kāi)發(fā),不適合做生產(chǎn)環(huán)境使用,啟動(dòng)需要開(kāi)三個(gè)命令行窗口,執(zhí)行 start、pending、delay命令

  1. php index.php start 啟動(dòng)消費(fèi)隊(duì)列
  2. php index.php pending 啟動(dòng)重試隊(duì)列
  3. php index.php delay 啟動(dòng)延遲隊(duì)列
  4. php index.php test 測(cè)試,執(zhí)行此命令會(huì)發(fā)送兩個(gè)消息給服務(wù),一個(gè)是即時(shí)消息,一個(gè)是延遲消息。
  5. php monitor.php start 運(yùn)行信息查看,會(huì)啟動(dòng)一個(gè)http進(jìn)程

linux

  1. php index.php start linux啟動(dòng)相當(dāng)于執(zhí)行了 start、pending、delay命令
  2. php index.php start -d 守護(hù)進(jìn)程啟動(dòng)
  3. php index.php stop 強(qiáng)制停止,可能會(huì)導(dǎo)致消息未ack,不建議使用
  4. php index.php stop -g 優(yōu)雅停止 (不加參數(shù)-g會(huì)強(qiáng)制干掉子進(jìn)程,加參數(shù)-g的話,會(huì)等子進(jìn)程處理完后再關(guān)閉)
  5. php index.php config 查看配置信息
  6. php index.php test 測(cè)試,執(zhí)行此命令會(huì)發(fā)送兩個(gè)消息給服務(wù),一個(gè)是即時(shí)消息,一個(gè)是延遲消息。
  7. php monitor.php start 運(yùn)行信息查看,會(huì)啟動(dòng)一個(gè)http進(jìn)程

服務(wù)端說(shuō)明

  1. 下載項(xiàng)目后,配置 .env
  2. 編寫(xiě)業(yè)務(wù)邏輯,app\consumer\Run.php 只需要編寫(xiě)這里,如果代碼有curl請(qǐng)求,記得要做好超時(shí)
  3. 啟動(dòng),php index.php start 即可。

客戶端說(shuō)明

flower配備了一個(gè)客戶端,方便在別的項(xiàng)目中使用:

composer install mrtwenty/flower-client

使用方式:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
//$mq需要與服務(wù)端三個(gè)配置信息相同
$mq    = [
    'name' => 'mq',
    'delay_name' => 'mq_delay',
    'fail_list' => 'mq_fail_list'
];
$client = new Client($redis, $mq);

//立即執(zhí)行
$res = $client->add(['test' => 'data']);
var_dump($res);
//延遲消息
$res = $client->add(['test' => 'data'], 3);
var_dump($res);

問(wèn)題

1.引入MySQL

可以安裝此依賴包,當(dāng)然也可以根據(jù)自己需要用別的包

composer require workerman/mysql

配置信息可以在.env里面寫(xiě)入:

[mysql]
host = 127.0.0.1
username = root
password = 123456
database = test
port     = 3306

代碼實(shí)現(xiàn):

<?php

declare(strict_types=1);

namespace app;

use app\library\BaseInterface;

/**
 * 消費(fèi)類
 */
class Run implements BaseInterface
{
    protected $db = null;

    public function getDb()
    {
        if (is_null($this->db)) {
            $config = config('mysql');
            $host     = $config['host'];
            $port     = $config['port'];
            $user     = $config['username'];
            $password = $config['password'];
            $database  = $config['database'];
            $this->db = new \Workerman\MySQL\Connection($host, $port, $user, $password, $database);
        }
        return $this->db;
    }

    /**
     * 消費(fèi)方法,如何消費(fèi),取決用戶自己
     *
     * @param mixed $data
     * @param mixed $id
     * @return bool 返回true就會(huì)執(zhí)行ack確認(rèn)消息已消費(fèi)
     */
    public function consumer($data, $id): bool
    {
        $db   = $this->getDb();
        $info = $db->row("SELECT * FROM `short_url` WHERE id=3");
        print_r($info);
        return true;
    }

    /**
     * 超過(guò)嘗試的次數(shù),就會(huì)寫(xiě)入失敗隊(duì)列里面,并調(diào)用此方法,可以用此方法通知運(yùn)維
     *
     * @return void
     */
    public function fail($data, $id)
    {
        print_r($data);
        print_r($id);
    }
}

2. 避免內(nèi)存泄露

由于是守護(hù)進(jìn)程,為了避免php業(yè)務(wù)代碼bug隱藏的內(nèi)存泄露,可以在消費(fèi)者執(zhí)行完一定數(shù)量的時(shí)候重啟進(jìn)程。具體實(shí)現(xiàn)請(qǐng)查看workerman手冊(cè)。

鏈接1、鏈接2

相關(guān)資料

  1. redis stream 手冊(cè)

  2. pecl redis 文檔

  3. workerman 手冊(cè)

引用

  1. monitor登錄頁(yè)模板
  2. env、config類這些學(xué)自thinkphp
  3. monitor 后端的一些代碼,學(xué)自webman
  4. monitor主頁(yè)面,用的layui
  5. 延遲隊(duì)列的思路抄的workerman的redis queue
  6. 感謝workerman、thinkphp、layui、redis
2255 7 0
7個(gè)評(píng)論

Tinywan

你好!你的 重試消費(fèi)失敗的消息,超出失敗次數(shù)確認(rèn)消息,寫(xiě)入隊(duì)列 這個(gè)方法永遠(yuǎn)不會(huì)被觸發(fā)。你確定你是否測(cè)試過(guò)?我看你這個(gè)是跑不通的

//如果該消息重試次數(shù)大于3,保存到失敗隊(duì)列
if ($fail_num > $try_fail_num) {
    //消息內(nèi)容
    $data = json_decode($msg_content['data'], true);
    $this->redis->lPush($fail_list, json_encode(['time' => time(), 'data' => $data], JSON_UNESCAPED_UNICODE));
    $this->client->ack($group_name, $msg_id);
    $run->fail($data, $msg_id);
    continue;
}

你這里的 $fail_num,消息被讀取次數(shù)永遠(yuǎn)是1

  • dazhaozhao 2022-01-07

    app\consumer\Run的consumer 返回false, 消息變成已讀取未ack,就會(huì)被xpending讀取到,并重試,不行就繼續(xù)xClaim 轉(zhuǎn)移消息,次數(shù)就會(huì)加1

  • Tinywan 2022-01-07

    xClaim 轉(zhuǎn)移消息 。$fail_num 還是 1

dazhaozhao

是運(yùn)行環(huán)境問(wèn)題,還是redis或者php版本問(wèn)題?
我這里試了可以。

  • 暫無(wú)評(píng)論
dazhaozhao


我直接 composer create-project mrtwenty/flower 安裝
app\consumer\Run的consumer返回false,你說(shuō)的那個(gè)地方,我加了調(diào)試信息

[$msg_id, $consumer, $over_time, $fail_num] = $msg;
echo "fail_num", $fail_num, "\n";

  • Tinywan 2022-01-07

    $fail_num 這個(gè)變量沒(méi)有打印,打印的話一直是1

  • dazhaozhao 2022-01-08

    這個(gè)變量的值是xpending得到的,請(qǐng)看我的評(píng)論。

  • dazhaozhao 2022-01-08

    echo "fail_num", $fail_num, "\n"; 我變量有打印的,你看那個(gè)截圖,輸出都是
    fail_num1、fail_num2、fail_num3,后面的數(shù)字就是。

  • Tinywan 2022-01-08

    fail_num 這個(gè)你消息被讀取的次數(shù),你這里并沒(méi)有讀取(XreadGroup),就直接?。╔pending),就不會(huì)變的

dazhaozhao

不寫(xiě)代碼,用redis 命令執(zhí)行下流程。

    # 創(chuàng)建消息test_mq,返回消息ID 1641570877368-0
    XADD test_mq * key value
    # 創(chuàng)建消費(fèi)組 test_group
    XGROUP CREATE test_mq test_group 0
    # hehe去讀取數(shù)據(jù),不進(jìn)行xack確認(rèn)消息,xpending就會(huì)有消息
    XREADGROUP GROUP test_group hehe COUNT 1 STREAMS test_mq >
    # 查看未xack的消息,此時(shí)得到 $fail_num=1
    XPENDING test_mq test_group - + 1

    # 嘗試消費(fèi)該消息,使用xRange讀取該消息內(nèi)容,結(jié)果還是沒(méi)成ack掉,那就轉(zhuǎn)移

    # 轉(zhuǎn)移消息
    XCLAIM test_mq test_group hehe 3000 消息ID
    XCLAIM test_mq test_group hehe 3000 1641570877368-0

    # 再次查看未xack的消息,此時(shí)得到 $fail_num=2
    XPENDING test_mq test_group - + 1

    # 再次嘗試消費(fèi)該消息,結(jié)果還是沒(méi)成,如此反復(fù)

  • Tinywan 2022-01-08

    又排查了一下,你這需要6.0才支持的,而不是5.0就支持

Tinywan

Redis 版本

127.0.0.1:6379> info
# Server
redis_version:5.0.3
...
os:Linux 4.19.128-microsoft-standard x86_64

以下是不進(jìn)行xclaim

截圖

以下是進(jìn)行xclaim

截圖

我用php腳本跑結(jié)果和上面是一樣的。結(jié)果為啥和你跑的不一樣,哈哈

是不是由于你使用的是Windows版本,而我是Linux版本 os:Linux 4.19.128-microsoft-standard x86_64

再次驗(yàn)證,Redis Server 版本需要 5.0.4 及最新版本

截圖

127.0.0.1:6379[14]> info
# Server
redis_version:6.2.1
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:68b3db7bf8188804
redis_mode:standalone
os:Linux 4.15.0-137-generic x86_64
  • Tinywan 2022-01-08

    很可惜,各大云廠商現(xiàn)在都不支持6.0,最高是5.0,不支持這個(gè)消息轉(zhuǎn)移后自動(dòng)增加1

Tinywan
  • dazhaozhao 2022-01-08

    那這樣子就需要redis5.0.4以上了,謝謝你的測(cè)試,回頭我寫(xiě)在說(shuō)明里

  • Tinywan 2022-01-08

    嗯嗯!搞了兩天終于搞明白為啥了,哈哈!

小陽(yáng)光

有一個(gè)很嚴(yán)重的問(wèn)題,在你修剪消息隊(duì)列的時(shí)候,如果消息還沒(méi)消費(fèi),直接會(huì)修剪掉到?jīng)]消費(fèi)到的消息,現(xiàn)象就是某個(gè)區(qū)間的消息全部丟失,我只是看了一下代碼,沒(méi)驗(yàn)證我的想法。

  • Tinywan 2022-01-19

    他這只會(huì)修剪XPENDING 消息

  • 小陽(yáng)光 2022-01-19

    不是哦,你仔細(xì)看看代碼,測(cè)試一下,把GC評(píng)率調(diào)成100%,隊(duì)列最大長(zhǎng)度改短,消息數(shù)量超過(guò)隊(duì)列長(zhǎng)度,一定會(huì)丟失消息

  • Tinywan 2022-01-19

    你是說(shuō)他消費(fèi)失敗的消息再次消費(fèi)是會(huì)這種情況是嗎?

  • 小陽(yáng)光 2022-01-19

    不是, 最簡(jiǎn)單的浮現(xiàn)方式,你把最大隊(duì)列長(zhǎng)度改為10,GC返回true,推送1000條消息進(jìn)去, 你看消費(fèi)的消息有多少條,最多幾十條,不確定具體多少條是因?yàn)閤trim加了true參數(shù)

  • Tinywan 2022-01-19

    你是說(shuō) stream 隊(duì)列長(zhǎng)度?

  • 小陽(yáng)光 2022-01-19

    嗯, 修剪只能修剪stream ,哪有修剪XPENDING 一說(shuō)?

  • dazhaozhao 2022-01-19

    嗯,你說(shuō)的對(duì),我應(yīng)該在項(xiàng)目里面提醒一下使用這個(gè)項(xiàng)目的人,這是redis stream xtrim 本身的設(shè)計(jì),如果想要長(zhǎng)久的保存,可以將gc設(shè)置成永遠(yuǎn)不會(huì)執(zhí)行,這樣就不會(huì)執(zhí)行xtrim命令了,或者設(shè)置隊(duì)列長(zhǎng)度足夠長(zhǎng),xtrim 執(zhí)行了也不會(huì)裁剪到,這取決于開(kāi)發(fā)者的應(yīng)用場(chǎng)景,我自己用的時(shí)候,都是為了能夠及時(shí)觸發(fā)消息,基本都是即發(fā)即消費(fèi)。

  • 小陽(yáng)光 2022-01-19

    其實(shí)裁剪也是有必要的,不然久了內(nèi)存會(huì)爆炸,或者被redis清空,在裁剪的時(shí)候應(yīng)該裁剪到所有分組消費(fèi)者 最小已消費(fèi)的消息位置,不應(yīng)該是固定裁剪。

  • dazhaozhao 2022-01-19

    嗯,你這就更細(xì)膩的操作了,但是redis 好像并沒(méi)有提供相關(guān)的命令,xtrim本身是先裁剪掉舊的消息。

  • dazhaozhao 2022-01-19

    額,我剛剛?cè)タ戳讼聄edis6.2的英文手冊(cè),好像支持新的方式,這是個(gè)好消息。
    Redis version >= 6.2.0: Added the MINID trimming strategy and the LIMIT option.

  • dazhaozhao 2022-01-21

    我新增了一種gc機(jī)制,大佬幫忙看看 ,minid

年代過(guò)于久遠(yuǎn),無(wú)法發(fā)表評(píng)論

dazhaozhao

2240
積分
0
獲贊數(shù)
0
粉絲數(shù)
2016-08-17 加入
??