用websocket 創(chuàng)建pulsar客戶端 消費(fèi)后發(fā)送 ack 消息后 鏈接就中斷了
public function onWorkerStart()
{
global $consumer, $service;
$domain = 'ws://mqe.tuyacn.com:8285/';
$option = [
'ssl' => array(
// 本地證書(shū)路徑。 必須是 PEM 格式,并且包含本地的證書(shū)及私鑰。
'local_cert' => '/your/path/to/pemfile',
// local_cert 文件的密碼。
'passphrase' => 'your_pem_passphrase',
// 是否允許自簽名證書(shū)。
'allow_self_signed' => true,
// 是否需要驗(yàn)證 SSL 證書(shū)。
'verify_peer' => false
)
];
$consumer = new AsyncTcpConnection($domain,$option);
// 設(shè)置以ssl加密方式訪問(wèn)
$consumer->transport = 'ssl';
//pulsar邏輯處理
$service = new Pulsar();
$consumer->headers = [
'username' => $access_id,
'password' => self::genPwd($access_id,$access_key),
"Connection" => "Upgrade",
];
$consumer->onConnect = function(AsyncTcpConnection $con) {
};
$consumer->onMessage = function(AsyncTcpConnection $con, $data)use($service,$access_key) {
//服務(wù)文件處理
$message_id = $service->statisticsData($data,$access_key);
$con->send(json_encode(['messageId' => $message_id]));
};
$consumer->connect();
// 設(shè)置連接的onClose回調(diào)
$consumer->onClose = function(AsyncTcpConnection $con)
{
echo "connection closed\n";
//斷線重連
$con->reConnect(1);
};
}
這里寫(xiě)問(wèn)題具體描述
官方給的c# demo ack消息是這樣的
if (messageId != ""){
var payload = System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(jobject["payload"].ToString()));
//msg handler
DateTime s = DateTime.Now;
MessageHandler(payload);
Console.WriteLine("business processing cost="+(s-DateTime.Now));
//send ack
await webSocket.SendAsync(new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(ackStrJson)), WebSocketMessageType.Text, true, CancellationToken.None);
}
是我寫(xiě)的有問(wèn)題嗎