国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

workerman實現(xiàn)tcp客戶端中間件,該用什么樣的方法提供服務?

xiaopi

問題描述

現(xiàn)有第三方服務,實現(xiàn)了TCP服務端,稱為服務A。基本用法就是通過創(chuàng)建TCP客戶端連接服務A,對服務A發(fā)送各種指令,獲得響應,以及訂閱服務A的各種事件,服務A會主動推送事件通知過來。
我想用wokerman的做一個中間層,負責與服務A通信,并以wokerman為核心提供一些服務,共內部的其他程序使用。流程為本地其他項目————調用workerman中間層————調用服務A,其中wokerman中間層負責將服務A的響應結果返回給本地其他項目

我想知道wokerman中間層提供何種服務才能簡便供本地其他項目調用?
已知與服務A進行TCP交互,TCP服務端會響應的數(shù)據(jù)格式為兩種,一種是同步命令型,一種是異步訂閱型。
同步命令型:指的是TCP客戶端發(fā)出命令指令,TCP服務端會阻塞的返回指令的執(zhí)行結果,與http協(xié)議一樣 請求/響應
異步訂閱型:指的是TCP客戶端發(fā)出訂閱指令,TCP服務端會立即返回訂閱結果,并定時向TCP客戶端發(fā)送訂閱的數(shù)據(jù),直到客戶端收集了足夠的數(shù)據(jù),主動取消訂閱或者斷開連接為止。

初步實現(xiàn)wokerman中間層的想法是:
1.中間層搭建一個http服務端,用來接收本地其他項目的url請求(指令)
2.中間層接收到url發(fā)送過來的指令后,判斷是否為同步命令型指令,如果是,則建立TCP客戶端與服務A交互,并同步等待服務A的響應結果,并將響應結果作為response響應給http客戶端;如果是異步訂閱型指令,則要求http客戶端發(fā)送的參數(shù)中必須包含回調url,以便中間件接收到服務A訂閱數(shù)據(jù)后,通過該回調響應給http客戶端。
3.為了解決workerman中間層服務A交互時的TCP邊界問題,使用workerman的特性,定制了與服務A的通訊協(xié)議(封裝了解包/發(fā)包協(xié)議)定制協(xié)議,中間件與服務A交互的方式如下:

$con = new \Workerman\Connection\AsyncTcpConnection('TestNL://14.103.39.10:3315');
    $con->onConnect = function ($con) {
        // 發(fā)送驗證
        $con->send("xxxx");
    };
    $con->onMessage = function ($con, $data) {
        // 獲取解包/合包后的服務A響應的數(shù)據(jù)
        var_data($data)
    };

那么問題就出現(xiàn)了,我使用workerman的定制協(xié)議,必須要用AsyncTcpConnection建立TCP客戶端,而且我又想提供http服務端及時的把數(shù)據(jù)響應給http客戶端,這在workerman或者webman中根本不允許的啊,AsyncTcpConnection是異步的,我不能阻塞http的worker進程用來等待AsyncTcpConnection的message的響應啊。請問應該如何解決這種情況呢?

為此你搜索到了哪些方案及不適用的原因

這種需求應該很常見,請問大佬們如何實現(xiàn)呢?

919 3 4
3個回答

walkor 打賞

問題描述得很詳細,非常贊,一看就有回答的欲望。
workerman做這樣的業(yè)務非常擅長,以下是示例代碼

安裝 workerman/http-client

為了異步執(zhí)行http回調,要裝下 workerman/http-client

composer require workerman/http-client

服務端代碼

以下是服務端代碼示例 start.php

<?php

use Workerman\Connection\TcpConnection;
use Workerman\Http\Client;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:2345');

// 保存訂閱主題->回調url的數(shù)組
global $subjects;
$subjects = [];

