国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

GatewayWorker做物聯(lián)網(wǎng)循環(huán)采集數(shù)據(jù)問題

cxk

問題描述:??

我正在使用GatewayWorker開發(fā)一個(gè)物聯(lián)網(wǎng)平臺(tái),主要功能是作為TCP服務(wù)端與DTU設(shè)備(客戶端)建立連接并進(jìn)行通信。現(xiàn)在遇到一個(gè)需求:

需求場(chǎng)景??

每個(gè)設(shè)備連接后,服務(wù)端需要??定時(shí)循環(huán)下發(fā)一組指令??(例如MODBUS協(xié)議采集指令)。

??下發(fā)邏輯??:必須??逐條發(fā)送??,即發(fā)送一條指令 → 等待設(shè)備返回響應(yīng) → 再發(fā)送下一條,直到當(dāng)前設(shè)備的所有指令發(fā)送完成 → 等待下一次循環(huán)開始。

??循環(huán)要求??:整個(gè)過程需要周期性重復(fù)(例如每5分鐘一輪)。

當(dāng)前顧慮??

如果直接用GatewayWorker的定時(shí)器(如Timer::add()),每一輪采集需要等待N次指令交互(發(fā)送+響應(yīng)),??耗時(shí)較長(zhǎng)??(可能幾秒到十幾秒)。

??擔(dān)心阻塞問題??:
如果??新設(shè)備嘗試連接??,是否會(huì)因?yàn)閃orker進(jìn)程被占用而無法及時(shí)處理?
高并發(fā)場(chǎng)景下(例如100個(gè)設(shè)備),這種串行化設(shè)計(jì)是否會(huì)導(dǎo)致性能瓶頸?

希望有經(jīng)驗(yàn)的朋友能分享優(yōu)化思路,感謝!

297 1 4
1個(gè)回答

xiaobai

