RabbitMQ死信队列


对RabbitMQ的死信队列的学习,我们还是带着问题去学习它吧!

  1. 为什么要用死信队列?
  2. 死信队列是什么?
  3. 如何配置死信队列?
  4. 死信消息如何变化?

一: 为什么要用死信队列?

当一条消息初次消费失败,消息队列MQ会自动进行消息重试,当达到消息最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法进行消费。此时,消息队列MQ不会立即将消息丢失,而是将发送到消费者对应的特殊队列中。这种无法正常情况下被消费的消息称为死信消息(Dead-Letter Message)

二: 死信队列是什么?

死信队列就是RabbitMQ中的一种消息机制,在正常情况下无法被消费的消息称为死信消息,存储死信消息的队列被称为死信队列。发生以下情况则会发生死信消息的产生。

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间
  3. 消息队列的消息数量已经超过最大队列长度

三: 如何配置死信队列?

具体配置死信队列,有下面几个步骤

  1. 配置业务队列,绑定到对应的业务交换机上
  2. 对应的业务队列配置对应的交换机,和路由key
  3. 为死信交换机配置死信队列

流程图如下:

具体来看代码实现,采用SpringBoot来实现的

springBoot 文件配置

server.port=8088
spring.resources.static-locations=classpath:/templates/,classpath:/static/
spring.mvc.view.suffix=.html

# redis
spring.redis.host=192.168.0.150
spring.redis.port=6379
spring.redis.password=888888
spring.redis.jedis.pool.max-idle=32
spring.redis.jedis.pool.max-wait=-1
spring.redis.jedis.pool.min-idle=0
spring.redis.timeout=0

# mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&autoR&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# show sql in console
logging.level.com.wangzaiplus.test.mapper=debug

# rabbitmq
spring.rabbitmq.host=192.168.36.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# ����confirms�ص� P -> Exchange
spring.rabbitmq.publisher-confirms=true
# ����returnedMessage�ص� Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# �����ֶ�ȷ��(ack) Queue -> C
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#spring.rabbitmq.listener.simple.prefetch=100


# mail
spring.mail.host=smtp.163.com
spring.mail.username=coderpwh@163.com
spring.mail.password=WSNXGQUBFUWLSSBK
spring.mail.from=coderpwh@163.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# ���ӳ�������Ŀ������ӵ��������
ok.http.max-idle-connections=200
# ���ӿ���ʱ�����Ϊ 300 ��
ok.http.keep-alive-duration=300

RabbitMQ 文件配置类

package com.codrpwh.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDeadConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";

    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";

    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";

    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";

    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";

    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";

    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";

    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";


    /***
     *  普通交换机
     *
     * @return
     */
    @Bean("businessExchange")
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    /***
     *  死信队列
     * @return
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean("businessQueueA")
    public Queue businessQueueA() {
        Map<String, Object> map = new HashMap<>();

        map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);

        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();
    }

    @Bean("businessQueueB")
    public Queue businessQueueB() {
        Map<String, Object> map = new HashMap<>(2);

        map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);

        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();
    }


    /****
     *   死信队列A
     * @return
     */
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }


    /****
     *  死信队列B
     * @return
     */
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }


    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }


    /***
     *   死信队列A绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    /****
     *   死信队列B绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }

}

队列消费者代码,A,B 两个队列

package com.codrpwh.rabbitmq.mq.consumer;

import com.codrpwh.rabbitmq.config.RabbitMQDeadConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;

import java.io.IOException;

@Slf4j
@Component
public class BusinessMessageReceiver {


    @RabbitListener(queues = RabbitMQDeadConfig.BUSINESS_QUEUEA_NAME)
    public void receiverA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("收到业务消息A:{}", msg);
        boolean ack = true;

        Exception exception = null;

        try {
            if (msg.contains("deadletter")) {
                throw new RuntimeException("dead letter exception");
            }

        } catch (Exception e) {
            ack = false;
            exception = e;
        }

        if (!ack) {
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = RabbitMQDeadConfig.BUSINESS_QUEUEB_NAME)
    public void receiverB(Message message, Channel channel) throws IOException {
        log.info("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


}


队列生产者,对照上面的流程图,A与B共用一个交换机

package com.codrpwh.rabbitmq.mq.producer;

import com.codrpwh.rabbitmq.config.RabbitMQDeadConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg) {
        rabbitTemplate.convertSendAndReceive(RabbitMQDeadConfig.BUSINESS_EXCHANGE_NAME, "", msg);
    }


}

测试代码,如下,通过controller 来发送请求测试

package com.codrpwh.rabbitmq.controller;

import com.codrpwh.rabbitmq.common.ServerResponse;
import com.codrpwh.rabbitmq.mq.producer.BusinessMessageSender;
import com.codrpwh.rabbitmq.mq.producer.DelayMessageSender;
import com.codrpwh.rabbitmq.pojo.Mail;
import com.codrpwh.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {


    @Resource
    private TestService testService;

    @Resource
    private BusinessMessageSender sender;

    @Resource
    private DelayMessageSender delayMessageSender;


    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ServerResponse sendMail(@Validated @RequestBody Mail mail, Errors errors) {

        if (errors.hasErrors()) {
            String msg = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(msg);
        }
        return testService.send(mail);
    }


    @RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
    public void sendMsg(String msg) {
        sender.sendMsg(msg);
    }


    @RequestMapping(value = "/delay/msg", method = RequestMethod.GET)
    public void sendMesg(String msg, Integer delayType) {
        log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);
        delayMessageSender.sendMsg(msg, delayType);
    }


    @RequestMapping(value = "/message", method = RequestMethod.GET)
    public void sendMessage(String message, Integer delayType) {
        log.info("当前时间为:{},收到请求,msg:{} delayType:{}", new Date(),message, delayType);
        delayMessageSender.sendMsgs(message, delayType);
    }


}

测试结果图下图:

四: 关于死信消息的变化

结合上面的队列流程图

1.业务队列A,路由是 routingkeyA ,交换机是exchange. 业务队列A绑定了死信。业务队列A—>死信队列A ,业务队列B–>死信队列B

2.交换机,业务队列A,B分别绑定交换机与死信交换机。 交换机–>死信交换机

3.路由则是由routingkeyA—> dead_routingkeyA ,routtingkeyB –>dead_routingkeyB

五: 发生死信消息的原因

1.requeue 参数被设置为false, 消费方使用 basic.reject 或 basic.nack 否定应答(nack) 消息。

2.消息重试达到上限,消息重试失败


文章作者: coderpwh
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 coderpwh !
  目录