// 進程啟動時建立一個到A服務的連接
$worker->onWorkerStart = function() {
    $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
    $con->transport = 'ssl'; // tcp協(xié)議不需要設置
    $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
        $con->send('hello');
    };
    // 收到A服務的訂閱數(shù)據(jù)后轉發(fā)給訂閱者
    $con->onMessage = function(AsyncTcpConnection $con, $subject) {
        global $subjects;
        foreach ($subjects[$subject] ?? [] as $callbackUrl) {
            $httpClient = new Client();
            $httpClient->post($callbackUrl, ['subject' => $subject]);
            echo "callback $callbackUrl\n";
        }
    };
    // 為了模擬A主動推送訂閱消息,這里定時向A服務發(fā)送數(shù)據(jù),A會返回hello
    \Workerman\Timer::add(10, function () use ($con) {
        $con->send('hello');
    });
    $con->connect();
};

$worker->onMessage = function (TcpConnection $httpConnection, Request $request) {
    $callbackUrl = $request->get('callback');
    // 訂閱主題
    $subject = $request->get('subject');
    // 沒有回調url或者訂閱主題認為是同步指令
    if (!$callbackUrl || !$subject) {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp協(xié)議不需要設置
        $con->onWebSocketConnect = function (AsyncTcpConnection $con) {
            $con->send('hello');
        };
        $con->onMessage = function (AsyncTcpConnection $con, $data) use ($httpConnection) {
            $httpConnection->send($data);
            $con->close();
        };
        $con->connect();
        return;
    }
    // 記錄主題和回調之間的關系
    global $subjects;
    if (!isset($subjects[$subject])) {
        $subjects[$subject] = [];
    }
    if (in_array($callbackUrl, $subjects[$subject])) {
        $httpConnection->send('already subscribed');
        return;
    }
    $subjects[$subject][] = $callbackUrl;
    $httpConnection->send('subscribed');
};

Worker::runAll();

啟動

php start.php start

測試

同步請求url類似 http://127.0.0.1:2345
訂閱測試url類似 http://127.0.0.1:2345/?subject=hello&callback=<url地址>

說明

為了方便測試,上面代碼用的wss協(xié)議測試,你需要改成自己的協(xié)議

  • xiaopi 2024-04-28

    感謝老大,思路太棒了,在worker進程開啟的時候就建立一個tcp長連接負責分發(fā)A服務的響應,然后http請求時使用短連接,查完就關閉連接。稍微改一下直接可以用了。接下來我ab壓測下單進程的性能,以及是否可以多進程,或者移植到webman上。
    按我現(xiàn)在想的,應該可以直接開啟多進程增大并發(fā),因為每個進程開啟的時候就建立tcp客戶端連接,然后http客戶端發(fā)送訂閱指令時,無論subject分發(fā)到哪個worker進程去A服務上訂閱都無所謂。

    可能要改的是就是進程開啟時的定時器,定時將存儲的訂閱指令發(fā)送到TCP服務端中
    類似下面代碼

    $worker->onWorkerStart = function() {
        $con = new AsyncTcpConnection('ws://echo.websocket.org:443');
        $con->transport = 'ssl'; // tcp協(xié)議不需要設置
        $con->onWebSocketConnect = function(AsyncTcpConnection $con, ) {
            $con->send('hello');
        };
        // 收到A服務的訂閱數(shù)據(jù)后轉發(fā)給訂閱者
        $con->onMessage = function(AsyncTcpConnection $con, $subject) {
            global $subjects;
            foreach ($subjects[$subject] ?? [] as $callbackUrl) {
                $httpClient = new Client();
                $httpClient->post($callbackUrl, ['subject' => $subject]);
                echo "callback $callbackUrl\n";
            }
        };
        //  定時獲取全局變量中待發(fā)送的訂閱指令,發(fā)送后在onMessage中監(jiān)聽訂閱響應
        \Workerman\Timer::add(10, function () use ($con) {
            global $subjects;
            foreach ($subjects as $subject) {
                $con->send('訂閱指令');
            }
            if (empty($subjects)) {
                $con->send('發(fā)送心跳');
            }
        });
        $con->connect();
    };
walkor 打賞

多進程時要加一句 $worker->reusePort = true;

$worker = new Worker('http://0.0.0.0:2345');
$worker->reusePort = true;
JackDx

學習了。

  • 暫無評論
年代過于久遠,無法發(fā)表回答
??