FlowerMQ 一個(gè)基于Workerman和Redis實(shí)現(xiàn)的消息隊(duì)列,一個(gè)小小工具,用來(lái)給主項(xiàng)目解耦的,也支持延遲隊(duì)列,失敗嘗試這些。
composer create-project mrtwenty/flower
git clone
composer install
xtrim maxlen mq ~ 長(zhǎng)度
。xtrim minid mq ~ 消息id
,需要 redis server 6.2.0 以上。windows下僅限于開(kāi)發(fā),不適合做生產(chǎn)環(huán)境使用,啟動(dòng)需要開(kāi)三個(gè)命令行窗口,執(zhí)行 start、pending、delay命令
flower配備了一個(gè)客戶端,方便在別的項(xiàng)目中使用:
composer install mrtwenty/flower-client
使用方式:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
//$mq需要與服務(wù)端三個(gè)配置信息相同
$mq = [
'name' => 'mq',
'delay_name' => 'mq_delay',
'fail_list' => 'mq_fail_list'
];
$client = new Client($redis, $mq);
//立即執(zhí)行
$res = $client->add(['test' => 'data']);
var_dump($res);
//延遲消息
$res = $client->add(['test' => 'data'], 3);
var_dump($res);
可以安裝此依賴包,當(dāng)然也可以根據(jù)自己需要用別的包
composer require workerman/mysql
配置信息可以在.env里面寫(xiě)入:
[mysql]
host = 127.0.0.1
username = root
password = 123456
database = test
port = 3306
代碼實(shí)現(xiàn):
<?php
declare(strict_types=1);
namespace app;
use app\library\BaseInterface;
/**
* 消費(fèi)類
*/
class Run implements BaseInterface
{
protected $db = null;
public function getDb()
{
if (is_null($this->db)) {
$config = config('mysql');
$host = $config['host'];
$port = $config['port'];
$user = $config['username'];
$password = $config['password'];
$database = $config['database'];
$this->db = new \Workerman\MySQL\Connection($host, $port, $user, $password, $database);
}
return $this->db;
}
/**
* 消費(fèi)方法,如何消費(fèi),取決用戶自己
*
* @param mixed $data
* @param mixed $id
* @return bool 返回true就會(huì)執(zhí)行ack確認(rèn)消息已消費(fèi)
*/
public function consumer($data, $id): bool
{
$db = $this->getDb();
$info = $db->row("SELECT * FROM `short_url` WHERE id=3");
print_r($info);
return true;
}
/**
* 超過(guò)嘗試的次數(shù),就會(huì)寫(xiě)入失敗隊(duì)列里面,并調(diào)用此方法,可以用此方法通知運(yùn)維
*
* @return void
*/
public function fail($data, $id)
{
print_r($data);
print_r($id);
}
}
由于是守護(hù)進(jìn)程,為了避免php業(yè)務(wù)代碼bug隱藏的內(nèi)存泄露,可以在消費(fèi)者執(zhí)行完一定數(shù)量的時(shí)候重啟進(jìn)程。具體實(shí)現(xiàn)請(qǐng)查看workerman手冊(cè)。
app\consumer\Run的consumer 返回false, 消息變成已讀取未ack,就會(huì)被xpending讀取到,并重試,不行就繼續(xù)xClaim 轉(zhuǎn)移消息,次數(shù)就會(huì)加1
我直接 composer create-project mrtwenty/flower 安裝
app\consumer\Run的consumer返回false,你說(shuō)的那個(gè)地方,我加了調(diào)試信息
[$msg_id, $consumer, $over_time, $fail_num] = $msg;
echo "fail_num", $fail_num, "\n";
echo "fail_num", $fail_num, "\n"; 我變量有打印的,你看那個(gè)截圖,輸出都是
fail_num1、fail_num2、fail_num3,后面的數(shù)字就是。
fail_num
這個(gè)你消息被讀取的次數(shù),你這里并沒(méi)有讀取(XreadGroup),就直接?。╔pending),就不會(huì)變的
不寫(xiě)代碼,用redis 命令執(zhí)行下流程。
# 創(chuàng)建消息test_mq,返回消息ID 1641570877368-0
XADD test_mq * key value
# 創(chuàng)建消費(fèi)組 test_group
XGROUP CREATE test_mq test_group 0
# hehe去讀取數(shù)據(jù),不進(jìn)行xack確認(rèn)消息,xpending就會(huì)有消息
XREADGROUP GROUP test_group hehe COUNT 1 STREAMS test_mq >
# 查看未xack的消息,此時(shí)得到 $fail_num=1
XPENDING test_mq test_group - + 1
# 嘗試消費(fèi)該消息,使用xRange讀取該消息內(nèi)容,結(jié)果還是沒(méi)成ack掉,那就轉(zhuǎn)移
# 轉(zhuǎn)移消息
XCLAIM test_mq test_group hehe 3000 消息ID
XCLAIM test_mq test_group hehe 3000 1641570877368-0
# 再次查看未xack的消息,此時(shí)得到 $fail_num=2
XPENDING test_mq test_group - + 1
# 再次嘗試消費(fèi)該消息,結(jié)果還是沒(méi)成,如此反復(fù)
Redis 版本
127.0.0.1:6379> info
# Server
redis_version:5.0.3
...
os:Linux 4.19.128-microsoft-standard x86_64
以下是不進(jìn)行xclaim
以下是進(jìn)行xclaim
我用php腳本跑結(jié)果和上面是一樣的。結(jié)果為啥和你跑的不一樣,哈哈
是不是由于你使用的是Windows版本,而我是Linux版本
os:Linux 4.19.128-microsoft-standard x86_64
127.0.0.1:6379[14]> info
# Server
redis_version:6.2.1
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:68b3db7bf8188804
redis_mode:standalone
os:Linux 4.15.0-137-generic x86_64
Redis官方源碼版本更新 5.0.4 就支持了
https://github.com/redis/redis/commit/f72f4ea311d31f7ce209218a96afb97490971d39
有一個(gè)很嚴(yán)重的問(wèn)題,在你修剪消息隊(duì)列的時(shí)候,如果消息還沒(méi)消費(fèi),直接會(huì)修剪掉到?jīng)]消費(fèi)到的消息,現(xiàn)象就是某個(gè)區(qū)間的消息全部丟失,我只是看了一下代碼,沒(méi)驗(yàn)證我的想法。
不是哦,你仔細(xì)看看代碼,測(cè)試一下,把GC評(píng)率調(diào)成100%,隊(duì)列最大長(zhǎng)度改短,消息數(shù)量超過(guò)隊(duì)列長(zhǎng)度,一定會(huì)丟失消息
不是, 最簡(jiǎn)單的浮現(xiàn)方式,你把最大隊(duì)列長(zhǎng)度改為10,GC返回true,推送1000條消息進(jìn)去, 你看消費(fèi)的消息有多少條,最多幾十條,不確定具體多少條是因?yàn)閤trim加了true參數(shù)
嗯,你說(shuō)的對(duì),我應(yīng)該在項(xiàng)目里面提醒一下使用這個(gè)項(xiàng)目的人,這是redis stream xtrim 本身的設(shè)計(jì),如果想要長(zhǎng)久的保存,可以將gc設(shè)置成永遠(yuǎn)不會(huì)執(zhí)行,這樣就不會(huì)執(zhí)行xtrim命令了,或者設(shè)置隊(duì)列長(zhǎng)度足夠長(zhǎng),xtrim 執(zhí)行了也不會(huì)裁剪到,這取決于開(kāi)發(fā)者的應(yīng)用場(chǎng)景,我自己用的時(shí)候,都是為了能夠及時(shí)觸發(fā)消息,基本都是即發(fā)即消費(fèi)。
其實(shí)裁剪也是有必要的,不然久了內(nèi)存會(huì)爆炸,或者被redis清空,在裁剪的時(shí)候應(yīng)該裁剪到所有分組消費(fèi)者 最小已消費(fèi)的消息位置,不應(yīng)該是固定裁剪。
額,我剛剛?cè)タ戳讼聄edis6.2的英文手冊(cè),好像支持新的方式,這是個(gè)好消息。
Redis version >= 6.2.0: Added the MINID
trimming strategy and the LIMIT
option.
你這里的
$fail_num
,消息被讀取次數(shù)永遠(yuǎn)是1