之前我写过一个基于定时器+swoole的rabbitmq生产者和消费者(http://m.wtbis.cn/q/8688)
在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内基于定时器的任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。
经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好用一些。
Lib_calss_rabbitmq.php
<?php
/*
* 20230316 增加 rabbitmq_publish_v3
* 20230320 使用event事件进行驱动
*
*/
//composer require php-amqplib/php-amqplib
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Swoole\Coroutine;
use Workerman\Events\EventInterface;
/**
 * RabbitMQ producer/consumer built on php-amqplib and driven by the Workerman
 * event loop.
 *
 * The constructor connects (retrying once per second until it succeeds),
 * declares a durable topic exchange and a durable queue, binds the queue to the
 * exchange using the queue name as binding key, and installs a Workerman timer
 * that writes a heartbeat frame every 55 seconds.
 *
 * Consuming is event driven: the connection's socket is registered with
 * Worker::$globalEvent for EV_READ and channel_wait() is invoked whenever the
 * socket becomes readable.
 *
 * NOTE(review): connect() replaces $this->socket on every (re)connect, but
 * nothing in this class re-registers the new socket with the event loop or
 * re-issues basic_consume after reconnect() - confirm how consumers are
 * expected to recover from a dropped connection.
 */
class rabbitmq_client {
    /** Connection object created by connect(); empty string until then. */
    public $connection = "";
    /** Channel opened on the connection by connect(); empty string until then. */
    public $channel = "";
    /** True after connect() succeeded; reset to false when an operation throws. */
    public $is_connected = false;
    /** When true, connect() logs a "rabbitmq connected" line. */
    public $is_debug = false;
    /** Default exchange used by publish() and declared by connect(). */
    public $exchange_name = "";
    /** Default queue name; also used as binding key and default routing key. */
    public $queen_name = "";
    /**
     * Callable handed to basic_consume(); must be set before comsume() or
     * comsume_swoole_go() is called. Example:
     *
     *     $client->comsume_callback = function ($msg) {
     *         $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
     *     };
     */
    public $comsume_callback = null;
    /** Socket of the current connection, as returned by getIo()->getSocket(). */
    public $socket = null;
    /** Connection settings; the values below are the defaults. */
    public $config_option = array(
        'host' => "127.0.0.1",
        'port' => "5672",
        'user' => "admin",
        'password' => "admin",
        'exchange_name' => 'default_exchange_name',
        'queen_name' => 'default_queen_name',
    );

    /**
     * Connect to the broker and start the heartbeat timer.
     *
     * Blocks (sleeping one second between attempts) until connect() succeeds.
     *
     * @param array $option Any of the keys of $config_option (host, port, user,
     *                      password, exchange_name, queen_name). Keys that are
     *                      not supplied keep their default value.
     */
    public function __construct($option = array()){
        // Overlay only the supplied, known keys so that an empty or partial
        // option array no longer replaces the defaults with null.
        if (is_array($option)) {
            $this->config_option = array_merge(
                $this->config_option,
                array_intersect_key($option, $this->config_option)
            );
        }
        $this->exchange_name = $this->config_option['exchange_name'];
        $this->queen_name = $this->config_option['queen_name'];

        // Initialise the RabbitMQ connection, retrying until it succeeds.
        while ($this->is_connected == false) {
            if ($this->connect() == true) {
                break;
            }
            $this->app_log("rabbitmq server connect failed");
            sleep(1);
        }

        // Periodic heartbeat: every 55 seconds, below the value 60 passed as
        // the last (heartbeat) argument of the connection in connect().
        Timer::add(55, function () {
            $this->write_heartbeat();
        });
    }

    /**
     * Write one log line, prefixed with the sub-second part of the current time.
     *
     * Uses logToScreen() when that function exists, otherwise echoes the line.
     *
     * @param string $log Message text.
     */
    public function app_log($log){
        // Fixed six-decimal formatting; avoids exponent notation and the
        // missing-fraction case of an implicit float-to-string conversion.
        $fraction = sprintf('%.6F', microtime(true) - time());
        $logData = "{$fraction} {$log}";
        if (function_exists("logToScreen")) {
            logToScreen($logData, true);
        } else {
            echo $logData . "\n";
        }
    }

    /**
     * Enable or disable the debug log line emitted by connect().
     *
     * @param bool $is_debug
     */
    public function setDebug($is_debug = true){
        $this->is_debug = $is_debug;
    }

    /**
     * Open the connection and channel, then declare and bind exchange and queue.
     *
     * @return bool True when connected and the topology was declared; false if
     *              the connection is not established or an Exception was caught.
     */
    public function connect(){
        try {
            $this->connection = new AMQPStreamConnection(
                $this->config_option['host'],
                $this->config_option['port'],
                $this->config_option['user'],
                $this->config_option['password'],
                '/',        // vhost
                false,      // insist
                'AMQPLAIN', // login method
                null,       // login response
                'en_US',    // locale
                3.0,        // connection timeout
                3.0,        // read/write timeout
                null,       // stream context
                true,       // keepalive
                60          // heartbeat
            );
            // Keep the raw socket so it can be registered with the event loop.
            $this->socket = $this->connection->getIo()->getSocket();

            if ($this->connection->isConnected() == true) {
                $this->channel = $this->connection->channel();
                // Declare the exchange (topic, durable, not auto-deleted).
                $this->channel->exchange_declare($this->exchange_name, 'topic', false, true, false);
                // Declare the queue (durable, not exclusive, not auto-deleted).
                $this->channel->queue_declare($this->queen_name, false, true, false, false);
                // Bind the queue using its own name as the binding key.
                $this->channel->queue_bind($this->queen_name, $this->exchange_name, $this->queen_name);
                $this->is_connected = true;
                if ($this->is_debug == true) {
                    $this->app_log("rabbitmq connected");
                }
                return true;
            }
        } catch (Exception $e) {
            $this->app_log("error catched :" . $e->getMessage());
            $this->is_connected = false;
        }
        return false;
    }

    /**
     * Reconnect if the client is currently marked as disconnected.
     *
     * @return bool True only when a new connection was established by this call.
     */
    public function reconnect(){
        if ($this->is_connected == false && $this->connect() == true) {
            $this->is_connected = true;
            return true;
        }
        return false;
    }

    /**
     * Send a heartbeat frame to the server, or try to reconnect when the
     * client is marked as disconnected.
     *
     * @return void
     */
    public function write_heartbeat(){
        if ($this->is_connected != true) {
            $this->reconnect();
            return;
        }
        try {
            // Hand-built frame: type octet 8, channel 0, payload size 0,
            // frame-end octet 0xCE (the AMQP 0-9-1 heartbeat frame layout).
            $pkt = new AMQPWriter();
            $pkt->write_octet(8);
            $pkt->write_short(0);
            $pkt->write_long(0);
            $pkt->write_octet(0xCE);
            $this->connection->write($pkt->getvalue());
        } catch (Exception $e) {
            $this->app_log("error catched :" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
    }

    /**
     * Publish one text/plain message.
     *
     * On an Exception the error is logged and a reconnect is attempted; the
     * message itself is not re-sent.
     *
     * @param string $data                Message body.
     * @param bool   $is_persistent       Persistent delivery mode when true.
     * @param string $exchange_name_input Exchange to publish to; empty uses
     *                                    $this->exchange_name.
     * @param string $queen_name_input    Routing key; empty uses
     *                                    $this->queen_name, which is the key
     *                                    connect() binds the queue with.
     * @return void
     */
    public function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
        $delivery_mod = ($is_persistent == false)
            ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
            : AMQPMessage::DELIVERY_MODE_PERSISTENT;

        $exchange_name = $this->name_or_default($exchange_name_input, $this->exchange_name);
        // The default routing key must be the queue name (the binding key used
        // in connect()), not the exchange name.
        $queen_name = $this->name_or_default($queen_name_input, $this->queen_name);

        $rabbit_msg = new AMQPMessage($data, ['content_type' => 'text/plain', 'delivery_mode' => $delivery_mod]);
        try {
            $this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
        } catch (Exception $e) {
            $this->app_log("error catched :" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
    }

    /**
     * Set the consumer prefetch limits on the channel (flow control while
     * consuming, so the broker does not push more than the consumer handles).
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @return void
     */
    public function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
        $this->channel->basic_qos($prefetch_size, $prefetch_count, false);
    }

    /**
     * Start consuming, issuing the subscription inside a Swoole coroutine.
     *
     * Requires Swoole; the original author's notes say the start-up file
     * should also contain:
     *     Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
     *     Swoole\Coroutine::set(['enable_deadlock_check' => false]);
     *
     * When the client is not connected, only a connect() attempt is made.
     *
     * @param string $queen_name_input Queue to consume; empty uses $this->queen_name.
     * @return void
     */
    public function comsume_swoole_go( $queen_name_input = ""){
        $queen_name = $this->name_or_default($queen_name_input, $this->queen_name);
        if ($this->is_connected != true) {
            $this->connect();
            return;
        }
        go(function () use ($queen_name) {
            $this->subscribe($queen_name);
        });
        $this->watch_socket();
    }

    /**
     * Start consuming without coroutines.
     *
     * When the client is not connected, only a connect() attempt is made.
     *
     * @param string $queen_name_input Queue to consume; empty uses $this->queen_name.
     * @return false|void False when no comsume_callback has been set.
     */
    public function comsume( $queen_name_input = ""){
        $queen_name = $this->name_or_default($queen_name_input, $this->queen_name);
        if ($this->is_connected != true) {
            $this->connect();
            return;
        }
        if ($this->subscribe($queen_name) === false) {
            return false;
        }
        $this->watch_socket();
    }

    /**
     * Event-loop callback: process pending broker data for the channel.
     *
     * Performs a single wait() call with the non-blocking flag and a 0.001
     * timeout instead of looping, so the event loop is not held by this
     * handler.
     *
     * @return void
     */
    public function channel_wait(){
        try {
            if ($this->channel->is_consuming()) {
                $this->channel->wait(null, true, 0.001);
            }
        } catch (Exception $e) {
            $this->app_log("error catched when consuming:" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
    }

    /** Return $input unless it is null or an empty string, else $default. */
    private function name_or_default($input, $default){
        return ($input !== null && (string)$input !== '') ? $input : $default;
    }

    /**
     * Issue basic_consume for $queen_name with the configured callback.
     *
     * @return bool False when no callback is configured; true otherwise (also
     *              after a caught Exception, which is logged and followed by a
     *              reconnect attempt).
     */
    private function subscribe($queen_name){
        try {
            if (!$this->comsume_callback) {
                $this->app_log("function comsume_callback must be set");
                return false;
            }
            $this->channel->basic_consume($queen_name, '', false, false, false, false, $this->comsume_callback);
        } catch (Exception $e) {
            $this->app_log("error catched :" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
        return true;
    }

    /** Register the socket for read events and run one initial channel_wait(). */
    private function watch_socket(){
        // Event driven: channel_wait() fires when the socket becomes readable.
        Worker::$globalEvent->add($this->socket, EventInterface::EV_READ, array($this, 'channel_wait'));
        // One initial pass is needed.
        $this->channel_wait();
    }
}
/**
 * Publish a persistent message through an existing client.
 *
 * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
 * @param string $data                Message body.
 * @param string $queen_name_input    Queue name to pass on; empty falls back to the client's queen_name.
 * @param string $exchange_name_input Exchange to pass on; empty falls back to the client's exchange_name.
 * @return void
 */
function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
    // Explicit arguments win; otherwise use the names configured on the client.
    $target_exchange = strlen($exchange_name_input) > 0
        ? $exchange_name_input
        : $rabbitmq_client->exchange_name;
    $target_queue = strlen($queen_name_input) > 0
        ? $queen_name_input
        : $rabbitmq_client->queen_name;

    // Second argument true: publish as persistent.
    $rabbitmq_client->publish($data, true, $target_exchange, $target_queue);
}
使用方法:
<?php
// Initialise the RabbitMQ connection: collect the settings, then create the client.
$rabbitmq_config_option = array(
    'host' => RABBITMQ_SERVER_IP,
    'port' => RABBITMQ_SERVER_PORT,
    'user' => RABBITMQ_USERNAME,
    'password' => RABBITMQ_PASSWORD,
    'exchange_name' => "upstream_exchange",
    'queen_name' => "upstream_queen",
);
$rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);
// Producer helper; the queue and/or the exchange can be given explicitly.
// Guarded with function_exists() because the library file shown above already
// defines rabbitmq_publish_v3; declaring it again after loading that file would
// be a fatal "Cannot redeclare" error.
if (!function_exists('rabbitmq_publish_v3')) {
    /**
     * Publish a persistent message through an existing client.
     *
     * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
     * @param string $data                Message body.
     * @param string $queen_name_input    Queue name to pass on; empty falls back to the client's queen_name.
     * @param string $exchange_name_input Exchange to pass on; empty falls back to the client's exchange_name.
     * @return void
     */
    function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
        $exchange_name = "";
        if(strlen($exchange_name_input) > 0){
            $exchange_name = $exchange_name_input;
        }else{
            $exchange_name = $rabbitmq_client->exchange_name;
        }
        $queen_name = "";
        if(strlen($queen_name_input) > 0){
            $queen_name = $queen_name_input;
        }else{
            $queen_name = $rabbitmq_client->queen_name;
        }
        // Second argument true: publish as persistent.
        $rabbitmq_client->publish($data , true , $exchange_name , $queen_name);
    }
}
// Using the client as a consumer: install the message handler, then start consuming.
$rabbitmq_client->comsume_callback = function ($msg) use ($db, $workerId) {
    // A message arrived from the queue.
    $message_body = $msg->body;
    $data_arr = json_decode($message_body, true);
    // Original note: "only acknowledge when the format is valid".
    // NOTE(review): the ack below is unconditional and $data_arr is never
    // inspected - confirm whether a validity check was intended here.
    $ack_channel = $msg->delivery_info['channel'];
    $ack_channel->basic_ack($msg->delivery_info['delivery_tag']);
    echo "[{$workerId}] Received {$message_body}\n";
};
// Coroutine mode; requires the swoole extension.
$rabbitmq_client->comsume_swoole_go();
// Plain (non-coroutine) mode:
//$rabbitmq_client->comsume();
特别说明,如果需要使用协程方式,需要安装swoole,并且在项目启动文件前面加上:
use Swoole\Coroutine;
// Select Workerman's Swoole event-loop class.
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
// Set the coroutine hook flags to SWOOLE_HOOK_SLEEP.
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
// Turn off the coroutine deadlock check.
Swoole\Coroutine::set(['enable_deadlock_check' => false]);
感謝分享