new Server();//Channel的服務(wù)器
$worker1 = new Worker();
$worker1->onStart=function(){
Client::on(xxx,function($msg){
echo "這里不能打印";
}
}
$worker2 = new Worker();
$worker2->onStart = function(){
Client::publish(xxx,data);//這里publish。
}
Worker::runAll();
Client.php
onRemoteMessage 打印,沒有打印到publish的內(nèi)容
2個worker間題。
同個worker內(nèi)正常。
真實環(huán)境是:
worker2用來
$redis->subscribe(xxx,function($instrance, $channel, $msg){
Client::publish($channel, $msg);
}
worker1用
Client::on($channel, function($msg){
// 來處理。因為redis->subscribe是阻塞的,不能在worker1內(nèi)用。worker1要處理其它業(yè)務(wù)的。不能阻塞
}
不能在onWorkerStart里立刻publish,因為另外的進(jìn)程可能還沒有運行到onWorkerStart,
可能還沒運行到Client::on(xxx, ...)
另外兩個worker要在onWorkerStart里調(diào)用下Client::connect('channel_server_ip')下,
1:我看Client::on和Client::publish內(nèi)部實現(xiàn)都有先調(diào)用Client::connect();//我的Server,ip和port都是默認(rèn)的,Client::connect的ip和port也是默認(rèn)的。這看來去應(yīng)該是正常的。
2:不能在onWorkerStart里立刻$redis->publish( Client::publish),這個邏輯我這邊業(yè)務(wù)是允許的。
現(xiàn)在的情況是,worker2->onWorkerStart里$redis->subscribe('xx', function($msg){
Client::publish(xxxx,$msg);//這里要給$worker1->onWorerStart里的Client::on(xxxx,這里處理);
})
看Client::publish是收到打印了。但Server::這邊是沒收到。Client::onRemoteMessage這里也沒收到Client::publish發(fā)來的數(shù)據(jù)。
我請問一下。
當(dāng)onWorkerStart里使用了redis->subscribe,會不會影響 這個回調(diào)里的Client::publish 發(fā)送數(shù)據(jù)到Server
onWorkStart不能redis-subscribe(會導(dǎo)致Client::publish發(fā)不出數(shù)據(jù)),有何辦法解決這個問題嗎?
最簡單的方法是不直接調(diào)用Channel/Client::publish,因為Channel/Client是異步非阻塞的,你的redis-subscribe會阻塞整個進(jìn)程,導(dǎo)致Channel/Client無法處理異步數(shù)據(jù)。你可以把Channel/Client::publish部分代碼抽離出來,手寫代碼用阻塞的方式發(fā)送。
你可以看下Client.php publish方法的代碼,改為用阻塞的方式publish事件。
代碼類似
<?php
function publish($events, $data)
{
$client = stream_socket_client('tcp://channel_server_ip:channel_server_port');
$buffer = serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
// frame協(xié)議格式,頭部4個字節(jié)是包的長度
$all_buffer = pack('N', strlen($buffer)+4).$buffer;
fwrite($client, $all_buffer);
}