1、A用戶發(fā)起http請(qǐng)求,
2、http服務(wù)端,通過(guò)調(diào)用讓Gateway的ws服務(wù)端向ws客戶端B發(fā)送請(qǐng)求,
3、ws客戶端B收到數(shù)據(jù)后,發(fā)送回復(fù)給ws服務(wù)端,
4、ws服務(wù)端收到數(shù)據(jù)后,讓http服務(wù)器響應(yīng)A的http請(qǐng)求
請(qǐng)問(wèn)怎么實(shí)現(xiàn),ws過(guò)程是異步的,好像A用戶的http請(qǐng)求處理過(guò)程必須有等待,類似sleep來(lái)等待ws通訊完,才能拿到數(shù)據(jù)響應(yīng)給A。等待就堵塞,不好,還有什么好的方案嗎?
A用戶也發(fā)起ws數(shù)據(jù),這樣大家都異步了,有個(gè)標(biāo)識(shí)符,來(lái)回傳就行了;還有其他出路嗎?
有沒(méi)有什么方案、插件、工具,可以讓A用戶的請(qǐng)求掛起不堵塞,收到ws數(shù)據(jù)后,在響應(yīng)A的http請(qǐng)求
Revolt很簡(jiǎn)單,我剛改造了我的代碼。。。下面的代碼是從我項(xiàng)目里精簡(jiǎn)復(fù)制出來(lái)的,需要自己整理一下,無(wú)法直接運(yùn)行。。。僅供參考
// 將事件循環(huán)換成Revolt
\Workerman\Worker::$eventLoopClass = \Workerman\Events\Revolt::class;
// 代碼不完整,僅供參考,下面的代碼需放入onWorkerStart()中運(yùn)行
$listens = [];
// websocket服務(wù)端收到信息后,進(jìn)行[$command, $hash, $data] = unserialize($data)反序列化,
// 然后回復(fù)消息時(shí)$connection->send(serialize([$hash, $result]))
// 保證收到消息、回復(fù)消息時(shí)$hash值一致
$connection = new AsyncTcpConnection('ws://127.0.0.1:8877');
$connection->onMessage = function (TcpConnection $connection, $data) use ($listens) {
[$hash, $data] = unserialize($data);
if (!isset($listens[$hash])) {
return;
}
[$timerid, $suspension] = $listens[$hash];
unset($listens[$hash]);
\Workerman\Timer::del($timerid);
$suspension->resume($data);
};
$connection->connect();
function post(string $command, array $data, int $timeout = 10): mixed
{
global $listens, $connection;
$suspension = \Revolt\EventLoop::getSuspension();
$hash = intval(microtime(true) * 1000);
$timerid = \Workerman\Timer::add(
$timeout,
function () use ($hash, $listens, $suspension) {
if (isset($listens[$hash])) {
$suspension->throw(new \Exception('超時(shí)了'));
}
},
[],
false
);
$listens[$hash] = [$timerid, $suspension];
$connection->send(serialize([
$command,
$hash,
$data,
]));
return $suspension->suspend();
}
// 執(zhí)行
$value = $this->post('get', ['key'=>'good']);
var_dump($value);