walkor大神,請(qǐng)教一個(gè)socket相關(guān)的問題
現(xiàn)在情況是這樣的,我要寫一個(gè)將MySQL數(shù)據(jù)同步到ES的服務(wù),方案是將MySQL binlog日志解析成結(jié)構(gòu)化的數(shù)據(jù),然后寫入ES,解析binlog的是一個(gè)php cli 單進(jìn)程,死循環(huán)獲取binlog數(shù)據(jù),因?yàn)榕聰?shù)據(jù)太多,消費(fèi)能力跟不上,想著用workerman多個(gè)worker進(jìn)程處理,但是多個(gè)worker進(jìn)程存在的問題是有序性問題,同一張表的事件只能同時(shí)由一個(gè)worker進(jìn)程處理,我目前的想法是緩存了幾個(gè)緩存了幾個(gè)socket鏈接,然后根據(jù)表名去走對(duì)應(yīng)的socket鏈接發(fā)送消息,不確定這樣子穩(wěn)不穩(wěn)定。代碼如下面,寫了一點(diǎn)點(diǎn),大概意思能表現(xiàn)出來。
方案和代碼參考了這個(gè)問題 https://wenda.workerman.net/question/508
有沒有什么更好的方案呢?
消費(fèi)解析后的binlog worker進(jìn)程
use app\dbBase;
use Workerman\Worker;
require_once __DIR__ . '/../vendor/autoload.php';
// 創(chuàng)建一個(gè)Worker監(jiān)聽2347端口,不使用任何應(yīng)用層協(xié)議
$worker = new Worker("text://0.0.0.0:2347");
// 啟動(dòng)4個(gè)進(jìn)程對(duì)外提供服務(wù)
$worker->count = 6;
$worker->name = 'write_es';
Worker::$logFile = __DIR__ . '/' . $worker->name . '.log';
$worker->onWorkerStart = function($worker)
{
// 將db實(shí)例存儲(chǔ)在全局變量中(也可以存儲(chǔ)在某類的靜態(tài)成員中)
dbBase::getInstance()->init();
};
// 當(dāng)客戶端發(fā)來數(shù)據(jù)時(shí)
$worker->onMessage = function($connection, $data)
{
echo $data.PHP_EOL;
//將mysql數(shù)據(jù)寫入ES
// 向客戶端發(fā)送hello $data
$connection->send('hello ' . $data."\n");
};
$worker->onConnect = function ($connection) {
$connection->send('hello\n');
};
// 運(yùn)行worker
Worker::runAll();
解析binlog后推送到worker進(jìn)程
class mysqlEventSubscribers extends EventSubscribers
{
const client_count = 4;
private static $clients;
/**
* mysql增刪改查事件
* @param EventDTO $event
*/
public function allEvents(EventDTO $event): void {
// all events got __toString() implementation
echo $event;
// all events got JsonSerializable implementation
//echo json_encode($event, JSON_PRETTY_PRINT);
//將事件推送到worker進(jìn)程中進(jìn)行處理
$this->send($event);
echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
}
function send($event) {
if (!isset($this->clients)) {
// 建立socket連接到內(nèi)部推送端口
for ($i = 0; $i<self::client_count;$i++) {
static::$clients[$i] = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);
}
}
//根據(jù)event事件中的數(shù)據(jù)庫表名,找到對(duì)應(yīng)的
$client = $this->getClientByEvent($event);
// 發(fā)送數(shù)據(jù),注意5678端口是Text協(xié)議的端口,Text協(xié)議需要在數(shù)據(jù)末尾加上換行符
fwrite($client, json_encode($event) . "\n");
// 讀取推送結(jié)果
echo fread($client, 8192);
}
function getTableFromEvent($event) {
//邏輯沒實(shí)現(xiàn),大概就是不同的增刪改查返回不同的表名
return 'table';
}
function getClientByEvent($event) {
$table = $this->getTableFromEvent($event);
$clientIndex = syncTable::$tables[$table] % self::client_count;
return static::$clients[$clientIndex];
}
}
我測試了下,上面的方案是行不通的,因?yàn)橥粋€(gè)client發(fā)送的多個(gè)消息,并不是同一個(gè)worker進(jìn)程處理的,所以上面我的方案不行,應(yīng)該還是要在發(fā)送之前加個(gè)數(shù)組緩存event消息,同一個(gè)表的多條數(shù)據(jù),等一條處理完了再發(fā)送下一條
同一個(gè)連接的數(shù)據(jù)肯定是同一個(gè)worker處理的,所以感覺代碼沒啥問題。