RabbitMQ 斷開后,主動(dòng) Worker::stopAll 是出于什么考慮呢?
我想實(shí)現(xiàn)服務(wù)自動(dòng)恢復(fù),如果RabbitMQ斷開之后,會(huì)定時(shí)重連。我的服務(wù)在onWorkerStart時(shí)比較耗時(shí)。
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
輸出為:
RabbitMQ client disconnected: [506] Connection closed by server unexpectedly
子進(jìn)程stopAll就會(huì)退出當(dāng)前子進(jìn)程,主進(jìn)程會(huì)重新拉起子進(jìn)程,在重新拉起子進(jìn)程的過程中,子進(jìn)程會(huì)對rabbitmq進(jìn)行重連,這是最簡單的重連機(jī)制的實(shí)現(xiàn)方案;當(dāng)然還有的做法是可以不用重啟進(jìn)程,在進(jìn)程內(nèi)部用代碼實(shí)現(xiàn)重連,代碼復(fù)雜度會(huì)更高,相對這種方案,維護(hù)起來也不方便,兩種方案比較起來也沒有明顯的性能差異,所以選擇一種代碼維護(hù)方便的即可。
之前重啟方案是我提交的PR,你也可以按照以下代碼進(jìn)行修改
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0) {
//定時(shí)器執(zhí)行stopAll
//在配置中增加一個(gè)重啟定時(shí)器字段,比如restart_time
if (($restartTime = $this->options['restart_time'] ?? 0) > 0) {
$this->eventLoop->add(restartTime, EventInterface::EV_TIMER, function() use ($replyCode, $replyText) {
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
});
return null;
} else {
Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
}
return $this;
});
如果你有空,你可以提交這個(gè)pr,如果暫時(shí)沒空,等會(huì)兒我去提交個(gè)pr
我自己實(shí)現(xiàn)的重連方案沒并有使用 worker::stopAll, 我是希望不重啟子進(jìn)程。 我現(xiàn)在想重寫Client ,去掉Woker::stopAll(), 之后出現(xiàn)這樣的錯(cuò)誤:
Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/AbstractClient.php:311
Stack trace:
return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
if (!empty($this->channels)) {
throw new \LogicException("All channels have to be closed by now.");
}
if($replyCode !== 0){
return null;
}
return $this->connectionClose($replyCode, $replyText, 0, 0);
})->then(function () use ($replyCode, $replyText){
$this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
$this->closeStream();
$this->init();
if($replyCode !== 0){
Log::error("RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
}
return $this;
});
不建議自行實(shí)現(xiàn)重啟功能,本身客戶端高度依賴eventloop,很多準(zhǔn)備工作是在進(jìn)程啟動(dòng)的時(shí)候做的,比如加入定時(shí)器、加入事件監(jiān)聽,如果自行實(shí)現(xiàn)重啟,需要對這些工作都要重新再做一遍
你上述方式這樣寫,僅僅只是關(guān)閉了流的監(jiān)聽關(guān)閉了心跳定時(shí)器移除了緩沖區(qū),但是你重連的時(shí)候,你還需要把流的監(jiān)聽事件加上,并且為rabbitmq-client的連接進(jìn)行重新建立,重新獲取client-id等等
謝謝大佬 !太給力了, 我最開始的思路是想,如果斷開了連接,通過定時(shí)器去嘗試連接。 連接成功后覆蓋之前的連接對象。
目前發(fā)送MQ僅僅是服務(wù)的一部分,直接重啟我覺得影響了我其他的工作。
謝謝大佬 !太給力了, 我最開始的思路是想,如果斷開了連接,通過定時(shí)器去嘗試連接。 連接成功后覆蓋之前的> 連接對象。
目前發(fā)送MQ僅僅是服務(wù)的一部分,直接重啟我覺得影響了我其他的工作。
那你可以參考這個(gè)client是如何進(jìn)行連接的,然后在你的定時(shí)器里把連接工作做完,然后重連這里就按你自己寫的方式寫;當(dāng)然我個(gè)人覺得重啟當(dāng)前進(jìn)程是最快的做法,絲毫不會(huì)影響其他的工作。
這個(gè)我再做一個(gè)特性吧,就在重連這個(gè)地方增加一個(gè)注冊的回調(diào)事件,如果加入了注冊回調(diào)事件,就執(zhí)行回調(diào)事件,如果沒有回調(diào)事件,就自動(dòng)重啟
@walkor 老大,根據(jù)需求,新增了用戶自定義注冊回調(diào);我又調(diào)整了一個(gè)pr,已提交
https://github.com/walkor/rabbitmq/pull/15
@mo 你可以參考https://github.com/walkor/rabbitmq/pull/15的方式,在回調(diào)里面得到傳入的client實(shí)例,然后對client實(shí)例進(jìn)行處理銷毀,重新new一個(gè)client代替你之前使用的client實(shí)例即可