用戶提交了一萬條左右的數(shù)據(jù)更新操作,用了一個(gè)A異步隊(duì)列將數(shù)據(jù)放進(jìn)去,在A隊(duì)列中,將一萬條數(shù)據(jù)循環(huán)放入B異步隊(duì)列,奇怪的事情發(fā)生了,B異步可能只有幾十條任務(wù),死活這一萬條數(shù)據(jù)加不進(jìn)B隊(duì)列。
嘗試去掉A隊(duì)列,直接循環(huán)將一萬條數(shù)據(jù)丟入B隊(duì)列,結(jié)果成功了。
//redis-queue/redis
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => null, // 密碼,字符串類型,可選參數(shù)
'db' => 2, // 數(shù)據(jù)庫
'prefix' => '', // key 前綴
'max_attempts' => 3, // 消費(fèi)失敗后,重試次數(shù)
'retry_seconds' => 5, // 重試間隔,單位秒
'wait_timeout'=>8640000,
'connect_timeout'=>86400,
]
],
];
//redis-queue/process
return [
'consumer' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 1, // 可以設(shè)置多進(jìn)程同時(shí)消費(fèi)
'constructor' => [
// 消費(fèi)者類目錄
'consumer_dir' => app_path() . '/queue/redis'
],
'user' => 'www',
'group' => 'www',
],
];
// A隊(duì)列
public $queue = 'keyword-queue';
// 連接名,對應(yīng) plugin/webman/redis-queue/redis.php 里的連接`
public $connection = 'default';
// 消費(fèi)
public function consume($data)
{
foreach ($data['keywords'] as $key=>$keyword) {
// 隊(duì)列名
$queue = 'zhishu-queue';
// 數(shù)據(jù),可以直接傳數(shù)組,無需序列化
dump($keyword.' '.$key);
// 投遞消息
Client::send($queue, ['keyword' => $keyword, 'type' => $data['type'], 'id' => $data['id'],'status'=>$data['status']]);
}
}
//B隊(duì)列
// 要消費(fèi)的隊(duì)列名
public $queue = 'zhishu-queue';
// 連接名,對應(yīng) plugin/webman/redis-queue/redis.php 里的連接`
public $connection = 'default';
// 消費(fèi)
public function consume($data)
{
$lock = Cache::lock(KeywordsService::LOCK_BY_ID . $data['id'], 60);
$process_task_key = snowflake_id();
try {
$lock->block(60);
// 等待最多 5 秒后獲得的鎖...
$keyword = KeywordsService::getById($data['id']);
if ($keyword->status == 1 && $data['status'] == $keyword->status) {
$keyword->current += 1;
if ($keyword->current >= $keyword->total) {
$keyword->status = 2;
}
$keyword->save();
KeywordsService::saved($keyword, $process_task_key);
}
$lock?->release();
delete_process_key($process_task_key);
} catch (\Throwable $exception) {
$lock?->release();
delete_process_keys($process_task_key);
throw $exception;
}
"workerman/webman-framework": "^1.5.0"
文檔有說異步投送原理,異步投遞是先把消息放在本地內(nèi)存里,等進(jìn)程空閑時(shí)發(fā)送給redis。
如果數(shù)據(jù)還沒投送完進(jìn)程退出就會導(dǎo)致數(shù)據(jù)丟失。
還有如果你的進(jìn)程有l(wèi)ock相關(guān)的操作,我想也會影響投遞,因?yàn)檫M(jìn)程被你lock的時(shí)間段里,內(nèi)存中的消息無法投遞到redis。
感覺你應(yīng)該用同步投遞,文檔說重要數(shù)據(jù)應(yīng)該用同步投遞。