如果我正常運(yùn)行訂閱 沒有轉(zhuǎn)發(fā) 到隊(duì)列 就不會出現(xiàn)內(nèi)存泄漏 如果我轉(zhuǎn)發(fā)到隊(duì)列就會出現(xiàn)內(nèi)存泄漏為啥呢
class Subscribe extends Worker
{
static $taskName = 'subscribe';
static $apiConfig;
static $queue;
public function __construct($socket_name = '', array $context_option = array())
{
parent::__construct($socket_name, $context_option);
}
public function on_start($worker)
{
global $db_config;
Db::setConfig($db_config);
global $redis_config;
self::$queue = new Client($redis_config['queue']['host'],$redis_config['queue']['options']);
Utils::echoLogs(self::$taskName.' 啟動成功');
// 作為客戶端鏈接WS
self::subscribe($list);
}
public function on_stop($worker)
{
Utils::echoLogs(self::$taskName.' 已停止');
}
public function subscribe($data)
{
$con = new AsyncTcpConnection(self::$apiConfig['spot_ws']);
$con->transport = 'ssl';
$con->websocketPingInterval = 55;
$con->onMessage = function ($con, $data) {
// 正常接收Data數(shù)據(jù)是長期穩(wěn)定
// 但是如果我將消息數(shù)據(jù)組裝成新的數(shù)組 newData 轉(zhuǎn)發(fā)到隊(duì)列就會出現(xiàn)內(nèi)存泄漏
self::$queue->send('data_queue', $newData);
};
$con->onError = function ($con, $err_code, $err_msg) {
};
$con->onClose = function ($con) {
$con->reConnect(1);
};
$con->connect();
}
public function run()
{
$this->onWorkerStart = array($this, 'on_start');
$this->onWorkerStop = array($this, 'on_stop');
parent::run();
}
}
Worker[2584] process terminated with ERROR: E_ERROR "Allowed memory size of 536870912 bytes exhausted (tried to allocate 69632 bytes) in /mnt/wss/vendor/workerman/workerman/Connection/TcpConnection.php on line 582"
Workerman\RedisQueue\Client 是異步的,消息會先在本地內(nèi)存里存儲,然后一條一條發(fā)給Redis服務(wù)端。如果產(chǎn)生消息的速度大于redis接收速度(或者redis連不上等),消息就會擠壓,占用內(nèi)存就越來越大。這個其實(shí)也不叫內(nèi)存泄漏。
你可以用同步方式發(fā)給redis,這樣就不占內(nèi)存了,參考 http://m.wtbis.cn/doc/workerman/components/workerman-redis-queue.html#%E5%9C%A8%E9%9D%9Eworkerman%E7%8E%AF%E5%A2%83%E5%90%91%E9%98%9F%E5%88%97%E5%8F%91%E9%80%81%E6%B6%88%E6%81%AF
如果是webman項(xiàng)目,直接用 Redis隊(duì)列同步投遞接口