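Let's study RabbitMQ's dead-letter queue with a few questions in mind:
- Why use a dead-letter queue?
- What is a dead-letter queue?
- How is a dead-letter queue configured?
- How does a dead-letter message change?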
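I: Why use a dead-letter queue?
When a message fails on its first consumption attempt, it is retried. If consumption still fails once the maximum number of retries has been reached, the consumer cannot process the message under normal conditions. At that point the message is not discarded immediately; it is sent to a special queue instead. A message that cannot be consumed under normal conditions is called a dead-letter message (Dead-Letter Message). In RabbitMQ the retry is typically driven by the consumer side (for example, Spring's listener retry) rather than by a broker-wide retry counter.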
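II: What is a dead-letter queue?
A dead-letter queue is a message-handling mechanism in RabbitMQ: a message that cannot be consumed normally is called a dead-letter message, and the queue that stores such messages is called a dead-letter queue. A message becomes a dead letter in any of the following cases:
- The message is negatively acknowledged with channel.basicNack or channel.basicReject, and requeue is set to false
- The message has stayed in the queue longer than its TTL
- The queue has exceeded its maximum length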
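The three triggers above correspond to optional queue arguments. The following is a minimal sketch, assuming a hypothetical helper class `DeadLetterArguments` and illustrative names and values (`demo.dlx`, `demo.dlk`, 10 seconds, 1000 messages). It only builds the argument map that would be passed when a queue is declared, for example through `QueueBuilder.withArguments`, and does not talk to a broker:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the optional arguments for a business queue. */
final class DeadLetterArguments {

    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     int ttlMillis,
                                     int maxLength) {
        Map<String, Object> args = new HashMap<>();
        // Where dead-lettered messages are republished, and with which routing key.
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        // Messages that stay in the queue longer than this are dead-lettered.
        args.put("x-message-ttl", ttlMillis);
        // Once the queue holds more messages than this, the oldest ones are
        // dead-lettered (RabbitMQ's default overflow behaviour).
        args.put("x-max-length", maxLength);
        return args;
    }

    public static void main(String[] unused) {
        // Illustrative values only.
        System.out.println(build("demo.dlx", "demo.dlk", 10_000, 1_000));
    }
}
```

A rejection with requeue=false needs no extra argument: it is dead-lettered as soon as `x-dead-letter-exchange` is set on the queue.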
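III: How is a dead-letter queue configured?
Configuring a dead-letter queue takes the following steps:
- Declare the business queue and bind it to its business exchange
- Configure a dead-letter exchange and a dead-letter routing key on the business queue
- Declare the dead-letter queue and bind it to the dead-letter exchange
The flow is as follows: the producer publishes to the business exchange (fanout), which delivers to business queues A and B; a message rejected from a business queue is routed through the dead-letter exchange (direct) to dead-letter queue A or B according to its dead-letter routing key.
Here is the code, implemented with Spring Boot.
Spring Boot configuration file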
server.port=8088
spring.resources.static-locations=classpath:/templates/,classpath:/static/
spring.mvc.view.suffix=.html
# redis
spring.redis.host=192.168.0.150
spring.redis.port=6379
spring.redis.password=888888
spring.redis.jedis.pool.max-idle=32
spring.redis.jedis.pool.max-wait=-1
spring.redis.jedis.pool.min-idle=0
spring.redis.timeout=0
# mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&autoR&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# show sql in console
logging.level.com.wangzaiplus.test.mapper=debug
# rabbitmq
spring.rabbitmq.host=192.168.36.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# enable the confirms callback: P -> Exchange
spring.rabbitmq.publisher-confirms=true
# enable the returnedMessage callback: Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# use manual acknowledgement (ack): Queue -> C
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#spring.rabbitmq.listener.simple.prefetch=100
# mail
spring.mail.host=smtp.163.com
spring.mail.username=coderpwh@163.com
spring.mail.password=WSNXGQUBFUWLSSBK
spring.mail.from=coderpwh@163.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# maximum number of idle connections kept in the connection pool
ok.http.max-idle-connections=200
# a connection may stay idle for at most 300 seconds
ok.http.keep-alive-duration=300
RabbitMQ configuration class
package com.codrpwh.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQDeadConfig {
public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";
/***
* Business (normal) exchange
*
* @return
*/
@Bean("businessExchange")
public FanoutExchange businessExchange() {
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
/***
* Dead-letter exchange
* @return
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean("businessQueueA")
public Queue businessQueueA() {
Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();
}
@Bean("businessQueueB")
public Queue businessQueueB() {
Map<String, Object> map = new HashMap<>(2);
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();
}
/****
* Dead-letter queue A
* @return
*/
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
/****
* Dead-letter queue B
* @return
*/
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
/***
* Binding of dead-letter queue A to the dead-letter exchange
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
/****
* Binding of dead-letter queue B to the dead-letter exchange
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
Consumer code for the two business queues, A and B
package com.codrpwh.rabbitmq.mq.consumer;
import com.codrpwh.rabbitmq.config.RabbitMQDeadConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import java.io.IOException;
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = RabbitMQDeadConfig.BUSINESS_QUEUEA_NAME)
public void receiverA(Message message, Channel channel) throws IOException {
// Decode explicitly as UTF-8 instead of relying on the platform default charset
String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
log.info("Received business message A: {}", msg);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// Simulate a processing failure for any message containing "deadletter"
if (msg.contains("deadletter")) {
throw new RuntimeException("dead letter exception");
}
} catch (Exception e) {
log.error("Message consumption failed, error msg: {}", e.getMessage(), e);
// multiple=false, requeue=false: the broker dead-letters the message
channel.basicNack(deliveryTag, false, false);
return;
}
channel.basicAck(deliveryTag, false);
}
@RabbitListener(queues = RabbitMQDeadConfig.BUSINESS_QUEUEB_NAME)
public void receiverB(Message message, Channel channel) throws IOException {
log.info("Received business message B: {}", new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
Producer code. As in the flow above, queues A and B share one exchange
package com.codrpwh.rabbitmq.mq.producer;
import com.codrpwh.rabbitmq.config.RabbitMQDeadConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BusinessMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String msg) {
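// Fire-and-forget publish. convertSendAndReceive would block waiting for an RPC reply that no consumer here sends.
// The routing key is empty because a fanout exchange ignores it.
rabbitTemplate.convertAndSend(RabbitMQDeadConfig.BUSINESS_EXCHANGE_NAME, "", msg);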
}
}
Test code: requests are sent through a controller
package com.codrpwh.rabbitmq.controller;
import com.codrpwh.rabbitmq.common.ServerResponse;
import com.codrpwh.rabbitmq.mq.producer.BusinessMessageSender;
import com.codrpwh.rabbitmq.mq.producer.DelayMessageSender;
import com.codrpwh.rabbitmq.pojo.Mail;
import com.codrpwh.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {
@Resource
private TestService testService;
@Resource
private BusinessMessageSender sender;
@Resource
private DelayMessageSender delayMessageSender;
@RequestMapping(value = "/send", method = RequestMethod.POST)
public ServerResponse sendMail(@Validated @RequestBody Mail mail, Errors errors) {
if (errors.hasErrors()) {
String msg = errors.getFieldError().getDefaultMessage();
return ServerResponse.error(msg);
}
return testService.send(mail);
}
@RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
public void sendMsg(String msg) {
sender.sendMsg(msg);
}
@RequestMapping(value = "/delay/msg", method = RequestMethod.GET)
public void sendMesg(String msg, Integer delayType) {
log.info("Current time: {}, request received, msg: {}, delayType: {}", new Date(), msg, delayType);
delayMessageSender.sendMsg(msg, delayType);
}
@RequestMapping(value = "/message", method = RequestMethod.GET)
public void sendMessage(String message, Integer delayType) {
log.info("Current time: {}, request received, msg: {}, delayType: {}", new Date(), message, delayType);
delayMessageSender.sendMsgs(message, delayType);
}
}
Test result: [figure: screenshot of the test output]
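IV: How a dead-letter message changes
With reference to the queue flow above:
1. Queue: business queues A and B each declare dead-letter settings, so a dead-lettered message moves from business queue A to dead-letter queue A, and from business queue B to dead-letter queue B.
2. Exchange: both business queues are bound to the business exchange and point to the dead-letter exchange, so the message's exchange changes from the business exchange to the dead-letter exchange.
3. Routing key: the routing key changes to the one configured on the queue as x-dead-letter-routing-key: routingkeyA -> dead_routingkeyA, routingkeyB -> dead_routingkeyB. (The business exchange in this example is a fanout exchange, so the original routing key plays no part in delivery to the business queues.)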
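These changes can be modeled without a broker. The sketch below is a simplified, hypothetical in-memory model (the `DeadLetterModel` and `Envelope` classes, the `deadLetter` method, and the exchange and queue names are invented for illustration; this is not how RabbitMQ is implemented). It shows the idea only: dead-lettering republishes the message to the dead-letter exchange, replaces the routing key when a dead-letter routing key is configured, and records where the message came from. RabbitMQ itself keeps that history in the `x-death` header.

```java
import java.util.ArrayList;
import java.util.List;

final class DeadLetterModel {

    /** Hypothetical stand-in for a message plus its routing information. */
    static final class Envelope {
        final String exchange;
        final String routingKey;
        final String body;
        final List<String> deathHistory; // simplified stand-in for the x-death header

        Envelope(String exchange, String routingKey, String body, List<String> deathHistory) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.body = body;
            this.deathHistory = deathHistory;
        }
    }

    /** Returns the envelope as it would look after being dead-lettered from sourceQueue. */
    static Envelope deadLetter(Envelope original, String sourceQueue, String reason,
                               String deadLetterExchange, String deadLetterRoutingKey) {
        // Copy the history so the original envelope is left untouched.
        List<String> history = new ArrayList<>(original.deathHistory);
        history.add(sourceQueue + ":" + reason);
        // Without a configured dead-letter routing key, the original key is kept.
        String key = deadLetterRoutingKey != null ? deadLetterRoutingKey : original.routingKey;
        return new Envelope(deadLetterExchange, key, original.body, history);
    }

    public static void main(String[] args) {
        Envelope published = new Envelope("business.exchange", "", "deadletter", new ArrayList<>());
        Envelope dead = deadLetter(published, "business.queuea", "rejected",
                "deadletter.exchange", "queuea.routingkey");
        System.out.println(dead.exchange + " / " + dead.routingKey + " / " + dead.deathHistory);
    }
}
```

The message body is unchanged; only the routing information and the recorded history differ, which is what a consumer of the dead-letter queue can inspect to decide how to handle the message.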
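V: Causes of dead-letter messages
1. The consumer negatively acknowledges the message with basic.reject or basic.nack, and the requeue parameter is set to false.
2. The message has reached its retry limit and still fails.
TTL expiry and queue-length overflow, listed earlier, also produce dead letters.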
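The retry cause can be sketched in plain Java. This is a hypothetical model (the `RetryThenDeadLetter` class, its `maxAttempts` parameter, and the in-memory `deadLetters` list are assumptions made for illustration, not a RabbitMQ or Spring API): a handler is retried up to a limit, and a message that still fails is handed to a dead-letter sink instead of being dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetryThenDeadLetter {

    /** In-memory stand-in for a dead-letter queue. */
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RetryThenDeadLetter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the handler succeeded within maxAttempts, false if the message was dead-lettered. */
    boolean consume(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true; // processed successfully: the equivalent of an ack
            } catch (RuntimeException e) {
                // Processing failed: fall through and try again until the limit is reached.
            }
        }
        // Retries exhausted: keep the message for inspection instead of losing it.
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter consumer = new RetryThenDeadLetter(3);
        Consumer<String> handler = msg -> {
            if (msg.contains("deadletter")) {
                throw new RuntimeException("dead letter exception");
            }
        };
        consumer.consume("hello", handler);
        consumer.consume("deadletter test", handler);
        System.out.println(consumer.deadLetters); // prints [deadletter test]
    }
}
```

In a real deployment the sink would be a dead-letter queue reached by rejecting the message with requeue=false after the last attempt, as in the first cause.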