最近在公司的代码里面看到RabbitMQ相关的代码,于是带着好奇心研究了下RabbitMQ.
RabbitMQ的核心是交换机和队列。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。队列用于临时存储消息和转发消息。
交换机有四种类型:Direct, topic, Headers and Fanout。
Direct:direct 类型的行为是“先匹配,再投送”,即在绑定时设定一个 routing_key,只有消息的 routing_key 与之匹配时,才会被交换机投送到绑定的队列中去。[精确匹配类型]
Topic:按规则转发消息(最灵活)[模式匹配]
Headers:根据消息的header attribute参数(而不是routing_key)进行匹配转发的交换机
Fanout:广播类型,消息会被转发到所有绑定到该交换机的队列,routing_key会被忽略
RabbitMQ的队列类型有两种,即时队列和延时队列【DelayQueue】。
即时队列:队列中的消息会被立即消费;
延时队列:队列中的消息会在指定的时间延时之后被消费。
下面将从即时队列和延时队列的具体实现来详解RabbitMQ.
即时队列详解:
DirectExchange:
DirectExchangeTest.java
/**
 * Smoke test for the direct-exchange example: triggers a single send on the
 * {@code helloSender} collaborator (declared outside this snippet).
 *
 * @throws Exception declared by the original signature; nothing here throws a checked exception
 */
@Test
public void helloTest() throws Exception {
    helloSender.send();
}
HelloSender.java
package com.neo.rabbit.hello;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes a timestamped greeting with routing key {@code "hello"} through the
 * injected {@link AmqpTemplate}.
 */
@Component
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds the greeting text, echoes it to stdout and hands it to the template.
     */
    public void send() {
        final String payload = "hello " + new Date() + " sender is JunZhou";
        System.out.println("Sender/this messag is send by zj : " + payload);
        // Two-argument form: only a routing key is given, so the template's default exchange is used.
        rabbitTemplate.convertAndSend("hello", payload);
    }
}
HelloReceiver.java
package com.neo.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener attached to the {@code "hello"} queue; prints every String payload it receives.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Handles one message taken from the queue.
     *
     * @param hello the message payload
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver : " + hello;
        System.out.println(line);
    }
}
RabbitConfig.java
package com.neo.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues used by the immediate-delivery examples and binds the
 * {@code "hello"} queue to the shared direct exchange.
 *
 * <p>{@code @Configuration} is required so that Spring processes the {@code @Bean}
 * methods of this class.
 */
@Configuration
public class RabbitConfig {

    /** Direct exchange bean named {@code defaultDirectExchange} (declared in QueueConfiguration). */
    @Autowired
    @Qualifier("defaultDirectExchange")
    private DirectExchange exchange;

    /** Queue {@code "hello"}; each queue must be exposed as a bean to be declared on the broker. */
    @Bean
    public Queue hiQueue() {
        return new Queue("hello");
    }

    /** Queue {@code "neo"}. */
    @Bean
    public Queue neoQueue() {
        return new Queue("neo");
    }

    /** Queue {@code "object"}. */
    @Bean
    public Queue objectQueue() {
        return new Queue("object");
    }

    /**
     * Binds the {@code "hello"} queue to the injected direct exchange with routing key
     * {@code "hello"}.
     *
     * <p>The bean is named explicitly because QueueConfiguration also has a {@code @Bean}
     * method called {@code helloBinding}. With the default (method-name) bean name the two
     * definitions collide: one silently replaces the other, or the context fails to start
     * when bean-definition overriding is disabled.
     *
     * @return the binding for the {@code "hello"} queue
     */
    @Bean("directHelloBinding")
    public Binding helloBinding() {
        return BindingBuilder.bind(hiQueue()).to(exchange).with("hello");
    }
}
FanoutExchange:
FanoutExchangeTest.java
/**
 * Smoke test for the fanout example: triggers a single send on the
 * {@code fanoutSender} collaborator (declared outside this snippet).
 */
