国产+高潮+在线,国产 av 仑乱内谢,www国产亚洲精品久久,51国产偷自视频区视频,成人午夜精品网站在线观看

分享一个基于workerman的rabbitmq客户端生产者、消费者(基于Event实现事件驱动)

zgh419566

之前我写过一个基于定时器+swoole的rabbitmq生产者和消费者(http://m.wtbis.cn/q/8688),
在使用时发现或多或少有一些问题(最大的问题是CPU抢占问题,导致workerman内基于定时器的任务长时间得不到执行),毕竟官方的内容都是同步机制的,我在想有没有可能使用异步实现。

经过长时间的研究,终于解决了这个问题,个人认为比官方基于bunny+React的方式更好用一些。

Lib_calss_rabbitmq.php

<?php
/*
 * 20230316 增加 rabbitmq_publish_v3
 * 20230320 使用event事件進(jìn)行驅(qū)動
 *
 */

//composer require php-amqplib/php-amqplib

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\AMQPReader;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Swoole\Coroutine;
use Workerman\Events\EventInterface;

/**
 * RabbitMQ producer/consumer client for Workerman, built on php-amqplib.
 *
 * Instead of blocking in a wait loop, the consumer registers the AMQP socket
 * with Workerman's global event loop and processes frames only when the
 * socket becomes readable.
 *
 * The consume callback is stored in $comsume_callback and receives the
 * delivered message, e.g.:
 *
 *     $client->comsume_callback = function ($msg) {
 *         $body = $msg->body;
 *         $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
 *     };
 */
class rabbitmq_client {
    /** @var AMQPStreamConnection|string Active connection ("" until connect() succeeds). */
    public $connection = "";
    /** @var \PhpAmqpLib\Channel\AMQPChannel|string Channel opened on $connection ("" until connected). */
    public $channel = "";
    /** @var bool Whether the last connect/IO attempt left the client in a usable state. */
    public $is_connected = false ;

    /** @var bool When true, connect() logs a message on success. */
    public $is_debug = false;

    /** @var string Exchange declared in connect() and used as the default publish target. */
    public $exchange_name = "" ;
    /** @var string Queue declared in connect(); also used as binding key and default routing key. */
    public $queen_name = "";
    /** @var callable|null Handler passed to basic_consume(); must be set before consuming. */
    public $comsume_callback = null;

    /** @var resource|null Underlying stream socket of the current connection. */
    public $socket = null;
    /** @var array Connection settings; values supplied to the constructor override these defaults. */
    public $config_option = array(
        'host' => "127.0.0.1",
        'port' => "5672",
        'user' => "admin",
        'password' => "admin",
        'exchange_name' => 'default_exchange_name',
        'queen_name' => 'default_queen_name',
    );

    /** @var string|null Queue of the active subscription; null while not consuming. */
    private $consume_queue = null;
    /** @var bool Whether the subscription is issued inside a Swoole coroutine. */
    private $consume_in_coroutine = false;
    /** @var array|null array(prefetch_size, prefetch_count) last set via set_comsume_qos(). */
    private $consume_qos = null;
    /** @var resource|null Socket currently registered with the Workerman event loop. */
    private $watched_socket = null;
    /** @var bool Re-entrancy guard so nested failures cannot recurse through reconnect(). */
    private $is_reconnecting = false;

    /**
     * Store the configuration, block until the broker is reachable, then
     * start the periodic heartbeat timer.
     *
     * @param array $option Any of: host, port, user, password, exchange_name,
     *                      queen_name. Missing keys keep the defaults from
     *                      $config_option instead of being overwritten.
     */
    public function __construct($option = array()){
        foreach (array('host', 'port', 'user', 'password', 'exchange_name', 'queen_name') as $key) {
            if (isset($option[$key])) {
                $this->config_option[$key] = $option[$key];
            }
        }
        $this->exchange_name = $this->config_option['exchange_name'];
        $this->queen_name = $this->config_option['queen_name'];

        // Retry once per second until the initial connection succeeds.
        while( $this->is_connected == false ){
            if($this->connect() == true){
                break;
            }
            $this->app_log("rabbitmq server connect failed");
            sleep(1);
        }

        // The connection is opened with a 60 second heartbeat; send ours every
        // 55 seconds. The same timer also drives reconnect attempts while down.
        Timer::add( 55 , function (){
            $this->write_heartbeat();
        });
    }

    /**
     * Write a log line prefixed with the sub-second part of the current time.
     *
     * Uses the global logToScreen() helper when it exists, otherwise echoes.
     *
     * @param string $log Message to log.
     * @return void
     */
    public function app_log($log){
        $ts = round(microtime(true) - time() , 6);
        @list($ts1 , $ts2) = @explode("." , $ts);

        $logData = "{$ts1}.{$ts2} {$log}";

        if(function_exists("logToScreen") == true){
            logToScreen($logData , true);
        }else{
            echo $logData."\n";
        }
    }

    /**
     * Enable or disable debug logging.
     *
     * @param bool $is_debug
     * @return void
     */
    public function setDebug($is_debug = true){
        $this->is_debug = $is_debug;
    }

    /**
     * Open the connection and channel, then declare the exchange and queue
     * and bind them using the queue name as the binding key.
     *
     * @return bool True when connected and the topology was declared; false
     *              when an exception was caught (it is logged).
     */
    public function connect(){
        try{
            $this->connection = new AMQPStreamConnection(
                $this->config_option['host'] ,
                $this->config_option['port'] ,
                $this->config_option['user'] ,
                $this->config_option['password'] ,
                '/' ,          // vhost
                false ,        // insist
                'AMQPLAIN' ,   // login method
                null,          // login response
                'en_US' ,      // locale
                3.0 ,          // connection timeout (seconds)
                3.0 ,          // read/write timeout (seconds)
                null ,         // stream context
                true ,         // keepalive
                60             // heartbeat (seconds)
            );

            // Kept so the consumer can register it with the event loop.
            $this->socket = $this->connection->getIo()->getSocket();

            if( $this->connection ->isConnected() == true){

                $this->channel = $this->connection->channel();

                // Durable topic exchange.
                $this->channel->exchange_declare( $this->exchange_name , 'topic', false, true, false);

                // Durable queue.
                $this->channel->queue_declare( $this->queen_name , false, true, false, false);

                // Bind queue to exchange; the binding key is the queue name.
                $this->channel->queue_bind($this->queen_name , $this->exchange_name , $this->queen_name );

                $this->is_connected = true;

                if($this->is_debug == true){
                    $this->app_log("rabbitmq connected");
                }
                return true;
            }
        }catch (Exception $e) {
            $this->app_log("error catched :".$e->getMessage());
            $this->is_connected = false;
        }
        return false;
    }

    /**
     * Re-establish a dropped connection and resume any active subscription.
     *
     * The previous socket is removed from the event loop first; otherwise the
     * loop would keep firing on a dead descriptor while the new connection's
     * socket is never watched.
     *
     * @return bool True when the client is connected again (and, if it was
     *              consuming, re-subscribed); false otherwise or when called
     *              while already connected / already reconnecting.
     */
    public function reconnect(){
        if( $this->is_connected == true || $this->is_reconnecting == true ){
            return false;
        }

        $this->is_reconnecting = true;
        $this->unwatch_socket();

        $ok = ( $this->connect() == true );
        if( $ok && $this->consume_queue !== null ){
            $this->subscribe();
            $ok = ( $this->is_connected == true );
        }

        $this->is_reconnecting = false;
        return $ok;
    }

    /**
     * Send an AMQP heartbeat frame, or try to reconnect when disconnected.
     *
     * @return void
     */
    public function write_heartbeat(){
        if($this->is_connected == true){
            try{
                // Heartbeat frame: type 8, channel 0, payload size 0, frame-end 0xCE.
                $pkt = new AMQPWriter();
                $pkt->write_octet(8);
                $pkt->write_short(0);
                $pkt->write_long(0);
                $pkt->write_octet(0xCE);
                $this->connection->write($pkt->getvalue());
            }catch (Exception $e) {
                $this->app_log("error catched :".$e->getMessage());
                $this->is_connected = false;
                $this->reconnect();
            }
        }else{
            $this->reconnect();
        }
    }

    /**
     * Publish a text/plain message.
     *
     * @param string $data                Message body.
     * @param bool   $is_persistent       Persistent delivery mode when true.
     * @param string $exchange_name_input Target exchange; "" uses $this->exchange_name.
     * @param string $queen_name_input    Routing key; "" uses $this->queen_name, which is
     *                                    the key the queue is bound with in connect().
     * @return bool True when the message was handed to the channel; false when
     *              publishing failed even after one reconnect-and-retry.
     */
    public function publish( $data = "" , $is_persistent = true , $exchange_name_input = "" , $queen_name_input = ""){
        $delivery_mod = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        if($is_persistent == false){
            $delivery_mod = AMQPMessage::DELIVERY_MODE_NON_PERSISTENT;
        }

        $exchange_name = strlen($exchange_name_input) > 0 ? $exchange_name_input : $this->exchange_name;
        // The default routing key must be the queue name (the binding key);
        // falling back to the exchange name would leave the message unroutable.
        $queen_name = strlen($queen_name_input) > 0 ? $queen_name_input : $this->queen_name;

        $rabbit_msg = new AMQPMessage($data , ['content_type'=>'text/plain','delivery_mode'=>$delivery_mod]);

        // One initial attempt plus a single retry after a successful reconnect.
        for( $attempt = 0 ; $attempt < 2 ; $attempt++ ){
            try{
                $this->channel->basic_publish($rabbit_msg, $exchange_name, $queen_name);
                return true;
            }catch (Exception $e) {
                $this->app_log("error catched :".$e->getMessage());
                $this->is_connected = false;
                if( $this->reconnect() != true ){
                    break;
                }
            }
        }
        return false;
    }

    /**
     * Limit how many unacknowledged messages the broker delivers at once.
     *
     * The values are remembered and re-applied whenever the subscription is
     * (re-)issued, so they survive a reconnect.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @return void
     */
    public function set_comsume_qos( $prefetch_size = 0 , $prefetch_count = 1 ){
        $this->consume_qos = array($prefetch_size , $prefetch_count);
        $this->channel->basic_qos( $prefetch_size , $prefetch_count ,false);
    }

    /**
     * Start consuming, issuing the subscription inside a Swoole coroutine.
     *
     * Requires the swoole extension. The author's recommended bootstrap is:
     *     Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
     *     Swoole\Coroutine::set(['enable_deadlock_check' => false]);
     *
     * @param string $queen_name_input Queue to consume; "" uses $this->queen_name.
     * @return void
     */
    public function comsume_swoole_go( $queen_name_input = ""){
        $this->start_consuming($queen_name_input , true);
    }

    /**
     * Start consuming without coroutines.
     *
     * @param string $queen_name_input Queue to consume; "" uses $this->queen_name.
     * @return false|null False when no callback has been set, otherwise null.
     */
    public function comsume( $queen_name_input = ""){
        return $this->start_consuming($queen_name_input , false);
    }

    /**
     * Event-loop callback: process pending frames without blocking.
     *
     * Also called once right after subscribing to handle anything already
     * buffered. On failure the client is marked disconnected and a reconnect
     * is attempted.
     *
     * @return void
     */
    public function channel_wait(){
        try {
            if( $this->channel->is_consuming() ) {
                $this->channel->wait(null , true , 0.001);
            }
        } catch (Exception $e) {
            $this->app_log("error catched when consuming:" . $e->getMessage());
            $this->is_connected = false;
            $this->reconnect();
        }
    }

    /**
     * Shared entry point of comsume() and comsume_swoole_go().
     *
     * Records what to consume so that reconnect() can restore it, then
     * subscribes immediately when connected or reconnects otherwise.
     *
     * @param string $queen_name_input Queue override; "" uses $this->queen_name.
     * @param bool   $use_coroutine    Issue basic_consume inside go().
     * @return false|null False when no callback has been set, otherwise null.
     */
    private function start_consuming( $queen_name_input , $use_coroutine ){
        if( !$this->comsume_callback ){
            $this->app_log("function comsume_callback must be set");
            return false;
        }

        // Honour the caller's queue override instead of always using the default.
        $this->consume_queue = strlen($queen_name_input) > 0 ? $queen_name_input : $this->queen_name;
        $this->consume_in_coroutine = $use_coroutine;

        if( $this->is_connected == true ){
            $this->subscribe();
        }else{
            // reconnect() subscribes by itself once the connection is back.
            $this->reconnect();
        }
        return null;
    }

    /**
     * Issue basic_consume for the remembered queue and watch the current
     * socket for readability.
     *
     * @return void
     */
    private function subscribe(){
        $queen_name = $this->consume_queue;

        $register = function () use ($queen_name) {
            try {
                if( $this->consume_qos !== null ){
                    $this->channel->basic_qos( $this->consume_qos[0] , $this->consume_qos[1] , false);
                }
                $this->channel->basic_consume( $queen_name , '' , false , false , false , false , $this->comsume_callback);
            } catch (Exception $e) {
                $this->app_log("error catched :" . $e->getMessage());
                $this->is_connected = false;
            }
        };

        if( $this->consume_in_coroutine == true ){
            go($register);
        }else{
            $register();
        }

        if( $this->is_connected == false ){
            // Subscription failed; do not watch a socket that is known to be bad.
            $this->reconnect();
            return;
        }

        // Fire channel_wait() whenever the AMQP socket becomes readable.
        Worker::$globalEvent->add($this->socket, EventInterface::EV_READ, array($this, 'channel_wait'));
        $this->watched_socket = $this->socket;

        // Process anything that arrived before the listener was registered.
        $this->channel_wait();
    }

    /**
     * Remove the previously watched socket from the event loop, if any.
     *
     * @return void
     */
    private function unwatch_socket(){
        if( $this->watched_socket === null ){
            return;
        }
        try {
            Worker::$globalEvent->del($this->watched_socket, EventInterface::EV_READ);
        } catch (Exception $e) {
            $this->app_log("error catched :" . $e->getMessage());
        }
        $this->watched_socket = null;
    }
}

/**
 * Publish a persistent message through the given client.
 *
 * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
 * @param string $data                Message body.
 * @param string $queen_name_input    Queue / routing key; "" uses the client's queen_name.
 * @param string $exchange_name_input Exchange; "" uses the client's exchange_name.
 * @return void
 */
function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
    // An empty override means "fall back to what the client was configured with".
    $target_exchange = strlen($exchange_name_input) > 0
        ? $exchange_name_input
        : $rabbitmq_client->exchange_name;

    $target_queue = strlen($queen_name_input) > 0
        ? $queen_name_input
        : $rabbitmq_client->queen_name;

    // Second argument true = persistent delivery.
    $rabbitmq_client->publish($data , true , $target_exchange , $target_queue);
}

