環(huán)境什么都相同,只是更換了一臺服務(wù)器,運(yùn)行則出現(xiàn)這個問題,原服務(wù)器跟這個一樣的。
SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready
--------------------start_gateway.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Gateway;
use Workerman\Worker;
// 初始化 Gateway
$gateway = new Gateway("websocket://0.0.0.0:3301");
// 設(shè)置進(jìn)程數(shù)
$gateway->count = 4;
// 設(shè)置名稱
$gateway->name = 'WebSocketGateway';
// 設(shè)置心跳檢測
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 1;
--------------------start_businessworker.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\BusinessWorker;
use Workerman\Worker;
// 初始化 BusinessWorker
$worker = new BusinessWorker();
// 設(shè)置名稱
$worker->name = 'WebSocketBusinessWorker';
// 設(shè)置進(jìn)程數(shù)
$worker->count = 4;
// 設(shè)置事件處理類
$worker->eventHandler = 'Events';
--------------------start_http.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
use GatewayWorker\Lib\Gateway;
// 創(chuàng)建 Redis 客戶端
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 創(chuàng)建一個 HTTP 服務(wù)器,監(jiān)聽 3300 端口
$httpWorker = new Worker('http://0.0.0.0:3300');
// 監(jiān)聽 HTTP 請求事件
$httpWorker->onMessage = function (TcpConnection $connection, Request $request) use ($redis) {
// 檢查請求方法是否為 POST
if ($request->method() === 'POST') {
// 獲取請求體中的 JSON 數(shù)據(jù)
$data = json_decode($request->rawBody(), true);
// 檢查 JSON 數(shù)據(jù)是否解析成功
if (json_last_error() === JSON_ERROR_NONE) {
// 獲取請求路徑(例如 /pudm)
$path = $request->path();
// 從 Redis 中獲取該路徑對應(yīng)的客戶端 ID
$clientIds = $redis->get($path);
if (!empty($clientIds)) {
Gateway::sendToGroup($path, $request->rawBody());
// 返回成功響應(yīng)
$connection->send(new Response(200, [], json_encode([
'status' => 'success',
'message' => 'Message forwarded to WebSocket clients'
])));
} else {
// 沒有客戶端連接該路徑
$connection->send(new Response(404, [], json_encode([
'status' => 'error',
'message' => 'No clients connected to this path'
])));
}
} else {
// JSON 解析失敗
$connection->send(new Response(400, [], json_encode([
'status' => 'error',
'message' => 'Invalid JSON data'
])));
}
} else {
// 非 POST 請求
$connection->send(new Response(405, [], json_encode([
'status' => 'error',
'message' => 'Only POST requests are allowed'
])));
}
};
--------------------start_register.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
// 初始化 Register
$register = new Worker('text://0.0.0.0:1237');
// 設(shè)置名稱
$register->name = 'Register';
--------------------Events.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Lib\Gateway;
use Workerman\Connection\TcpConnection;
// 創(chuàng)建 Redis 客戶端
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
class Events
{
// 當(dāng)客戶端連接時觸發(fā)
public static function onConnect($clientId)
{
echo "Client {$clientId} connected.\n";
}
// 當(dāng)客戶端發(fā)送消息時觸發(fā)
public static function onMessage($clientId, $message)
{
echo "Received message from client {$clientId}: {$message}\n";
Gateway::sendToAll($message, null, $clientId);
}
// 當(dāng)客戶端斷開連接時觸發(fā)
public static function onClose($clientId)
{
echo "Client {$clientId} disconnected.\n";
// 從 Redis 中移除該客戶端 ID
global $redis;
$paths = $redis->keys('*');
foreach ($paths as $path) {
$redis->sRem($path, $clientId);
}
}
// 當(dāng) WebSocket 握手成功時觸發(fā)
public static function onWebSocketConnect($clientId, $httpHeader)
{
// 從 HTTP 請求頭中提取路徑
$path = $httpHeader['server']['REQUEST_URI'];
Gateway::joinGroup($clientId, $path);
global $redis;
$redis->set($path, $clientId);
}
// 從 HTTP 請求頭中提取路徑
private static function extractPathFromHeader($httpHeader)
{
// 查找路徑的開始位置
$start = strpos($httpHeader, 'GET /') + 4;
// 查找路徑的結(jié)束位置
$end = strpos($httpHeader, ' HTTP/1.1');
// 提取路徑
return substr($httpHeader, $start, $end - $start);
}
}
PHP版本:8.0.26
--------------------Events.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Lib\Gateway;
use Workerman\Worker;
class Events
{
// 當(dāng) Worker 進(jìn)程啟動時觸發(fā)
public static function onWorkerStart($worker)
{
// 創(chuàng)建 Redis 客戶端
global $redis;
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
echo "Redis connected.\n";
}
// 當(dāng)客戶端連接時觸發(fā)
public static function onConnect($clientId)
{
echo "Client {$clientId} connected.\n";
}
// 當(dāng)客戶端發(fā)送消息時觸發(fā)
public static function onMessage($clientId, $message)
{
echo "Received message from client {$clientId}: {$message}\n";
Gateway::sendToAll($message, null, $clientId);
}
// 當(dāng)客戶端斷開連接時觸發(fā)
public static function onClose($clientId)
{
echo "Client {$clientId} disconnected.\n";
// 從 Redis 中移除該客戶端 ID
global $redis;
$paths = $redis->keys('*');
foreach ($paths as $path) {
$redis->sRem($path, $clientId);
}
}
// 當(dāng) WebSocket 握手成功時觸發(fā)
public static function onWebSocketConnect($clientId, $httpHeader)
{
// 從 HTTP 請求頭中提取路徑
$path = $httpHeader['server']['REQUEST_URI'];
Gateway::joinGroup($clientId, $path);
var_dump(Gateway::getClientIdCountByGroup($path));
// 將客戶端 ID 存儲到 Redis 中
global $redis;
$redis->sAdd($path, $clientId);
}
}
--------------------start_gateway.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\Gateway;
use Workerman\Worker;
// 初始化 Gateway
$gateway = new Gateway("websocket://0.0.0.0:3301");
// 設(shè)置進(jìn)程數(shù)
$gateway->count = 2;
// 設(shè)置名稱
$gateway->name = 'WebSocketGateway';
// 設(shè)置心跳檢測
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 1;
$gateway->registerAddress = '127.0.0.1:1237';
--------------------start_businessworker.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use GatewayWorker\BusinessWorker;
use Workerman\Worker;
// 初始化 BusinessWorker
$worker = new BusinessWorker();
// 設(shè)置名稱
$worker->name = 'WebSocketBusinessWorker';
// 設(shè)置進(jìn)程數(shù)
$worker->count = 2;
// 設(shè)置事件處理類
$worker->eventHandler = 'Events';
$worker->registerAddress = '127.0.0.1:1237';
--------------------start_http.php--------------------
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
use GatewayWorker\Lib\Gateway;
// 創(chuàng)建一個 HTTP 服務(wù)器,監(jiān)聽 3300 端口
$httpWorker = new Worker('http://0.0.0.0:3300');
// 設(shè)置 Worker 進(jìn)程數(shù)
$httpWorker->count = 4;
// 當(dāng) Worker 進(jìn)程啟動時觸發(fā)
$httpWorker->onWorkerStart = function ($worker) {
// 創(chuàng)建 Redis 客戶端
global $redis;
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
echo "Redis connected in Worker {$worker->id}.\n";
};
// 監(jiān)聽 HTTP 請求事件
$httpWorker->onMessage = function (TcpConnection $connection, Request $request) {
global $redis;
// 檢查請求方法是否為 POST
if ($request->method() === 'POST') {
// 獲取請求體中的 JSON 數(shù)據(jù)
$data = json_decode($request->rawBody(), true);
// 檢查 JSON 數(shù)據(jù)是否解析成功
if (json_last_error() === JSON_ERROR_NONE) {
// 獲取請求路徑(例如 /pudm)
$path = $request->path();
// 從 Redis 中獲取該路徑對應(yīng)的客戶端 ID
$clientIds = $redis->get($path);
if (!empty($clientIds)) {
// 轉(zhuǎn)發(fā)消息給 WebSocket 客戶端
Gateway::sendToGroup($path, $request->rawBody());
// 返回成功響應(yīng)
$connection->send(new Response(200, [], json_encode([
'status' => 'success',
'message' => 'Message forwarded to WebSocket clients'
])));
} else {
// 沒有客戶端連接該路徑
$connection->send(new Response(404, [], json_encode([
'status' => 'error',
'message' => 'No clients connected to this path'
])));
}
} else {
// JSON 解析失敗
$connection->send(new Response(400, [], json_encode([
'status' => 'error',
'message' => 'Invalid JSON data'
])));
}
} else {
// 非 POST 請求
$connection->send(new Response(405, [], json_encode([
'status' => 'error',
'message' => 'Only POST requests are allowed'
])));
}
};