現(xiàn)有第三方服務,實現(xiàn)了TCP服務端,稱為服務A
。基本用法就是通過創(chuàng)建TCP客戶端連接服務A
,對服務A
發(fā)送各種指令,獲得響應,以及訂閱服務A的各種事件,服務A會主動推送事件通知過來。
我想用wokerman的做一個中間層,負責與服務A通信,并以wokerman為核心提供一些服務,共內部的其他程序使用。流程為本地其他項目————調用workerman中間層————調用服務A
,其中wokerman中間層
負責將服務A
的響應結果返回給本地其他項目
我想知道wokerman中間層提供何種服務才能簡便供本地其他項目調用?
已知與服務A進行TCP交互,TCP服務端會響應的數(shù)據(jù)格式為兩種,一種是同步命令型
,一種是異步訂閱型
。
同步命令型
:指的是TCP客戶端發(fā)出命令指令,TCP服務端會阻塞的返回指令的執(zhí)行結果,與http協(xié)議一樣 請求/響應
異步訂閱型
:指的是TCP客戶端發(fā)出訂閱指令,TCP服務端會立即返回訂閱結果,并定時向TCP客戶端發(fā)送訂閱的數(shù)據(jù),直到客戶端收集了足夠的數(shù)據(jù),主動取消訂閱或者斷開連接為止。
初步實現(xiàn)wokerman中間層的想法是:
1.中間層搭建一個http服務端,用來接收本地其他項目的url請求(指令)
2.中間層接收到url發(fā)送過來的指令后,判斷是否為同步命令型指令
,如果是,則建立TCP客戶端與服務A交互,并同步等待服務A的響應結果,并將響應結果作為response響應給http客戶端;如果是異步訂閱型指令
,則要求http客戶端發(fā)送的參數(shù)中必須包含回調url,以便中間件接收到服務A訂閱數(shù)據(jù)后,通過該回調響應給http客戶端。
3.為了解決workerman中間層
與服務A
交互時的TCP邊界問題,使用workerman的特性,定制了與服務A的通訊協(xié)議(封裝了解包/發(fā)包協(xié)議)定制協(xié)議,中間件與服務A交互的方式如下:
$con = new \Workerman\Connection\AsyncTcpConnection('TestNL://14.103.39.10:3315');
$con->onConnect = function ($con) {
// 發(fā)送驗證
$con->send("xxxx");
};
$con->onMessage = function ($con, $data) {
// 獲取解包/合包后的服務A響應的數(shù)據(jù)
var_data($data)
};
那么問題就出現(xiàn)了,我使用workerman的定制協(xié)議,必須要用AsyncTcpConnection建立TCP客戶端,而且我又想提供http服務端及時的把數(shù)據(jù)響應給http客戶端,這在workerman或者webman中根本不允許的啊,AsyncTcpConnection是異步的,我不能阻塞http的worker進程用來等待AsyncTcpConnection的message的響應啊。請問應該如何解決這種情況呢?
這種需求應該很常見,請問大佬們如何實現(xiàn)呢?
問題描述得很詳細,非常贊,一看就有回答的欲望。
workerman做這樣的業(yè)務非常擅長,以下是示例代碼
為了異步執(zhí)行http回調,要裝下 workerman/http-client
composer require workerman/http-client
以下是服務端代碼示例 start.php
<?php
use Workerman\Connection\TcpConnection;
use Workerman\Http\Client;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('http://0.0.0.0:2345');
// 保存訂閱主題->回調url的數(shù)組
global $subjects;
$subjects = [];
// 進程啟動時建立一個到A服務的連接
$worker->onWorkerStart = function() {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設置
$con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
$con->send('hello');
};
// 收到A服務的訂閱數(shù)據(jù)后轉發(fā)給訂閱者
$con->onMessage = function(AsyncTcpConnection $con, $subject) {
global $subjects;
foreach ($subjects[$subject] ?? [] as $callbackUrl) {
$httpClient = new Client();
$httpClient->post($callbackUrl, ['subject' => $subject]);
echo "callback $callbackUrl\n";
}
};
// 為了模擬A主動推送訂閱消息,這里定時向A服務發(fā)送數(shù)據(jù),A會返回hello
\Workerman\Timer::add(10, function () use ($con) {
$con->send('hello');
});
$con->connect();
};
$worker->onMessage = function (TcpConnection $httpConnection, Request $request) {
$callbackUrl = $request->get('callback');
// 訂閱主題
$subject = $request->get('subject');
// 沒有回調url或者訂閱主題認為是同步指令
if (!$callbackUrl || !$subject) {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設置
$con->onWebSocketConnect = function (AsyncTcpConnection $con) {
$con->send('hello');
};
$con->onMessage = function (AsyncTcpConnection $con, $data) use ($httpConnection) {
$httpConnection->send($data);
$con->close();
};
$con->connect();
return;
}
// 記錄主題和回調之間的關系
global $subjects;
if (!isset($subjects[$subject])) {
$subjects[$subject] = [];
}
if (in_array($callbackUrl, $subjects[$subject])) {
$httpConnection->send('already subscribed');
return;
}
$subjects[$subject][] = $callbackUrl;
$httpConnection->send('subscribed');
};
Worker::runAll();
php start.php start
同步請求url類似 http://127.0.0.1:2345
訂閱測試url類似 http://127.0.0.1:2345/?subject=hello&callback=<url地址>
為了方便測試,上面代碼用的wss協(xié)議測試,你需要改成自己的協(xié)議
感謝老大,思路太棒了,在worker進程開啟的時候就建立一個tcp長連接負責分發(fā)A服務的響應,然后http請求時使用短連接,查完就關閉連接。稍微改一下直接可以用了。接下來我ab壓測下單進程的性能,以及是否可以多進程,或者移植到webman上。
按我現(xiàn)在想的,應該可以直接開啟多進程增大并發(fā),因為每個進程開啟的時候就建立tcp客戶端連接,然后http客戶端發(fā)送訂閱指令時,無論subject分發(fā)到哪個worker進程去A服務上訂閱都無所謂。
可能要改的是就是進程開啟時的定時器,定時將存儲的訂閱指令發(fā)送到TCP服務端中
類似下面代碼
$worker->onWorkerStart = function() {
$con = new AsyncTcpConnection('ws://echo.websocket.org:443');
$con->transport = 'ssl'; // tcp協(xié)議不需要設置
$con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
$con->send('hello');
};
// 收到A服務的訂閱數(shù)據(jù)后轉發(fā)給訂閱者
$con->onMessage = function(AsyncTcpConnection $con, $subject) {
global $subjects;
foreach ($subjects[$subject] ?? [] as $callbackUrl) {
$httpClient = new Client();
$httpClient->post($callbackUrl, ['subject' => $subject]);
echo "callback $callbackUrl\n";
}
};
// 定時獲取全局變量中待發(fā)送的訂閱指令,發(fā)送后在onMessage中監(jiān)聽訂閱響應
\Workerman\Timer::add(10, function () use ($con) {
global $subjects;
foreach ($subjects as $subject) {
$con->send('訂閱指令');
}
if (empty($subjects)) {
$con->send('發(fā)送心跳');
}
});
$con->connect();
};