使用方法:

<?php
    // Initialize the RabbitMQ connection
    // Connection settings come from the RABBITMQ_* constants; the exchange
    // and queue names are fixed for this example.
    $rabbitmq_config_option = array(
        'host' => RABBITMQ_SERVER_IP,
        'port' => RABBITMQ_SERVER_PORT,
        'user' => RABBITMQ_USERNAME,
        'password' => RABBITMQ_PASSWORD,
        'exchange_name' => "upstream_exchange",
        'queen_name' => "upstream_queen",
    );
    $rabbitmq_client = new rabbitmq_client($rabbitmq_config_option);

    // Producer: the queue and/or the exchange can optionally be specified
    // The library file already defines rabbitmq_publish_v3(); only define it
    // here when it is missing, otherwise PHP aborts with "Cannot redeclare".
    if (!function_exists('rabbitmq_publish_v3')) {
        /**
         * Publish a persistent message through the given client.
         *
         * @param object $rabbitmq_client     Client exposing exchange_name, queen_name and publish().
         * @param string $data                Message body.
         * @param string $queen_name_input    Queue / routing key; "" uses the client's queen_name.
         * @param string $exchange_name_input Exchange; "" uses the client's exchange_name.
         * @return void
         */
        function rabbitmq_publish_v3( $rabbitmq_client , $data = "" , $queen_name_input = "" , $exchange_name_input = "" ){
            $exchange_name = strlen($exchange_name_input) > 0
                ? $exchange_name_input
                : $rabbitmq_client->exchange_name;

            $queen_name = strlen($queen_name_input) > 0
                ? $queen_name_input
                : $rabbitmq_client->queen_name;

            // Second argument true = persistent delivery.
            $rabbitmq_client->publish($data , true , $exchange_name , $queen_name);
        }
    }

   // Using the client as a consumer
   // Handler invoked for every message delivered to this consumer.
   $rabbitmq_client->comsume_callback = function ($msg) use ($db, $workerId) {
        // Raw message body as received from the broker.
        $payload = $msg->body;
        $decoded = json_decode($payload , true);
        // NOTE(review): the original comment here said to acknowledge only
        // well-formed payloads, but the decoded value is never inspected and
        // the ack below is unconditional — confirm the intended behaviour.
        $channel = $msg->delivery_info['channel'];
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
        echo "[{$workerId}] Received {$payload}\n";
    };

    $rabbitmq_client->comsume_swoole_go();    // coroutine mode; requires the swoole extension
    //$rabbitmq_client->comsume();        // plain (non-coroutine) mode

特别说明:如果需要使用协程方式,需要安装swoole,并且在项目启动文件前面加上:
use Swoole\Coroutine;
// Tell Workerman to use its Swoole-backed event loop class.
Worker::$eventLoopClass = 'Workerman\Events\Swoole';
// Set the coroutine hook flags to SWOOLE_HOOK_SLEEP.
Co::set(['hook_flags' => SWOOLE_HOOK_SLEEP]);
// Turn off the coroutine option enable_deadlock_check.
Swoole\Coroutine::set(['enable_deadlock_check' => false]);

2095 2 11
2个评论

owenzhang

感謝分享

  • 暫無評論
dkou

感謝分享

  • 暫無評論
年代过于久远,无法发表评论

zgh419566

210
積分
0
獲贊數(shù)
0
粉絲數(shù)
2022-04-28 加入
??