国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

請(qǐng)教一個(gè)socket長連接相關(guān)的問題

foolgry

walkor大神,請(qǐng)教一個(gè)socket相關(guān)的問題

現(xiàn)在情況是這樣的,我要寫一個(gè)將MySQL數(shù)據(jù)同步到ES的服務(wù),方案是將MySQL binlog日志解析成結(jié)構(gòu)化的數(shù)據(jù),然后寫入ES,解析binlog的是一個(gè)php cli 單進(jìn)程,死循環(huán)獲取binlog數(shù)據(jù),因?yàn)榕聰?shù)據(jù)太多,消費(fèi)能力跟不上,想著用workerman多個(gè)worker進(jìn)程處理,但是多個(gè)worker進(jìn)程存在的問題是有序性問題,同一張表的事件只能同時(shí)由一個(gè)worker進(jìn)程處理,我目前的想法是緩存了幾個(gè)緩存了幾個(gè)socket鏈接,然后根據(jù)表名去走對(duì)應(yīng)的socket鏈接發(fā)送消息,不確定這樣子穩(wěn)不穩(wěn)定。代碼如下面,寫了一點(diǎn)點(diǎn),大概意思能表現(xiàn)出來。
方案和代碼參考了這個(gè)問題 https://wenda.workerman.net/question/508

有沒有什么更好的方案呢?

消費(fèi)解析后的binlog worker進(jìn)程

use app\dbBase;
use Workerman\Worker;
require_once __DIR__ . '/../vendor/autoload.php';

// 創(chuàng)建一個(gè)Worker監(jiān)聽2347端口,不使用任何應(yīng)用層協(xié)議
$worker = new Worker("text://0.0.0.0:2347");

// 啟動(dòng)4個(gè)進(jìn)程對(duì)外提供服務(wù)
$worker->count = 6;
$worker->name = 'write_es';
Worker::$logFile = __DIR__ . '/' . $worker->name . '.log';

$worker->onWorkerStart = function($worker)
{
    // 將db實(shí)例存儲(chǔ)在全局變量中(也可以存儲(chǔ)在某類的靜態(tài)成員中)
    dbBase::getInstance()->init();
};

// 當(dāng)客戶端發(fā)來數(shù)據(jù)時(shí)
$worker->onMessage = function($connection, $data)
{
    echo $data.PHP_EOL;
    //將mysql數(shù)據(jù)寫入ES
    // 向客戶端發(fā)送hello $data
    $connection->send('hello ' . $data."\n");
};

$worker->onConnect = function ($connection) {
    $connection->send('hello\n');
};

// 運(yùn)行worker
Worker::runAll();

解析binlog后推送到worker進(jìn)程

class mysqlEventSubscribers extends EventSubscribers
{

    const client_count = 4;
    private static $clients;

    /**
     * mysql增刪改查事件
     * @param EventDTO $event
     */
    public function allEvents(EventDTO $event): void {
        // all events got __toString() implementation
        echo $event;
        // all events got JsonSerializable implementation
        //echo json_encode($event, JSON_PRETTY_PRINT);
        //將事件推送到worker進(jìn)程中進(jìn)行處理
        $this->send($event);

        echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
    }

    function send($event) {
        if (!isset($this->clients)) {
            // 建立socket連接到內(nèi)部推送端口
            for ($i = 0; $i<self::client_count;$i++) {
                static::$clients[$i] = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);
            }
        }

        //根據(jù)event事件中的數(shù)據(jù)庫表名,找到對(duì)應(yīng)的
        $client = $this->getClientByEvent($event);

        // 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
        fwrite($client, json_encode($event) . "\n");
        // 讀取推送結(jié)果
        echo fread($client, 8192);
    }

    function getTableFromEvent($event) {
        //邏輯沒實(shí)現(xiàn),大概就是不同的增刪改查返回不同的表名
        return 'table';
    }

    function getClientByEvent($event) {
        $table = $this->getTableFromEvent($event);
        $clientIndex = syncTable::$tables[$table] % self::client_count;
        return static::$clients[$clientIndex];
    }
}
2344 3 0
3個(gè)回答

敖德薩

我怎么感覺你這有點(diǎn)像要寫定時(shí)任務(wù)的樣子 就是當(dāng)有消息過多的時(shí)候我沒辦法同步到es的時(shí)候就用緩存來解壓一下 直至消息同步為此是不是?

  • foolgry 2021-10-11

    主要是為了多進(jìn)程消費(fèi),讓同一個(gè)表到達(dá)同一個(gè)worker進(jìn)程

foolgry

我測試了下,上面的方案是行不通的,因?yàn)橥粋€(gè)client發(fā)送的多個(gè)消息,并不是同一個(gè)worker進(jìn)程處理的,所以上面我的方案不行,應(yīng)該還是要在發(fā)送之前加個(gè)數(shù)組緩存event消息,同一個(gè)表的多條數(shù)據(jù),等一條處理完了再發(fā)送下一條

  • 暫無評(píng)論
six

同一個(gè)連接的數(shù)據(jù)肯定是同一個(gè)worker處理的,所以感覺代碼沒啥問題。

  • foolgry 2021-10-12

    我測試的結(jié)果發(fā)現(xiàn)同一個(gè)連接的數(shù)據(jù)是分發(fā)給不同的worker處理的,這個(gè)代碼還是不行

  • six 2021-10-12

    不可能,worker進(jìn)程間是隔離的,連接分配到某個(gè)進(jìn)程后就不會(huì)再次分配了,這個(gè)連接的所有數(shù)據(jù)都會(huì)給這個(gè)進(jìn)程處理。

  • foolgry 2021-10-13

    @1393:你說的是對(duì)的,我之前測試的代碼有問題,多謝了

年代過于久遠(yuǎn),無法發(fā)表回答
??