@Test
public void fanoutSenderTest() {
    fanoutSender.send();
}
FanoutSender .java
package com.neo.rabbit.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes one fixed text message to the exchange named {@code "fanoutExchange"}.
 */
@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Echoes the message to stdout and publishes it.
     */
    public void send() {
        final String body = "Sender from JunZhou 20180806 : hi, fanout msg ";
        System.out.println("Sender from JunZhou 20180806 : " + body);

        // In fanout mode every queue bound to the exchange receives the message, so the
        // routing key (second argument of convertAndSend(exchange, routingKey, message))
        // is ignored and an empty string is passed.
        final String exchangeName = "fanoutExchange";
        final String ignoredRoutingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, ignoredRoutingKey, body);
    }
}
FanoutReceiverA .java……FanoutReceiverC .java
package com.neo.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener attached to queue {@code "fanout.A"}; prints every String payload it receives.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Handles one message taken from the queue.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "fanout Receiver A : " + message;
        System.out.println(line);
    }
}
FanoutRabbitConfig.java
package com.neo.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchange {@code "fanoutExchange"}, the three queues {@code fanout.A},
 * {@code fanout.B} and {@code fanout.C}, and one binding per queue.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Queue {@code "fanout.A"}. */
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    /** Queue {@code "fanout.B"}. */
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    /** Queue {@code "fanout.C"}. */
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    /** The fanout exchange all three queues are bound to. */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /** Binds {@code fanout.A} to the fanout exchange; no routing key is supplied. */
    @Bean
    Binding bindingExchangeA() {
        Queue queue = AMessage();
        return BindingBuilder.bind(queue).to(fanoutExchange());
    }

    /** Binds {@code fanout.B} to the fanout exchange; no routing key is supplied. */
    @Bean
    Binding bindingExchangeB() {
        Queue queue = BMessage();
        return BindingBuilder.bind(queue).to(fanoutExchange());
    }

    /** Binds {@code fanout.C} to the fanout exchange; no routing key is supplied. */
    @Bean
    Binding bindingExchangeC() {
        Queue queue = CMessage();
        return BindingBuilder.bind(queue).to(fanoutExchange());
    }
}
TopicExchangeTest.java
/**
 * Smoke test for the topic example: calls {@code send2()} on the {@code topicSender}
 * collaborator (declared outside this snippet).
 */
@Test
public void topicSenderTest() {
    // Alternative scenarios, left disabled exactly as in the original:
    //   topicSender.send();
    //   topicSender.send1();
    topicSender.send2();
}
TopicSender .java
package com.neo.rabbit.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Publishes sample messages to the exchange named {@code "topicExchange"} using three
 * different routing keys.
 */
