場(chǎng)景: 我們系統(tǒng)每個(gè)用戶需要通過ws實(shí)時(shí)同步他們?cè)谀车谌狡脚_(tái)上的數(shù)據(jù)。會(huì)有新用戶產(chǎn)生,產(chǎn)生新用戶時(shí)動(dòng)態(tài)給他建立一個(gè)連接。
我有如下方案,都無法滿足需求
方案1:
寫一個(gè)workerman腳本BTrade.php ,接收cli參數(shù) 用戶id,以id作為worker的name與id, 如:“php BTrade.php start 1272”
問題: 在命令行執(zhí)行一次腳本后,再次執(zhí)行“php BTrade.php start 1273” 會(huì)報(bào)錯(cuò) “Workerman[ BTrade.php] already running”
疑問:是不是一個(gè)腳本只能存在一個(gè)進(jìn)程?
方案2
在一個(gè)腳本里new多個(gè)worker出來,每個(gè)worker維護(hù)一個(gè)用戶的連接。如果有新用戶來了,則重啟腳本讀取新的用戶列表,再建立連接
代碼:
$accounts = $pR->get($aKey); //取所有用戶
foreach ($accounts as $account){
$aM = new UserAccount();
$account = $aM->getAccount($accountId);
var_dump($account['id']);
$worker = new Worker();
$worker->name = 'ws_trade_sync_'.md5($account['id']).'_usdt';
$worker->id = $account['id'];
// 進(jìn)程啟動(dòng)時(shí)
$worker->onWorkerStart = function ($worker) use (
$key,
$account,
$queueTmp
//下面省略....
}
問題:restart腳本雖然能拿到最新用戶,但是會(huì)導(dǎo)致已有的連接斷開 reload能保證不斷開,但是不會(huì)執(zhí)行 $accounts = $pR->get($aKey); 獲取所有用戶?
希望有大佬能指點(diǎn)下是不是我的使用方式是錯(cuò)誤的?或者誰能提供下解決思路。十分感謝!
方案3:一個(gè)用戶一個(gè)腳本,每個(gè)人一個(gè)進(jìn)程? 這樣感覺好low,而且需要不停的動(dòng)態(tài)創(chuàng)建新腳本文件。
希望大佬給與解答,再次感謝!
代碼類似這樣,原理是定時(shí)從redis獲取所有賬戶數(shù)據(jù),然后建立連接,建立連接時(shí)判斷這個(gè)用戶的連接是否已經(jīng)存在,不存在才建立。
代碼未測(cè)試,大致邏輯寫出來了,你調(diào)試下應(yīng)該就能用了
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
global $worker;
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function($worker)
{
// 在這里建立redis連接
$pR = new YourRedis(....);
// 每秒執(zhí)行一次
Timer::add(1, function() use ($pR) {
// 用來存儲(chǔ)哪些用戶已經(jīng)建立連接
static $connections = [];
$aKey = '....';
$accounts = $pR->get($aKey);
foreach ($accounts as $account){
// 假設(shè)數(shù)據(jù)里有個(gè)唯一的id標(biāo)記用戶
$account_id = $account['id'];
// 判斷這個(gè)用戶是否建立連接
if (isset($connections[$account_id])) {
return;
}
// 通過account的到ws連接地址
$ws_url = get_url_by_acccount($account);
$ws = new AsyncTcpConnection($ws_url);
// 記錄這個(gè)account已經(jīng)建立起連接
$connections[$account_id] = $ws;
// 連接建立時(shí)
$ws->onConnect = function($ws) {
};
// 連接上有數(shù)據(jù)發(fā)來時(shí)
$ws->onMessage = function($ws, $data) {
var_dump($data);
};
// 如果連接斷開1秒后自動(dòng)重連
$ws->onClose = function($ws){
$ws->reconnect(1);
};
$ws->connect();
}
});
};
Worker::runAll();
給你提供另一個(gè)思路,用GatewayWorker來搭一個(gè)服務(wù)端,每個(gè)用戶作為客戶端進(jìn)行鏈接,用id來標(biāo)識(shí)各個(gè)客戶,針對(duì)性的給他們推送數(shù)據(jù)。
Events.php里OnMessage代碼如下.
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://m.wtbis.cn/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
/**
* 用于檢測(cè)業(yè)務(wù)代碼死循環(huán)或者長(zhǎng)時(shí)間阻塞等問題
* 如果發(fā)現(xiàn)業(yè)務(wù)卡死,可以將下面declare打開(去掉//注釋),并執(zhí)行php start.php reload
* 然后觀察一段時(shí)間workerman.log看是否有process_timeout異常
*/
//declare(ticks=1);
use \GatewayWorker\Lib\Gateway;
use Workerman\Lib\Timer;
/**
* 主邏輯
* 主要是處理 onConnect onMessage onClose 三個(gè)方法
* onConnect 和 onClose 如果不需要可以不用實(shí)現(xiàn)并刪除
*/
class Events
{
static $redis;
public static function onWorkerStart($worker){
//初始化redis
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
self::$redis = $redis;
}
/**
* 當(dāng)客戶端連接時(shí)觸發(fā)
* 如果業(yè)務(wù)不需此回調(diào)可以刪除onConnect
*
* @param int $client_id 連接id
*/
public static function onConnect($client_id)
{
}
/**
* 當(dāng)客戶端發(fā)來消息時(shí)觸發(fā)
* @param int $client_id 連接id
* @param mixed $message 具體消息
*/
public static function onMessage($client_id, $message)
{
//首先判斷該websocket是否第一次鏈接
$uid = $_SESSION['uid'];
if(!$uid) //第一次連接
{
//客戶端連接后發(fā)送的message中帶上客戶id標(biāo)識(shí)
//綁定uid,上線
Gateway::bindUid($client_id, $message);
//存?zhèn)€session
$_SESSION['uid'] = $message;
$_SESSION['timerId'] = Timer::add(1, function() use($client_id){
//你的業(yè)務(wù)邏輯
//開啟定時(shí)器給ws客戶端發(fā)送數(shù)據(jù)
}
}
}
/**
* 當(dāng)用戶斷開連接時(shí)觸發(fā)
* @param int $client_id 連接id
*/
public static function onClose($client_id)
{
//斷開的時(shí)候刪掉定時(shí)器
if(isset($_SESSION['timerId']))
{
Timer::del($_SESSION['timerId']);
}
}
public static function onWebSocketConnect($client_id, $data)
{
}
}