我正在使用GatewayWorker開發(fā)一個(gè)物聯(lián)網(wǎng)平臺(tái),主要功能是作為TCP服務(wù)端與DTU設(shè)備(客戶端)建立連接并進(jìn)行通信。現(xiàn)在遇到一個(gè)需求:
每個(gè)設(shè)備連接后,服務(wù)端需要??定時(shí)循環(huán)下發(fā)一組指令??(例如MODBUS協(xié)議采集指令)。
??下發(fā)邏輯??:必須??逐條發(fā)送??,即發(fā)送一條指令 → 等待設(shè)備返回響應(yīng) → 再發(fā)送下一條,直到當(dāng)前設(shè)備的所有指令發(fā)送完成 → 等待下一次循環(huán)開始。
??循環(huán)要求??:整個(gè)過程需要周期性重復(fù)(例如每5分鐘一輪)。
如果直接用GatewayWorker的定時(shí)器(如Timer::add()),每一輪采集需要等待N次指令交互(發(fā)送+響應(yīng)),??耗時(shí)較長(zhǎng)??(可能幾秒到十幾秒)。
??擔(dān)心阻塞問題??:
如果??新設(shè)備嘗試連接??,是否會(huì)因?yàn)閃orker進(jìn)程被占用而無法及時(shí)處理?
高并發(fā)場(chǎng)景下(例如100個(gè)設(shè)備),這種串行化設(shè)計(jì)是否會(huì)導(dǎo)致性能瓶頸?
希望有經(jīng)驗(yàn)的朋友能分享優(yōu)化思路,感謝!
你設(shè)備上線后,綁定了uid以后,就是誰發(fā)誰的,接受的時(shí)候直接處理就行了吧
這是谷歌給的回答,應(yīng)該是完美回答了,給后來的朋友參考下:
您好!您提出的這個(gè)問題非常經(jīng)典,是物聯(lián)網(wǎng)平臺(tái)中常見的需求和顧慮。直接在單個(gè)定時(shí)器回調(diào)中完成一整個(gè)設(shè)備的所有指令的串行收發(fā),確實(shí)會(huì)因?yàn)橥降却憫?yīng)而阻塞當(dāng)前 Worker 進(jìn)程,影響該進(jìn)程處理其他連接的請(qǐng)求和新連接。
下面我為您詳細(xì)分析并提供解決方案:
理解 GatewayWorker 的非阻塞特性
GatewayWorker (基于 Workerman) 的核心優(yōu)勢(shì)在于其事件驅(qū)動(dòng)和非阻塞 I/O 模型。每個(gè) Worker 進(jìn)程通過 epoll
(Linux) 或類似機(jī)制高效地處理成千上萬的并發(fā)連接。關(guān)鍵在于,任何一個(gè)回調(diào)函數(shù)(如 onMessage
, onConnect
, Timer::add
的回調(diào))都不應(yīng)該執(zhí)行長(zhǎng)時(shí)間的同步阻塞操作。
您顧慮的阻塞問題分析
Timer::add
的回調(diào)中,針對(duì)一個(gè)設(shè)備,執(zhí)行 "發(fā)送指令1 -> (同步等待) -> 收到響應(yīng)1 -> 發(fā)送指令2 -> (同步等待) -> 收到響應(yīng)2 ..." 這樣的邏輯,那么在這個(gè)設(shè)備完成一輪采集的整個(gè)過程中(可能幾秒到十幾秒),執(zhí)行該回調(diào)的那個(gè) Worker 進(jìn)程將無法處理任何其他事情,包括其他已連接設(shè)備的消息、新設(shè)備的連接請(qǐng)求等。解決方案:異步化指令收發(fā)
核心思想是將“發(fā)送 -> 等待響應(yīng) -> 發(fā)送下一條”這個(gè)過程異步化。每一次“發(fā)送指令”后,?不等待響應(yīng)?,而是記錄下當(dāng)前設(shè)備的狀態(tài)(例如,期望收到的響應(yīng)類型,下一條要發(fā)送的指令等),然后當(dāng)前回調(diào)函數(shù)立即結(jié)束,讓 Worker 進(jìn)程可以去處理其他事件。當(dāng)設(shè)備響應(yīng)到達(dá)時(shí),在 onMessage
事件中根據(jù)之前記錄的狀態(tài),發(fā)送下一條指令。
具體實(shí)現(xiàn)步驟 (在 Events.php
中)
假設(shè)我們?yōu)槊總€(gè)連接維護(hù)一個(gè)狀態(tài),表明它當(dāng)前處于指令序列的哪一步。
<?php
use Workerman\Worker;
use Workerman\Timer;
use GatewayWorker\Lib\Gateway;
class Events
{
/**
* 業(yè)務(wù)進(jìn)程啟動(dòng)時(shí)觸發(fā)
* @param \Workerman\Worker $businessWorker
*/
public static function onWorkerStart($businessWorker)
{
// 全局定時(shí)器,例如每5分鐘觸發(fā)一次,用于啟動(dòng)所有設(shè)備的采集周期
Timer::add(300, function() use ($businessWorker) { // 300秒 = 5分鐘
// 獲取所有在線的客戶端 client_id 列表
// 注意:如果設(shè)備非常多,一次性獲取所有 client_id 可能有性能問題,
// 可以考慮分批處理或從持久化存儲(chǔ)中獲取活躍設(shè)備列表
$all_clients = Gateway::getAllClientSessions(); // 或者 Gateway::getAllClientInfo();
foreach ($all_clients as $client_id => $session_data) {
// 這里可以根據(jù) $session_data 判斷設(shè)備類型或是否需要采集
// 例如,可以約定設(shè)備連接后在 session 中設(shè)置一個(gè)標(biāo)識(shí)
// if (isset($session_data['needs_collection']) && $session_data['needs_collection']) {
// 為了演示,我們假設(shè)所有連接都需要采集
// 并且我們使用 connection 對(duì)象來存儲(chǔ)狀態(tài) (更推薦)
// 或者使用 Gateway::setSession($client_id, [...]) 更新會(huì)話
$connection = $businessWorker->connections[$client_id] ?? null; // 獲取連接對(duì)象
if ($connection) {
// 初始化或重置該設(shè)備的采集狀態(tài)
$connection->current_command_index = 0; // 當(dāng)前指令索引
$connection->is_collecting = true; // 標(biāo)記正在采集中
// (可選) 清除可能存在的舊的單指令超時(shí)定時(shí)器
if (isset($connection->command_timeout_timer_id)) {
Timer::del($connection->command_timeout_timer_id);
unset($connection->command_timeout_timer_id);
}
echo "Starting collection for client: {$client_id}\n";
self::sendNextCommand($client_id, $connection);
}
// }
}
});
}
/**
* 當(dāng)客戶端連接時(shí)觸發(fā)
* @param string $client_id 連接id
*/
public static function onConnect($client_id)
{
echo "Client connected: {$client_id}\n";
// 可以在這里初始化設(shè)備的一些信息,例如待采集的指令列表
// Gateway::setSession($client_id, [
// 'device_id' => 'DTU_XYZ', // 假設(shè)設(shè)備會(huì)上報(bào)ID,或根據(jù)client_id查詢
// 'commands' => [
// "MODBUS_CMD_1_HEX", // 指令內(nèi)容,根據(jù)您的協(xié)議定義
// "MODBUS_CMD_2_HEX",
// "MODBUS_CMD_3_HEX"
// ],
// 'current_command_index' => 0,
// 'is_collecting' => false // 初始狀態(tài)為非采集中
// ]);
// 或者直接在 connection 對(duì)象上設(shè)置 (推薦,更靈活)
$connection = Gateway::getConnectionByClientId($client_id);
if ($connection) {
// 假設(shè)這是從數(shù)據(jù)庫(kù)或配置文件加載的指令
$connection->commands = self::getDeviceCommands($client_id);
$connection->current_command_index = 0;
$connection->is_collecting = false;
// 可以考慮連接成功后立即開始一次采集,或者等待全局定時(shí)器
// self::sendNextCommand($client_id, $connection);
}
}
/**
* 獲取設(shè)備的指令列表 (示例)
* @param string $client_id
* @return array
*/
public static function getDeviceCommands($client_id)
{
// 這里應(yīng)該是根據(jù)設(shè)備ID或類型從數(shù)據(jù)庫(kù)/配置加載實(shí)際指令
// 示例指令 (請(qǐng)?zhí)鎿Q為您的真實(shí)指令,可能是二進(jìn)制數(shù)據(jù))
return [
"READ_REG_1", // 指令1的字符串表示或二進(jìn)制數(shù)據(jù)
"READ_REG_2",
"READ_REG_3",
];
}
/**
* 發(fā)送下一條指令的輔助函數(shù)
* @param string $client_id
* @param \Workerman\Connection\TcpConnection $connection (可選,如果外部已獲取)
*/
public static function sendNextCommand($client_id, $connection = null)
{
if (!$connection) {
$connection = Gateway::getConnectionByClientId($client_id);
}
if (!$connection || !isset($connection->commands) || !$connection->is_collecting) {
echo "Client {$client_id} not found or not ready for collection.\n";
if ($connection) $connection->is_collecting = false;
return;
}
$commands = $connection->commands;
$index = $connection->current_command_index;
if ($index < count($commands)) {
$command_to_send = $commands[$index];
echo "Client {$client_id}: Sending command #{$index}: {$command_to_send}\n";
Gateway::sendToClient($client_id, $command_to_send); // 發(fā)送指令
// 記錄已發(fā)送,并更新索引以備下次使用 (在收到響應(yīng)后)
// $connection->current_command_index++; // 注意:這里遞增也可以放到 onMessage 中處理成功后
// 為當(dāng)前指令設(shè)置一個(gè)超時(shí)定時(shí)器 (例如10秒)
// 如果10秒內(nèi)沒有收到響應(yīng),則認(rèn)為這條指令超時(shí)
if (isset($connection->command_timeout_timer_id)) {
Timer::del($connection->command_timeout_timer_id); // 清除上一個(gè)指令的超時(shí)
}
$connection->command_timeout_timer_id = Timer::add(10, function() use ($client_id, $connection, $index) {
echo "Client {$client_id}: Timeout for command #{$index}\n";
// 超時(shí)處理邏輯:
// 1. 記錄錯(cuò)誤
// 2. 決定是重試當(dāng)前指令、跳過當(dāng)前指令,還是中止本輪采集
// 這里我們選擇中止本輪采集,等待下個(gè)大周期
$connection->is_collecting = false;
unset($connection->command_timeout_timer_id); // 清除自身
// 可以選擇發(fā)送下一條指令 (跳過)
// $connection->current_command_index++;
// self::sendNextCommand($client_id, $connection);
}, null, false); // false 表示只執(zhí)行一次
} else {
// 所有指令已發(fā)送完成
echo "Client {$client_id}: All commands sent for this cycle.\n";
$connection->is_collecting = false; // 標(biāo)記本輪采集結(jié)束
$connection->current_command_index = 0; // 重置索引,為下一個(gè)大周期做準(zhǔn)備
if (isset($connection->command_timeout_timer_id)) {
Timer::del($connection->command_timeout_timer_id); // 清除可能存在的最后一條指令的超時(shí)
unset($connection->command_timeout_timer_id);
}
}
}
/**
* 當(dāng)客戶端發(fā)來消息時(shí)觸發(fā)
* @param string $client_id 連接id
* @param mixed $message 完整的消息數(shù)據(jù)(注意:需要處理粘包分包問題)
*/
public static function onMessage($client_id, $message)
{
$connection = Gateway::getConnectionByClientId($client_id);
if (!$connection || !$connection->is_collecting) {
// 非采集期間收到的消息,或連接已不存在
echo "Client {$client_id}: Received message while not actively collecting or connection lost: {$message}\n";
return;
}
echo "Client {$client_id}: Received response: {$message}\n";
// 1. 清除當(dāng)前指令的超時(shí)定時(shí)器
if (isset($connection->command_timeout_timer_id)) {
Timer::del($connection->command_timeout_timer_id);
unset($connection->command_timeout_timer_id);
}
// 2. 解析響應(yīng) $message (判斷是否是期望的響應(yīng),是否成功等)
// 這部分邏輯高度依賴于您的通信協(xié)議 (如MODBUS)
// 例如: isValidResponse($message, $connection->commands[$connection->current_command_index])
// 3. 如果響應(yīng)有效且是當(dāng)前指令的響應(yīng)
// (為簡(jiǎn)化,此處假設(shè)任何收到的消息都是對(duì)上一條指令的有效響應(yīng))
$connection->current_command_index++; // 移動(dòng)到下一條指令
// 4. 發(fā)送下一條指令
self::sendNextCommand($client_id, $connection);
}
/**
* 當(dāng)用戶斷開連接時(shí)觸發(fā)
* @param string $client_id 連接id
*/
public static function onClose($client_id)
{
echo "Client closed: {$client_id}\n";
// 清理工作,例如清除與該連接相關(guān)的定時(shí)器
$connection = Gateway::getConnectionByClientId($client_id); // onClose 時(shí) $connection 對(duì)象可能已被清理
// 但我們這里是為了嘗試獲取其上的自定義屬性
if ($connection && isset($connection->command_timeout_timer_id)) {
Timer::del($connection->command_timeout_timer_id);
// unset($connection->command_timeout_timer_id); // $connection 對(duì)象會(huì)被銷毀,屬性自然消失
}
// Gateway::setSession($client_id, null); // 清理會(huì)話(如果用了 session)
}
}
代碼邏輯說明:
onWorkerStart
?:
current_command_index = 0
, is_collecting = true
)。sendNextCommand()
開始發(fā)送該設(shè)備的第一條指令。onConnect
?:
$connection->commands
) 并初始化狀態(tài)。getDeviceCommands($client_id)
?:
sendNextCommand($client_id, $connection)
?:
$connection->current_command_index < count($connection->commands)
).Gateway::sendToClient()
)。Timer::add(..., ..., ..., false)
). 如果在指定時(shí)間內(nèi)(如10秒)沒有收到響應(yīng),這個(gè)定時(shí)器會(huì)觸發(fā),您可以處理超時(shí)邏輯(記錄錯(cuò)誤、嘗試重發(fā)、或中止本輪采集)。$connection->is_collecting = false
),并重置指令索引,等待下一個(gè)大的采集周期。onMessage($client_id, $message)
?:
Timer::del()
)。$connection->current_command_index
) 加 1。sendNextCommand()
繼續(xù)發(fā)送序列中的下一條指令。onClose($client_id)
?:
優(yōu)勢(shì):
Timer::add
的回調(diào)和 onMessage
的回調(diào)都應(yīng)該快速執(zhí)行完畢。$connection->current_command_index
, $connection->is_collecting
)來維護(hù)每個(gè)設(shè)備的狀態(tài),清晰明了。重要注意事項(xiàng):
onMessage
中正確處理消息的邊界。通常在 GatewayWorker 的 BusinessWorker
中設(shè)置 $worker->protocol
來指定一個(gè)協(xié)議類處理。例如,MODBUS RTU over TCP 通常每條消息有明確的長(zhǎng)度或結(jié)束符,您可以自定義協(xié)議來解決。current_command_index
)會(huì)丟失。如果需要更強(qiáng)的持久性,可以考慮在關(guān)鍵狀態(tài)變化時(shí)(如每發(fā)送一條指令后)將狀態(tài)同步到 Redis 或數(shù)據(jù)庫(kù),但這會(huì)增加復(fù)雜性和開銷。對(duì)于大多數(shù)物聯(lián)網(wǎng)采集場(chǎng)景,如果進(jìn)程重啟,等待下一個(gè)大的采集周期重新開始通常是可以接受的。Gateway::getSession
vs $connection
對(duì)象屬性?:
Gateway::setSession
和 Gateway::getSession
是通過 Gateway 進(jìn)程周轉(zhuǎn)的,會(huì)有一定的 IPC 開銷。$connection
對(duì)象上設(shè)置自定義屬性 (如 $connection->commands = ...;
) 更高效,因?yàn)檫@些屬性直接存在于 BusinessWorker 進(jìn)程內(nèi)存中。在 Events.php
的方法中,通常可以通過 $worker->connections[$client_id]
或 Gateway::getConnectionByClientId($client_id)
來獲取連接對(duì)象。這個(gè)異步化的方案能夠很好地解決您提出的顧慮,并確保 GatewayWorker 平臺(tái)在高并發(fā)物聯(lián)網(wǎng)場(chǎng)景下的性能和響應(yīng)能力。