RabbitMQ实战(三)PHP展示实例

yang-pig| 阅读:503 发表时间:2018-07-10 23:17:03 数据库

RabbitMQ 中的一些术语

如果你打开 RabbitMQ web 控制台,你会发现其中有一个 Exhanges 不好理解。下面简单说明一下。

交换器(Exchange)

交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingKey)把消息投递到对应的队列。(明白这个概念很重要,后面的代码里面充分体现了这一点)

3.png

队列(Queue)

队列很好理解,就不用解释了。

4.png

绑定(Binding)

交换器怎么知道把这条消息投递到哪个队列呢?这就需要用到绑定了。大概就是:使用某个路由键(routingKey)把某个队列(Queue)绑定到某个交换器(Exchange),这样交换器就知道根据路由键把这条消息投递到哪个队列了。(后面的代码里面充分体现了这一点)

新建rabbit_consumer.php作为消费者

<?php 

//配置信息 

$conn_args = array( 

    'host' => '127.0.0.1',  

    'port' => '5672',  

    'login' => 'guest',  

    'password' => 'guest', 

    'vhost'=>'/' 

);   

$e_name = 'e_linvo'; //交换机名 

$q_name = 'q_linvo'; //队列名 

$k_route = 'key_1'; //路由key 

//创建连接和channel 

$conn = new AMQPConnection($conn_args);  

if (!$conn->connect()) {   

    die("Cannot connect to the broker!\n");   

}   

$channel = new AMQPChannel($conn);   

//创建交换机    
$ex = new AMQPExchange($channel);   

$ex->setName($e_name); 

$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型

$ex->setFlags(AMQP_DURABLE); //持久化 

echo "Exchange Status:".$ex->declare()."\n";   

//创建队列    

$q = new AMQPQueue($channel); 

$q->setName($q_name);   

$q->setFlags(AMQP_DURABLE); //持久化  

echo "Message Total:".$q->declare()."\n";   

//绑定交换机与队列,并指定路由键 

echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; 

//阻塞模式接收消息 

echo "Message:\n";   

while(True){ 

    $q->consume('processMessage');   

    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  

} 

$conn->disconnect();   

/**
 * 消费回调函数

 * 处理消息
 */ 
function processMessage($envelope, $queue) { 

    $msg = $envelope->getBody(); 

    echo $msg."\n"; //处理消息 

    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 

}

新建rabbit_publisher.php作为生产者

<?php

//配置信息 

$conn_args = array( 

    'host' => '127.0.0.1',  

    'port' => '5672',  

    'login' => 'guest',  

    'password' => 'guest', 

    'vhost'=>'/' 
);   
$e_name = 'e_linvo'; //交换机名 

//$q_name = 'q_linvo'; //无需队列名 

$k_route = 'key_1'; //路由key 

//创建连接和channel 

$conn = new AMQPConnection($conn_args);   

if (!$conn->connect()) {   

    die("Cannot connect to the broker!\n");   

}   

$channel = new AMQPChannel($conn);   

//创建交换机对象    

$ex = new AMQPExchange($channel);  

$ex->setName($e_name);   

date_default_timezone_set("Asia/Shanghai");

//发送消息 

//$channel->startTransaction(); //开始事务  

for($i=0; $i<5; ++$i){ 

    sleep(1);//休眠1秒

    //消息内容 

    $message = "TEST MESSAGE!".date("h:i:sa");   

    echo "Send Message:".$ex->publish($message, $k_route)."\n";  

} 

//$channel->commitTransaction(); //提交事务 

$conn->disconnect();

测试一下:

先起一个窗口同样切换到php目录,输入php rabbit_consumer.php 运行消费者

浏览器运行生产者http://localhost/demo/rabbit_publisher.php

5.png


消费者 ack

  • 怎么保证 RabbitMQ 投递的消息被成功投递到了消费者?

    RabbitMQ 投递的消息,刚投递一半,产生了网络抖动,就有可能到不了消费者。

  • 解决办法:

    RabbitMQ 对消费者说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到,我还会重新投递”

在 RabbitMQ 中,有两种 acknowledgement 模式。

自动 acknowledgement 模式

这也称作发后即忘模式。

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,这条消息就会丢失。

会有丢失消息问题。

手动 acknowledgement 模式

在这种模式下,RabbitMQ 投递了消息,在投递成功之前,如果消费者的 TCP 连接 或者 channel 关闭了,导致这条消息没有被 acked,RabbitMQ 会自动把当前消息重新入队,再次投递。

生产者 confirms

  • 问题:怎么保证生产者发送的消息被 RabbitMQ 成功接收?

    生产者发送的消息,刚发送一半,产生了网络抖动,就有可能到不了 RabbitMQ。

  • 解决办法:

    生产者对 RabbitMQ 说:“如果你成功接收到了消息,给我说确认收到了,不然我就当你没有收到”