開啟10個進(jìn)程,每個進(jìn)程執(zhí)行不同的任務(wù)1、2、3、……10,然后主線程和子進(jìn)程通訊分別交互不同的信息,怎么實(shí)現(xiàn)?
子進(jìn)程A:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程B:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程C:--> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
主進(jìn)程和需要先和A通信,然后和B,然后和C,……有先后順序
你這種工作邏輯類似于流水線;
可以開N個消費(fèi)者,通過隊(duì)列來實(shí)現(xiàn)你要的功能;
工作流:1隊(duì)列 --->>> 2隊(duì)列 --->>> 3隊(duì)列 --->>> 4隊(duì)列 --->>> 5隊(duì)列 --->>> 6隊(duì)列
沒這么簡單。
每個消費(fèi)者都需要運(yùn)行10秒鐘cpu繁重的任務(wù),然后通信,然后繼續(xù)運(yùn)行繁重任務(wù),然后繼續(xù)通訊,循環(huán)
然后不同的消費(fèi)者通信又有先后順序,怎么實(shí)現(xiàn)?
以TaskProcess 類為例,這個類只做2個事情
按你的流水線,在進(jìn)程配置內(nèi)新增進(jìn)程配置(一定要設(shè)置構(gòu)造函數(shù) 參數(shù))
完畢。
進(jìn)程配置
'pipeline_1' => [
'count' => 1,
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型1',
'queue' => '隊(duì)列1'
],
],
'pipeline_2' => [
'count' => 5,
// 根據(jù)任務(wù)繁重情況設(shè)置進(jìn)程數(shù)
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型2',
'queue' => '隊(duì)列2'
],
],
'pipeline_3' => [
'count' => 3,
// 根據(jù)任務(wù)繁重情況設(shè)置進(jìn)程數(shù)
'handler' => \process\TaskProcess::class,
'constructor' => [
'type' => '任務(wù)類型3',
'queue' => '隊(duì)列3'
],
],
<?php
namespace process;
use support\Redis;
use Workerman\Timer;
class TaskProcess
{
/**
* @var string
*/
protected string $type;
/**
* @var string
*/
protected string $queue;
/**
* 構(gòu)造函數(shù)
* @param string $type
* @param string $queue
*/
public function __construct(string $type, string $queue)
{
$this->type = $type;
$this->queue = $queue;
}
/**
* 根據(jù)類型,得到下一個工作流隊(duì)列名稱
* @return string
*/
public function getNextQueue(): string
{
//todo...
switch ($this->type) {
case '':
return '';
default:
return '';
}
}
/**
* 進(jìn)程啟動時執(zhí)行
* @return void
*/
public function onWorkerStart(): void
{
Timer::add(1, function () {
//當(dāng)前工作流隊(duì)列內(nèi)有任務(wù)
if (Redis::lLen($this->queue)) {
$data = Redis::lPop($this->queue);
//todo... 完成當(dāng)前流程,得到結(jié)果
$result = call_user_func([$this, $this->type], $data);
//todo... 投遞到下個工作流隊(duì)列
Redis::rPush($this->getNextQueue(), $result);
}
});
}
/**
* step1
* @param $data
* @return void
*/
protected function step1($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step2
* @param $data
* @return void
*/
protected function step2($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step3
* @param $data
* @return void
*/
protected function step3($data): void
{
//todo... 業(yè)務(wù)邏輯
}
/**
* step4
* @param $data
* @return void
*/
protected function step4($data): void
{
//todo... 業(yè)務(wù)邏輯
}
}
而且用redis獲取數(shù)據(jù)時多個進(jìn)程間會相互爭奪數(shù)據(jù),而需求是進(jìn)程1 的數(shù)據(jù)傳給進(jìn)程2,進(jìn)程2 的傳給進(jìn)程3……,有順序的
回復(fù)1:Redis::lPop($this->queue);是定時器拉起執(zhí)行的,執(zhí)行處理任務(wù)的過程中就是阻塞的。
回復(fù)2:redis獲取時多進(jìn)程會相互爭奪數(shù)據(jù),納尼?實(shí)例化進(jìn)程對象的時候,隊(duì)列名都不一樣,爭奪屁的數(shù)據(jù)??
回復(fù)3:核心邏輯是各個任務(wù)進(jìn)程并行處理任務(wù);用redis的列表(隊(duì)列)實(shí)現(xiàn)了n個任務(wù)進(jìn)程通信,并行處理任務(wù),不理解嗎?(實(shí)例化的時候,每個進(jìn)程監(jiān)聽的隊(duì)列名字不是同一個,理解不了嗎)
理解。
回復(fù)1:靠定時器拉起,中間會有浪費(fèi)的時間,64個進(jìn)程一輪下來就浪費(fèi)64秒,有沒有實(shí)時響應(yīng)的辦法?
丟包報錯的原因是你執(zhí)行過程中是堵塞的,進(jìn)程通信組件發(fā)過去,也收不到;
所以,while(1){
usleep(10000);
//todo... 從當(dāng)前隊(duì)列內(nèi)拿上個結(jié)果,處理后丟進(jìn)下個進(jìn)程隊(duì)列。。。
}
需要阻塞,數(shù)據(jù)來了立馬執(zhí)行立馬傳給下一個進(jìn)程,分散到多個進(jìn)程的目的就是為了最大化減少運(yùn)行時間。
子進(jìn)程A:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)1,收到后馬上處理數(shù)據(jù)然后返回數(shù)據(jù)2 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程B:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)2,處理成數(shù)據(jù)3返回 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
子進(jìn)程C:--> 運(yùn)行10秒計(jì)算任務(wù) --> 等待數(shù)據(jù)3,處理成數(shù)據(jù)4返回 --> 運(yùn)行10秒計(jì)算任務(wù) --> 和主進(jìn)程通信 -->……
最終的目的是要分散到64個進(jìn)程同時計(jì)算
A進(jìn)程(pipeline_1進(jìn)程)等待隊(duì)列1的任務(wù),定時器每秒檢查【隊(duì)列1】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列2】
B進(jìn)程(pipeline_2進(jìn)程)等待隊(duì)列2的任務(wù),定時器每秒檢查【隊(duì)列2】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列3】
C進(jìn)程(pipeline_3進(jìn)程)等待隊(duì)列3的任務(wù),定時器每秒檢查【隊(duì)列3】?有則執(zhí)行,結(jié)果傳遞到【隊(duì)列4】
……
X進(jìn)程(pipeline_X進(jìn)程)收到結(jié)果,入庫 或者 進(jìn)行下一輪的調(diào)度;
進(jìn)程間通信用Redis的列表,這么難理解嗎?
主進(jìn)程,子進(jìn)程。
你應(yīng)該是沒看完手冊,
webman,workerman 你知道主進(jìn)程是干嘛用的嗎,
你開的每一個進(jìn)程都是平級的,workerman,webman的主進(jìn)程用來
for ($process_sub>= 你設(shè)置的進(jìn)程數(shù)){
創(chuàng)建一個子進(jìn)程
}
workerman,webman的主進(jìn)程主要是用來supervise用的,你還想主進(jìn)程做啥呢。
你要是希望一個一個順序處理,應(yīng)該用到隊(duì)列
你這樣的想法,實(shí)際上是線程模型的思想,線程模型的思想主要是讓主線程分配,子線程執(zhí)行,主線程回收數(shù)據(jù);
進(jìn)程模型一般不會有這樣的思路,通常來說不會通過主進(jìn)程來進(jìn)行任務(wù)的分配,而是通過一些共同特征,讓子進(jìn)程自己通過特征來獲取自己應(yīng)該獲取的任務(wù),而主進(jìn)程只負(fù)責(zé)一件事情,就是子進(jìn)程的正常運(yùn)行和整體服務(wù)的正常運(yùn)行;
如果你想要強(qiáng)行實(shí)現(xiàn)主進(jìn)程分配任務(wù)讓固定子進(jìn)程進(jìn)行處理,我建議每一個子進(jìn)程都與主進(jìn)程分別建立一條專屬的channel,然后通過發(fā)布訂閱的方式進(jìn)行業(yè)務(wù)處理
進(jìn)程很難在同一時間并行處理,線程也如此,因?yàn)橐慌_服務(wù)器上運(yùn)行的進(jìn)程有很多,每個進(jìn)程都會有自己主動和被動出讓cpu執(zhí)行時間的間歇,同時并行的數(shù)量和cpu數(shù)量是相同的;
進(jìn)程間通訊如果使用socket,那么就存在數(shù)據(jù)需要在用戶態(tài)和內(nèi)核態(tài)進(jìn)行拷貝操作,其實(shí)效率沒有那么高;
如果在共享內(nèi)存中,操作會被加鎖,可能會存在互斥;
把各個業(yè)務(wù)拆分成A\B\C\D來分別執(zhí)行,在理論上可能效率很高,但在實(shí)際的操作過程中存在上述甚至更多的影響效率的點(diǎn),所以還不如將業(yè)務(wù)放在一個進(jìn)程中執(zhí)行,比如假設(shè)一個延遲并不高的業(yè)務(wù),通過拆分到不同進(jìn)程執(zhí)行,最后的結(jié)果只可能比單進(jìn)程慢,而不會快;
假設(shè)存在慢業(yè)務(wù),完全可以做成生產(chǎn)消費(fèi)模式,將存在大量ip的或者阻塞時間較長的交給消費(fèi)隊(duì)列來執(zhí)行