header("Content-Type:text/html;charset=utf-8");
use Workerman\Worker;
require_once __DIR__ . '/Autoloader.php';
// 注意:這里與上個(gè)例子不同,使用的是websocket協(xié)議
$ws_worker = new Worker("websocket://192.168.1.218:5556");
// 啟動(dòng)4個(gè)進(jìn)程對(duì)外提供服務(wù)
$ws_worker->count = 4;
// 當(dāng)收到客戶端發(fā)來的數(shù)據(jù)后返回hello $data給客戶端
$ws_worker->onMessage = function($connection, $data)
{
$data = json_decode($data, true);
$conn_arg = array(
'host' => '192.168.1.100',
'port' => '5672',
'login' => 'qifaoa',
'password' => 'qifaoa@123',
'vhost' => '/',
);
foreach ($data as $k => $v) {
$exchange = $v;//交換機(jī)
$routing_key = $v;//路由key
$queue = $v;//隊(duì)列
if (empty($exchange) || empty($routing_key) || empty($queue)) {
$connection->send('參數(shù)錯(cuò)誤');
die();
}
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_arg);
if (!$conn->connect()) {
$connection->send('rabbitmq連接失敗');
die();
} else {
// echo '連接成功'."\n";
}
$channel = new AMQPChannel($conn);
//創(chuàng)建隊(duì)列
$q = new AMQPQueue($channel);
$q->setName($queue);
$q->setFlags(AMQP_DURABLE);//設(shè)置隊(duì)列持久化
$q->declareQueue();//聲明創(chuàng)建隊(duì)列
$q->bind($exchange, $routing_key);//綁定交換機(jī),指定路由鍵
//消息獲取
$message = $q->get(AMQP_AUTOACK);//自動(dòng)應(yīng)答機(jī)制
//判斷消息是否存在
if ($message) {
$connection->send($message->getBody());
} else {
// echo json_encode('空');
}
$conn->disconnect();//關(guān)閉
}
};
//連接關(guān)閉
$ws_worker->onclose = function($connection) {
echo "connection close\n";
};
//連接錯(cuò)誤
$worker->onError = function($connection, $code, $msg)
{
echo "error $code $msg\n";
};
// 運(yùn)行worker
Worker::runAll();
你的代碼是客戶端收到消息后才會(huì)觸發(fā)到RabbitMQ 去Get的, 并沒有主動(dòng)監(jiān)聽RabbitMQ的消息