你設(shè)備上線后,綁定了uid以后,就是誰發(fā)誰的,接受的時(shí)候直接處理就行了吧

  • cxk 2025-05-11

    這是谷歌給的回答,應(yīng)該是完美回答了,給后來的朋友參考下:
    您好!您提出的這個(gè)問題非常經(jīng)典,是物聯(lián)網(wǎng)平臺(tái)中常見的需求和顧慮。直接在單個(gè)定時(shí)器回調(diào)中完成一整個(gè)設(shè)備的所有指令的串行收發(fā),確實(shí)會(huì)因?yàn)橥降却憫?yīng)而阻塞當(dāng)前 Worker 進(jìn)程,影響該進(jìn)程處理其他連接的請(qǐng)求和新連接。

    下面我為您詳細(xì)分析并提供解決方案:

    理解 GatewayWorker 的非阻塞特性

    GatewayWorker (基于 Workerman) 的核心優(yōu)勢(shì)在于其事件驅(qū)動(dòng)和非阻塞 I/O 模型。每個(gè) Worker 進(jìn)程通過 epoll (Linux) 或類似機(jī)制高效地處理成千上萬的并發(fā)連接。關(guān)鍵在于,任何一個(gè)回調(diào)函數(shù)(如 onMessage, onConnect, Timer::add 的回調(diào))都不應(yīng)該執(zhí)行長(zhǎng)時(shí)間的同步阻塞操作。

    您顧慮的阻塞問題分析

    1. ?Worker 進(jìn)程被占用?:
      • 如果您在 Timer::add 的回調(diào)中,針對(duì)一個(gè)設(shè)備,執(zhí)行 "發(fā)送指令1 -> (同步等待) -> 收到響應(yīng)1 -> 發(fā)送指令2 -> (同步等待) -> 收到響應(yīng)2 ..." 這樣的邏輯,那么在這個(gè)設(shè)備完成一輪采集的整個(gè)過程中(可能幾秒到十幾秒),執(zhí)行該回調(diào)的那個(gè) Worker 進(jìn)程將無法處理任何其他事情,包括其他已連接設(shè)備的消息、新設(shè)備的連接請(qǐng)求等。
      • ?后果?:如果只有一個(gè) Worker 進(jìn)程,整個(gè)服務(wù)都會(huì)卡住。如果有多個(gè) Worker 進(jìn)程,那么被占用的那個(gè) Worker 進(jìn)程會(huì)卡住,其他 Worker 進(jìn)程可以繼續(xù)工作,但新連接可能會(huì)被分配到卡住的 Worker 上,導(dǎo)致連接超時(shí)。
    2. ?高并發(fā)性能瓶頸?:
      • 假設(shè)有 4 個(gè) Worker 進(jìn)程,100 個(gè)設(shè)備。如果每個(gè)設(shè)備采集耗時(shí) 10 秒。
      • ?理想情況(非阻塞)?:4 個(gè) Worker 可以并發(fā)處理這 100 個(gè)設(shè)備的指令收發(fā)和響應(yīng)。
      • ?阻塞情況?:如果采用上述同步等待邏輯,一個(gè) Worker 在同一時(shí)間只能處理一個(gè)設(shè)備的完整采集流程。那么 4 個(gè) Worker 最多同時(shí)處理 4 個(gè)設(shè)備。其他 96 個(gè)設(shè)備就需要排隊(duì)等待,導(dǎo)致采集延遲非常大,甚至無法按時(shí)完成。

    解決方案:異步化指令收發(fā)

    核心思想是將“發(fā)送 -> 等待響應(yīng) -> 發(fā)送下一條”這個(gè)過程異步化。每一次“發(fā)送指令”后,?不等待響應(yīng)?,而是記錄下當(dāng)前設(shè)備的狀態(tài)(例如,期望收到的響應(yīng)類型,下一條要發(fā)送的指令等),然后當(dāng)前回調(diào)函數(shù)立即結(jié)束,讓 Worker 進(jìn)程可以去處理其他事件。當(dāng)設(shè)備響應(yīng)到達(dá)時(shí),在 onMessage 事件中根據(jù)之前記錄的狀態(tài),發(fā)送下一條指令。

    具體實(shí)現(xiàn)步驟 (在 Events.php 中)

    假設(shè)我們?yōu)槊總€(gè)連接維護(hù)一個(gè)狀態(tài),表明它當(dāng)前處于指令序列的哪一步。

    <?php
    use Workerman\Worker;
    use Workerman\Timer;
    use GatewayWorker\Lib\Gateway;
    
    class Events
    {
        /**
         * 業(yè)務(wù)進(jìn)程啟動(dòng)時(shí)觸發(fā)
         * @param \Workerman\Worker $businessWorker
         */
        public static function onWorkerStart($businessWorker)
        {
            // 全局定時(shí)器,例如每5分鐘觸發(fā)一次,用于啟動(dòng)所有設(shè)備的采集周期
            Timer::add(300, function() use ($businessWorker) { // 300秒 = 5分鐘
                // 獲取所有在線的客戶端 client_id 列表
                // 注意:如果設(shè)備非常多,一次性獲取所有 client_id 可能有性能問題,
                // 可以考慮分批處理或從持久化存儲(chǔ)中獲取活躍設(shè)備列表
                $all_clients = Gateway::getAllClientSessions(); // 或者 Gateway::getAllClientInfo();
    
                foreach ($all_clients as $client_id => $session_data) {
                    // 這里可以根據(jù) $session_data 判斷設(shè)備類型或是否需要采集
                    // 例如,可以約定設(shè)備連接后在 session 中設(shè)置一個(gè)標(biāo)識(shí)
                    // if (isset($session_data['needs_collection']) && $session_data['needs_collection']) {
    
                    // 為了演示,我們假設(shè)所有連接都需要采集
                    // 并且我們使用 connection 對(duì)象來存儲(chǔ)狀態(tài) (更推薦)
                    // 或者使用 Gateway::setSession($client_id, [...]) 更新會(huì)話
    
                    $connection = $businessWorker->connections[$client_id] ?? null; // 獲取連接對(duì)象
                    if ($connection) {
                        // 初始化或重置該設(shè)備的采集狀態(tài)
                        $connection->current_command_index = 0; // 當(dāng)前指令索引
                        $connection->is_collecting = true;      // 標(biāo)記正在采集中
                        // (可選) 清除可能存在的舊的單指令超時(shí)定時(shí)器
                        if (isset($connection->command_timeout_timer_id)) {
                            Timer::del($connection->command_timeout_timer_id);
                            unset($connection->command_timeout_timer_id);
                        }
    
                        echo "Starting collection for client: {$client_id}\n";
                        self::sendNextCommand($client_id, $connection);
                    }
                    // }
                }
            });
        }
    
        /**
         * 當(dāng)客戶端連接時(shí)觸發(fā)
         * @param string $client_id 連接id
         */
        public static function onConnect($client_id)
        {
            echo "Client connected: {$client_id}\n";
            // 可以在這里初始化設(shè)備的一些信息,例如待采集的指令列表
            // Gateway::setSession($client_id, [
            //     'device_id' => 'DTU_XYZ', // 假設(shè)設(shè)備會(huì)上報(bào)ID,或根據(jù)client_id查詢
            //     'commands' => [
            //         "MODBUS_CMD_1_HEX", // 指令內(nèi)容,根據(jù)您的協(xié)議定義
            //         "MODBUS_CMD_2_HEX",
            //         "MODBUS_CMD_3_HEX"
            //     ],
            //     'current_command_index' => 0,
            //     'is_collecting' => false // 初始狀態(tài)為非采集中
            // ]);
    
            // 或者直接在 connection 對(duì)象上設(shè)置 (推薦,更靈活)
            $connection = Gateway::getConnectionByClientId($client_id);
            if ($connection) {
                // 假設(shè)這是從數(shù)據(jù)庫(kù)或配置文件加載的指令
                $connection->commands = self::getDeviceCommands($client_id);
                $connection->current_command_index = 0;
                $connection->is_collecting = false;
                // 可以考慮連接成功后立即開始一次采集,或者等待全局定時(shí)器
                // self::sendNextCommand($client_id, $connection);
            }
        }
    
        /**
         * 獲取設(shè)備的指令列表 (示例)
         * @param string $client_id
         * @return array
         */
        public static function getDeviceCommands($client_id)
        {
            // 這里應(yīng)該是根據(jù)設(shè)備ID或類型從數(shù)據(jù)庫(kù)/配置加載實(shí)際指令
            // 示例指令 (請(qǐng)?zhí)鎿Q為您的真實(shí)指令,可能是二進(jìn)制數(shù)據(jù))
            return [
                "READ_REG_1", // 指令1的字符串表示或二進(jìn)制數(shù)據(jù)
                "READ_REG_2",
                "READ_REG_3",
            ];
        }
    
        /**
         * 發(fā)送下一條指令的輔助函數(shù)
         * @param string $client_id
         * @param \Workerman\Connection\TcpConnection $connection (可選,如果外部已獲取)
         */
        public static function sendNextCommand($client_id, $connection = null)
        {
            if (!$connection) {
                $connection = Gateway::getConnectionByClientId($client_id);
            }
    
            if (!$connection || !isset($connection->commands) || !$connection->is_collecting) {
                echo "Client {$client_id} not found or not ready for collection.\n";
                if ($connection) $connection->is_collecting = false;
                return;
            }
    
            $commands = $connection->commands;
            $index = $connection->current_command_index;
    
            if ($index < count($commands)) {
                $command_to_send = $commands[$index];
                echo "Client {$client_id}: Sending command #{$index}: {$command_to_send}\n";
                Gateway::sendToClient($client_id, $command_to_send); // 發(fā)送指令
    
                // 記錄已發(fā)送,并更新索引以備下次使用 (在收到響應(yīng)后)
                // $connection->current_command_index++; // 注意:這里遞增也可以放到 onMessage 中處理成功后
    
                // 為當(dāng)前指令設(shè)置一個(gè)超時(shí)定時(shí)器 (例如10秒)
                // 如果10秒內(nèi)沒有收到響應(yīng),則認(rèn)為這條指令超時(shí)
                if (isset($connection->command_timeout_timer_id)) {
                    Timer::del($connection->command_timeout_timer_id); // 清除上一個(gè)指令的超時(shí)
                }
                $connection->command_timeout_timer_id = Timer::add(10, function() use ($client_id, $connection, $index) {
                    echo "Client {$client_id}: Timeout for command #{$index}\n";
                    // 超時(shí)處理邏輯:
                    // 1. 記錄錯(cuò)誤
                    // 2. 決定是重試當(dāng)前指令、跳過當(dāng)前指令,還是中止本輪采集
                    // 這里我們選擇中止本輪采集,等待下個(gè)大周期
                    $connection->is_collecting = false;
                    unset($connection->command_timeout_timer_id); // 清除自身
                    // 可以選擇發(fā)送下一條指令 (跳過)
                    // $connection->current_command_index++;
                    // self::sendNextCommand($client_id, $connection);
                }, null, false); // false 表示只執(zhí)行一次
    
            } else {
                // 所有指令已發(fā)送完成
                echo "Client {$client_id}: All commands sent for this cycle.\n";
                $connection->is_collecting = false; // 標(biāo)記本輪采集結(jié)束
                $connection->current_command_index = 0; // 重置索引,為下一個(gè)大周期做準(zhǔn)備
                if (isset($connection->command_timeout_timer_id)) {
                    Timer::del($connection->command_timeout_timer_id); // 清除可能存在的最后一條指令的超時(shí)
                    unset($connection->command_timeout_timer_id);
                }
            }
        }
    
        /**
         * 當(dāng)客戶端發(fā)來消息時(shí)觸發(fā)
         * @param string $client_id 連接id
         * @param mixed $message 完整的消息數(shù)據(jù)(注意:需要處理粘包分包問題)
         */
        public static function onMessage($client_id, $message)
        {
            $connection = Gateway::getConnectionByClientId($client_id);
            if (!$connection || !$connection->is_collecting) {
                // 非采集期間收到的消息,或連接已不存在
                echo "Client {$client_id}: Received message while not actively collecting or connection lost: {$message}\n";
                return;
            }
    
            echo "Client {$client_id}: Received response: {$message}\n";
    
            // 1. 清除當(dāng)前指令的超時(shí)定時(shí)器
            if (isset($connection->command_timeout_timer_id)) {
                Timer::del($connection->command_timeout_timer_id);
                unset($connection->command_timeout_timer_id);
            }
    
            // 2. 解析響應(yīng) $message (判斷是否是期望的響應(yīng),是否成功等)
            //    這部分邏輯高度依賴于您的通信協(xié)議 (如MODBUS)
            //    例如: isValidResponse($message, $connection->commands[$connection->current_command_index])
    
            // 3. 如果響應(yīng)有效且是當(dāng)前指令的響應(yīng)
            // (為簡(jiǎn)化,此處假設(shè)任何收到的消息都是對(duì)上一條指令的有效響應(yīng))
            $connection->current_command_index++; // 移動(dòng)到下一條指令
    
            // 4. 發(fā)送下一條指令
            self::sendNextCommand($client_id, $connection);
        }
    
        /**
         * 當(dāng)用戶斷開連接時(shí)觸發(fā)
         * @param string $client_id 連接id
         */
        public static function onClose($client_id)
        {
            echo "Client closed: {$client_id}\n";
            // 清理工作,例如清除與該連接相關(guān)的定時(shí)器
            $connection = Gateway::getConnectionByClientId($client_id); // onClose 時(shí) $connection 對(duì)象可能已被清理
                                                                      // 但我們這里是為了嘗試獲取其上的自定義屬性
            if ($connection && isset($connection->command_timeout_timer_id)) {
                Timer::del($connection->command_timeout_timer_id);
                // unset($connection->command_timeout_timer_id); // $connection 對(duì)象會(huì)被銷毀,屬性自然消失
            }
            // Gateway::setSession($client_id, null); // 清理會(huì)話(如果用了 session)
        }
    }

    代碼邏輯說明:

    1. ?onWorkerStart?:
      • 設(shè)置一個(gè)全局周期性定時(shí)器(例如每5分鐘)。
      • 此定時(shí)器到期后,遍歷所有連接的客戶端。
      • 為每個(gè)需要采集的客戶端初始化/重置采集狀態(tài)(如 current_command_index = 0, is_collecting = true)。
      • 調(diào)用 sendNextCommand() 開始發(fā)送該設(shè)備的第一條指令。
    2. ?onConnect?:
      • 當(dāng)設(shè)備連接時(shí),可以為其加載指令列表 (如 $connection->commands) 并初始化狀態(tài)。
      • 您可以選擇在此處立即開始一次采集,或者等待全局定時(shí)器的調(diào)度。
    3. ?getDeviceCommands($client_id)?:
      • 這是一個(gè)輔助函數(shù),用于根據(jù)設(shè)備標(biāo)識(shí)獲取其應(yīng)答的指令列表。實(shí)際應(yīng)用中,這可能來自數(shù)據(jù)庫(kù)或配置文件。
    4. ?sendNextCommand($client_id, $connection)?:
      • 這是核心的指令發(fā)送控制函數(shù)。
      • 檢查當(dāng)前設(shè)備是否還有未發(fā)送的指令 ($connection->current_command_index < count($connection->commands)).
      • 如果有,則發(fā)送當(dāng)前指令 (Gateway::sendToClient())。
      • ?關(guān)鍵?:發(fā)送后,為這條指令設(shè)置一個(gè)單次執(zhí)行的超時(shí)定時(shí)器 (Timer::add(..., ..., ..., false)). 如果在指定時(shí)間內(nèi)(如10秒)沒有收到響應(yīng),這個(gè)定時(shí)器會(huì)觸發(fā),您可以處理超時(shí)邏輯(記錄錯(cuò)誤、嘗試重發(fā)、或中止本輪采集)。
      • 如果所有指令都發(fā)送完畢,則標(biāo)記本輪采集結(jié)束 ($connection->is_collecting = false),并重置指令索引,等待下一個(gè)大的采集周期。
    5. ?onMessage($client_id, $message)?:
      • 當(dāng)收到設(shè)備響應(yīng)時(shí):
        • 首先清除之前為該指令設(shè)置的超時(shí)定時(shí)器 (Timer::del())。
        • 解析響應(yīng)數(shù)據(jù),判斷是否正確、完整。
        • 如果響應(yīng)有效,將當(dāng)前指令索引 ($connection->current_command_index) 加 1。
        • 調(diào)用 sendNextCommand() 繼續(xù)發(fā)送序列中的下一條指令。
    6. ?onClose($client_id)?:
      • 設(shè)備斷開連接時(shí),清理可能存在的單指令超時(shí)定時(shí)器。

    優(yōu)勢(shì):

    • ?非阻塞?:Worker 進(jìn)程在發(fā)送指令或等待響應(yīng)時(shí)不會(huì)被阻塞。Timer::add 的回調(diào)和 onMessage 的回調(diào)都應(yīng)該快速執(zhí)行完畢。
    • ?高并發(fā)?:每個(gè) Worker 進(jìn)程可以同時(shí)管理數(shù)千個(gè)連接的這種異步指令收發(fā)流程。
    • ?可管理性?:通過連接對(duì)象的屬性(如 $connection->current_command_index, $connection->is_collecting)來維護(hù)每個(gè)設(shè)備的狀態(tài),清晰明了。
    • ?超時(shí)控制?:不僅有總的采集周期,還有對(duì)單條指令的超時(shí)控制,增強(qiáng)了健壯性。

    重要注意事項(xiàng):

    • ?粘包/分包?:TCP是流式協(xié)議,您需要確保在 onMessage 中正確處理消息的邊界。通常在 GatewayWorker 的 BusinessWorker 中設(shè)置 $worker->protocol 來指定一個(gè)協(xié)議類處理。例如,MODBUS RTU over TCP 通常每條消息有明確的長(zhǎng)度或結(jié)束符,您可以自定義協(xié)議來解決。
    • ?指令格式?:示例中的指令是字符串。實(shí)際應(yīng)用中,它們可能是二進(jìn)制數(shù)據(jù),需要相應(yīng)處理。
    • ?錯(cuò)誤處理和重試?:當(dāng)單條指令超時(shí)或響應(yīng)錯(cuò)誤時(shí),您需要定義詳細(xì)的錯(cuò)誤處理邏輯,例如:記錄日志、重試N次、跳過當(dāng)前指令、或中止整個(gè)設(shè)備的當(dāng)前采集周期。
    • ?狀態(tài)持久化?:如果 Worker 進(jìn)程重啟,內(nèi)存中的連接狀態(tài)(如 current_command_index)會(huì)丟失。如果需要更強(qiáng)的持久性,可以考慮在關(guān)鍵狀態(tài)變化時(shí)(如每發(fā)送一條指令后)將狀態(tài)同步到 Redis 或數(shù)據(jù)庫(kù),但這會(huì)增加復(fù)雜性和開銷。對(duì)于大多數(shù)物聯(lián)網(wǎng)采集場(chǎng)景,如果進(jìn)程重啟,等待下一個(gè)大的采集周期重新開始通常是可以接受的。
    • ?Gateway::getSession vs $connection 對(duì)象屬性?:
      • Gateway::setSessionGateway::getSession 是通過 Gateway 進(jìn)程周轉(zhuǎn)的,會(huì)有一定的 IPC 開銷。
      • 直接在 $connection 對(duì)象上設(shè)置自定義屬性 (如 $connection->commands = ...;) 更高效,因?yàn)檫@些屬性直接存在于 BusinessWorker 進(jìn)程內(nèi)存中。在 Events.php 的方法中,通常可以通過 $worker->connections[$client_id]Gateway::getConnectionByClientId($client_id) 來獲取連接對(duì)象。

    這個(gè)異步化的方案能夠很好地解決您提出的顧慮,并確保 GatewayWorker 平臺(tái)在高并發(fā)物聯(lián)網(wǎng)場(chǎng)景下的性能和響應(yīng)能力。

??