@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /** Sends with routing key {@code "topic.1"}. */
    public void send() {
        publish("topic.1", "hi, i am message all");
    }

    /** Sends with routing key {@code "topic.message"}. */
    public void send1() {
        publish("topic.message", "hi, i am message 1");
    }

    /** Sends with routing key {@code "topic.messages"}. */
    public void send2() {
        publish("topic.messages", "hi, i am messages 2");
    }

    /** Echoes the text to stdout, then publishes it to the topic exchange under the given key. */
    private void publish(String routingKey, String text) {
        System.out.println("Sender : " + text);
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey, text);
    }
}
TopicReceiver .java
package com.neo.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener attached to queue {@code "topic.message"}; prints every String payload it receives.
 */
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver {

    /**
     * Handles one message taken from the queue.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Topic Receiver1 : " + message;
        System.out.println(line);
    }
}
TopicReceiver2 .java
package com.neo.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener attached to queue {@code "topic.messages"}; prints every String payload it receives.
 */
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

    /**
     * Handles one message taken from the queue.
     *
     * @param message the message payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = "Topic Receiver2 : " + message;
        System.out.println(line);
    }
}
TopicRabbitConfig .java
package com.neo.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchange {@code "topicExchange"}, the queues {@code topic.message} and
 * {@code topic.messages}, and their bindings.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first queue; also used as its exact-match binding key. */
    static final String message = "topic.message";

    /** Name of the second queue, which is bound with the pattern {@code "topic.#"}. */
    static final String messages = "topic.messages";

    /** Queue {@code "topic.message"}. */
    @Bean
    public Queue queueMessage() {
        return new Queue(message);
    }

    /** Queue {@code "topic.messages"}. */
    @Bean
    public Queue queueMessages() {
        return new Queue(messages);
    }

    /** The topic exchange both queues are bound to. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * Binds {@code topic.message} with the literal key {@code "topic.message"}.
     *
     * @param queueMessage the queue to bind
     * @param exchange     the topic exchange
     * @return the binding
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage)
                .to(exchange)
                .with(message);
    }

    /**
     * Binds {@code topic.messages} with the pattern {@code "topic.#"}, where {@code #}
     * stands for zero or more dot-separated words.
     *
     * @param queueMessages the queue to bind
     * @param exchange      the topic exchange
     * @return the binding
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages)
                .to(exchange)
                .with("topic.#");
    }
}
topic 和 direct 类似,只是匹配上支持了“模式”,在“点分”的 routing_key 形式中,可以使用两个通配符:
“*”表示一个词.
“#”表示零个或多个词.
在上面的TopicExchange中,当调用send1()方法,执行this.rabbitTemplate.convertAndSend("topicExchange","topic.message", context);
时,采用with("topic.#")和with("topic.message")绑定的两个队列都会接收到消息;而send()和send2()使用的routing_key("topic.1"和"topic.messages")只能匹配"topic.#",因此只有topic.messages队列会接收到消息。
以上就是非常简单的即时队列的详解,下面是延时队列的详解。
延时队列有哪些用处呢?打个比方,假如用户下了单,但是在30分钟内没有支付,我们就返回支付失败的提示,这个情景就可以采用延时队列来处理。
延时队列的原理图如下:
客户端:指具体往MQ发送消息的一端,客户端将消息内容进行自定义包装,在消息中附带目标队列名称。如:客户端向队列Q1发送字符串"hello",延时时间为60秒,包装后修改为{"queueName":"Q1","body":"hello"},此时,将消息发送到DLX死信队列,而非Q1队列,并将消息设置为60秒超时。
DLX:死信队列,用来存储有超时时间信息的消息, 并且可以设置当消息超时时,转发到另一个指定队列(此处设置转发到router), 死信队列无消费者,当接收到客户端消息之后,等待消息超时,将消息转发到指定的Router队列
Router: 转发队列,用来接收死信队列超时消息, 如上示例消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列中发送一条消息,内容为body.
延时队列的具体实现代码如下:
DelaySendTest.java
/**
 * Sends one delayed message addressed to queue {@code "app.queue.hello"} through the
 * {@code messageQueueService} collaborator (declared outside this snippet).
 */
@Test
public void delaySendTest(){
    System.out.println("发送延迟消息...");
    QueueMessage message = new QueueMessage("app.queue.hello","测试延时消息...");
    // 20 marks the message as delayed (10, the constructor default, means immediate).
    message.setType(20);
    // Delay in SECONDS, not milliseconds: DeafaultMessageServiceImpl.sendTimeMessage
    // multiplies this value by 1000 before using it as the expiration, so 600 means
    // 10 minutes (assuming messageQueueService is that implementation).
    message.setSeconds(600);
    messageQueueService.send(message);
}
QueueMessage.java
package com.neo.rabbit.DelayMQ.Model;
import java.util.Date;
/**
 * Mutable envelope describing a message to publish: the target exchange and queue,
 * the text payload, and delivery metadata.
 */
public class QueueMessage {

    /** Name of the exchange the message is published to. */
    private String exchange;

    /** Name of the destination queue. */
    private String queueName;

    /** Delivery type; the message service in this file treats 10 as immediate and 20 as delayed. */
    private Integer type;

    /** Message group; required to be non-null by the message service (meaning not visible here). */
    private Integer group;

    /** Creation time; set by the two-argument constructor. */
    private Date timestamp;

    /** Text payload. */
    private String message;

    /** Message status; required to be non-null by the message service (meaning not visible here). */
    private Integer status;

    /** Retry counter, initially 0 (NOTE(review): no visible code reads it). */
    private int retry = 0;

    /** Retry limit, initially 10 (NOTE(review): no visible code reads it). */
    private int maxRetry = 10;

    /** Delay in seconds for delayed messages, initially 1. */
    private int seconds = 1;

    /** Creates an envelope whose object-typed fields are all {@code null}. */
    public QueueMessage() {
    }

    /**
     * Creates an envelope targeting {@code "default.direct.exchange"} with type, group and
     * status all set to 10 and the timestamp set to the current time.
     *
     * @param queueName destination queue name
     * @param message   text payload
     */
    public QueueMessage(String queueName, String message) {
        this.exchange = "default.direct.exchange";
        this.queueName = queueName;
        this.type = 10;
        this.group = 10;
        this.status = 10;
        this.message = message;
        this.timestamp = new Date();
    }

    public String getExchange() {
        return this.exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public Integer getType() {
        return this.type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public Integer getGroup() {
        return this.group;
    }

    public void setGroup(Integer group) {
        this.group = group;
    }

    public Date getTimestamp() {
        return this.timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Integer getStatus() {
        return this.status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public int getRetry() {
        return this.retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }

    public int getMaxRetry() {
        return this.maxRetry;
    }

    public void setMaxRetry(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    public int getSeconds() {
        return this.seconds;
    }

    public void setSeconds(int seconds) {
        this.seconds = seconds;
    }
}
MessageException.java
package com.neo.rabbit.DelayMQ.exception;
/**
 * Unchecked exception carrying a numeric code, a mutable message and an optional data string.
 */
public class MessageException extends RuntimeException {

    private static final long serialVersionUID = 140085406084367372L;

    /** Numeric error code; 0 unless set. */
    private int code;

    /** Message returned by {@link #getMessage()}; held separately so it can be replaced later. */
    private String message;

    /** Optional extra payload; {@code null} unless set. */
    private String data;

    /** Creates an exception with code 0 and no message. */
    public MessageException() {
    }

    /**
     * Creates an exception with the given code and message.
     *
     * @param code    numeric error code
     * @param message human-readable description
     */
    public MessageException(int code, String message) {
        super(message);
        this.message = message;
        this.code = code;
    }

    public int getCode() {
        return this.code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    /**
     * Returns the message held by this class, which may have been replaced through
     * {@link #setMessage(String)} after construction.
     */
    @Override
    public String getMessage() {
        return this.message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getData() {
        return this.data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
DeafaultMessageServiceImpl.java
package com.neo.rabbit.DelayMQ.service.impl;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.neo.rabbit.DelayMQ.Model.QueueMessage;
import com.neo.rabbit.DelayMQ.exception.MessageException;
import com.neo.rabbit.DelayMQ.service.IMessageQueueService;
import com.neo.rabbit.DelayMQ.utils.JSONUtils;
import com.neo.rabbit.DelayMQ.utils.StringUtils;
/**
 * Default {@link IMessageQueueService}: validates a {@link QueueMessage} and publishes it
 * either immediately (type 10) or via the dead-letter queue with a per-message
 * expiration (type 20).
 */
@Service("deafaultMessageService")
public class DeafaultMessageServiceImpl implements IMessageQueueService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Validates the message and dispatches it according to its type. Types other than
     * 10 and 20 pass validation but are not sent.
     *
     * @param message the message to publish
     * @throws MessageException if a required field is missing
     */
    @Override
    public void send(QueueMessage message) {
        this.checkMessage(message);
        final int type = message.getType(); // non-null: verified by checkMessage
        if (type == 10) { // immediate message
            this.sendMessage(message.getExchange(), message.getQueueName(), message.getMessage());
        } else if (type == 20) { // delayed message
            sendTimeMessage(message);
        }
    }

    /** Publishes {@code msg} to {@code exchange}, using the queue name as the routing key. */
    private void sendMessage(String exchange, String queueName, String msg) {
        rabbitTemplate.convertAndSend(exchange, queueName, msg);
    }

    /**
     * Publishes a delayed message. A non-positive delay is sent straight to the target
     * queue. Otherwise the whole {@link QueueMessage} is serialized to JSON and published
     * with routing key {@code "default.dead.letter.queue"} and an expiration equal to the
     * delay.
     *
     * @param message the message to delay; {@code getSeconds()} is the delay in seconds
     */
    public void sendTimeMessage(QueueMessage message) {
        final int seconds = message.getSeconds();
        if (seconds <= 0) { // no delay requested: bypass the dead-letter queue
            sendMessage(message.getExchange(), message.getQueueName(), message.getMessage());
            return;
        }
        // The expiration is expressed in milliseconds. Multiply as long (1000L): the
        // int product seconds * 1000 overflows for delays above 2,147,483 seconds.
        final long expirationMillis = seconds * 1000L;
        // Post-processor that stamps the expiration on the outgoing AMQP message.
        MessagePostProcessor processor = amqpMessage -> {
            amqpMessage.getMessageProperties().setExpiration(String.valueOf(expirationMillis));
            return amqpMessage;
        };
        rabbitTemplate.convertAndSend("default.direct.exchange", "default.dead.letter.queue",
                JSONUtils.toJson(message), processor);
    }

    /**
     * Rejects a message when any of its required fields is missing.
     *
     * @throws MessageException with code 10, naming the first missing field
     */
    private void checkMessage(QueueMessage message) {
        if (StringUtils.isNullOrEmpty(message.getExchange())) {
            throw new MessageException(10, "发送消息格式错误: 消息交换机(exchange)不能为空!");
        }
        if (message.getGroup() == null) {
            throw new MessageException(10, "发送消息格式错误: 消息组(group)不能为空!");
        }
        if (message.getType() == null) {
            throw new MessageException(10, "发送消息格式错误: 消息类型(type)不能为空!");
        }
        if (message.getStatus() == null) {
            throw new MessageException(10, "发送消息格式错误: 消息状态(status)不能为空!");
        }
        if (StringUtils.isNullOrEmpty(message.getQueueName())) {
            throw new MessageException(10, "发送消息格式错误: 消息目标名称(queueName)不能为空!");
        }
        if (StringUtils.isNullOrEmpty(message.getMessage())) {
            throw new MessageException(10, "发送消息格式错误: 消息内容(message)不能为空!");
        }
    }
}
QueueConfiguration.java
package com.neo.rabbit.DelayMQ.mqconfig;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchange, queues and bindings used by the delayed-message example.
 */
@Configuration
public class QueueConfiguration {

    private static final String DIRECT_EXCHANGE = "default.direct.exchange";
    private static final String DEAD_LETTER_QUEUE = "default.dead.letter.queue";
    private static final String REPEAT_TRADE_QUEUE = "default.repeat.trade.queue";
    private static final String HELLO_QUEUE = "app.queue.hello";

    /**
     * Default direct exchange for immediate messages.
     *
     * @return a durable, non-auto-delete direct exchange
     */
    @Bean("defaultDirectExchange")
    public DirectExchange defaultDirectExchange() {
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    /**
     * Default dead-letter queue for delayed messages. Its arguments name the exchange and
     * routing key that dead-lettered messages are republished with.
     *
     * @return the durable dead-letter queue
     */
    @Bean
    public Queue defaultDeadLetterQueue() {
        Map<String, Object> deadLetterArgs = new HashMap<>();
        // Exchange that dead-lettered messages are republished to.
        deadLetterArgs.put("x-dead-letter-exchange", DIRECT_EXCHANGE);
        // Routing key, i.e. the name of the forwarding queue.
        deadLetterArgs.put("x-dead-letter-routing-key", REPEAT_TRADE_QUEUE);
        return new Queue(DEAD_LETTER_QUEUE, true, false, false, deadLetterArgs);
    }

    /**
     * Binds the dead-letter queue to the default exchange under its own name.
     *
     * @return the binding
     */
    @Bean
    public Binding defaultDeadLetterBinding() {
        return BindingBuilder.bind(defaultDeadLetterQueue())
                .to(defaultDirectExchange())
                .with(DEAD_LETTER_QUEUE);
    }

    /**
     * Forwarding queue that receives the messages leaving the dead-letter queue.
     *
     * @return the durable forwarding queue
     */
    @Bean
    public Queue defaultRepeatTradeQueue() {
        return new Queue(REPEAT_TRADE_QUEUE, true, false, false);
    }

    /**
     * Binds the forwarding queue to the default exchange under its own name.
     *
     * @return the binding
     */
    @Bean
    public Binding defaultRepeatTradeBinding() {
        return BindingBuilder.bind(defaultRepeatTradeQueue())
                .to(defaultDirectExchange())
                .with(REPEAT_TRADE_QUEUE);
    }

    /**
     * Business queue used as the final destination in the example.
     *
     * @return the durable {@code app.queue.hello} queue
     */
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_QUEUE, true, false, false);
    }

    /**
     * Binds {@code app.queue.hello} to the default exchange under its own name.
     *
     * @return the binding
     */
    @Bean
    public Binding helloBinding() {
        return BindingBuilder.bind(helloQueue())
                .to(defaultDirectExchange())
                .with(HELLO_QUEUE);
    }
}
TradeProcessor.java
package com.neo.rabbit.DelayMQ.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.neo.rabbit.DelayMQ.Model.QueueMessage;
import com.neo.rabbit.DelayMQ.service.IMessageQueueService;
import com.neo.rabbit.DelayMQ.utils.JSONUtils;
/**
 * Consumer for dead-lettered messages: listens on {@code "default.repeat.trade.queue"}
 * and re-sends each message as an immediate message.
 *
 * @author victor
 */
@Component
@RabbitListener(queues = "default.repeat.trade.queue")
public class TradeProcessor {

    @Autowired
    private IMessageQueueService messageQueueService;

    /**
     * Parses the JSON payload back into a {@link QueueMessage}, switches its type to 10
     * (immediate) and passes it to the message service.
     *
     * @param content JSON text of the message
     */
    @RabbitHandler
    public void process(String content) {
        System.out.println("-----------延时结束--------------");
        final QueueMessage forwarded = JSONUtils.toBean(content, QueueMessage.class);
        forwarded.setType(10);
        this.messageQueueService.send(forwarded);
    }
}
HelloProcessor.java
package com.neo.rabbit.DelayMQ.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener attached to queue {@code "app.queue.hello"}; prints every String payload it receives.
 */
@Component
@RabbitListener(queues = "app.queue.hello")
public class HelloProcessor {

    /**
     * Handles one message taken from the queue.
     *
     * @param content the message payload
     */
    @RabbitHandler
    public void process(String content) {
        final String line = "hello 接受消息:" + content;
        System.out.println(line);
    }
}
延时队列在可视化界面的表现形式为:
其实最终的原理非常简单:
归纳起来就是:客户端发送消息到指定的交换机,进入死信队列【死信队列没有消费者】,在死信队列里超时处理器等待消息超时,消息超时后转发消息给转发队列,转发队列的消息消费者监听到消息后进行对应的逻辑处理即可。