RabbitMQ延时队列


在学习RabbitMQ的延时队列时,需要对RabbitMQ的死信队列有了解,需要先学习它的死信队列

  1. 什么是延时队列
  2. 为什么要用延时队列?
  3. 如何使用延时队列?

一: 什么是延时队列

延时队列是一种有序的队列,它是有方向性,从这头进另外一头出。最重要的就是具有时间性,在正常情况下,普通队列是等待被消费者进行消费,然而延时队列则是在指定时间点才被消费,延时队列也是在死信队列的基础上实现的,只是多了时间属性。

二: 为什么要用延时队列?

使用延时队列,主要是利用该队列的时间属性,用其它的队列则会消费更多的资源与性能损耗。具体使用延时队列还是普通队列要看具体的业务场景。如果不用延时队列,用相关的定时任务(xx-job),也可以实现,那只是部分可以使用,并且在单位时间执行的定时任务过多,数据量过大,使用定时任务性能会有所损耗,资源浪费。下面场景一般适用于延时队列来实现。

  1. 电商下单,30分钟,未支付则自动取消订单
  2. 电商支付完成后,用户的积分在24小时内累加
  3. 商品促销,在指定时间内对用户推送相关商品
  4. 直播预约,开播提醒

三: 如何使用延时队列?

  1. 延时队列最主要体现在它的延时性上,也是RabbitMQ的一个高级特新TTL(Time To Live),
    TTL 是表明队列中的消息的存活时间,单位是毫秒。

声明的延时队列,队列的存活时间是6秒。时间过期后,消息则会被丢失。如果不设置TTL,消息则不会过期的。

public Queue delayQueueA() {
     Map<String, Object> args = new HashMap<>();
     args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
     args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);

     args.put("x-message-ttl", 6000);

     return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
 }
  1. 延时消息的流程图

生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

  1. 代码实现

RabbitMQ的配置

package com.codrpwh.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDelayConfig {


    public static final String DELAY_EXCHANGE_NAME = "coderpwh-delay.queue.demo.business.exchange";

    public static final String DELAY_QUEUEA_NAME = "coderpwh-delay.queue.demo.business.queuea";

    public static final String DELAY_QUEUEB_NAME = "coderpwh-delay.queue.demo.business.queueb";

    public static final String DELAY_QUEUEA_ROUTING_KEY = "coderpwh-delay.queue.demo.business.queuea.routingkey";

    public static final String DELAY_QUEUEB_ROUTING_KEY = "coderpwh-delay.queue.demo.business.queueb.routingkey";

    public static final String DEAD_LETTER_EXCHANGE = "coderpwh-delay.queue.demo.deadletter.exchange";

    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "coderpwh-delay.queue.demo.deadletter.delay_10s.routingkey";

    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "coderpwh-delay.queue.demo.deadletter.delay_60s.routingkey";

    public static final String DEAD_LETTER_QUEUEA_NAME = "coderpwh-delay.queue.demo.deadletter.queuea";

    public static final String DEAD_LETTER_QUEUEB_NAME = "coderpwh-delay.queue.demo.deadletter.queueb";


    /***
     *   延迟队列--交换机-Bean
     * @return
     */
    @Bean("coderpwh-delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }


    /***
     *  死信队列--交换机--Bean
     *
     * @return
     */
    @Bean("coderpwh-deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }


    /***
     *  延迟队列A--6秒
     * @return
     */
    @Bean("coderpwh-delayQueueA")
    public Queue delayQueueA() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);

        args.put("x-message-ttl", 6000);

        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    /***
     *  延迟队列B
     * @return
     */
    @Bean("coderpwh-delayQueueB")
    public Queue delayQueueB() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);

        args.put("x-message-ttl", 60000);

        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }


    /***
     *  死信队列-A
     * @return
     */
    @Bean("coderpwh-deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    /***
     * 死信队列-B
     * @return
     */
    @Bean("coderpwh-deadLetterQueueB")
    public Queue deadLetterQuueB() {
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }


    /***
     *  延迟队列A与延迟交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayBindingA(@Qualifier("coderpwh-delayQueueA") Queue queue, @Qualifier("coderpwh-delayExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }


    /***
     *   延迟队列B与延迟交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding delayBindingB(@Qualifier("coderpwh-delayQueueB") Queue queue, @Qualifier("coderpwh-delayExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }


    /***
     *    死信队列A绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding coderpwhdeadLetterBindingA(@Qualifier("coderpwh-deadLetterQueueA") Queue queue, @Qualifier("coderpwh-deadLetterExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }


    /****
     *   死信队列B绑定关系
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding coderpwhdeadLetterBindingB(@Qualifier("coderpwh-deadLetterQueueB") Queue queue, @Qualifier("coderpwh-deadLetterExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }


}

消费方代码, 消费方A,B 是共用一个死信交换机的。消费方C是另外一个交换机,不在面的流程图中。

package com.codrpwh.rabbitmq.mq.consumer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;

import java.io.IOException;
import java.util.Date;

import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEB_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig.DEAD_LETTER_QUEUEC_NAME;

@Slf4j
@Component
public class DeadLetterQueueConsumer {


    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列A收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }


    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列B收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列C收到消息为:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }


}

生产方代码, sendMsg 主要对应上面的图中的A,B

package com.codrpwh.rabbitmq.mq.producer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DelayMessageSender {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, Integer type) {

        switch (type) {
            case 10:
                log.info("已进入10");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case 60:
                log.info("已进入60");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }

    }

    public void sendMsgs(String msg, Integer delayTime) {
        log.info("消息生产者");
        rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
            a.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return a;
        });
    }


}

控制层Controller

package com.codrpwh.rabbitmq.controller;

import com.codrpwh.rabbitmq.common.ServerResponse;
import com.codrpwh.rabbitmq.mq.producer.BusinessMessageSender;
import com.codrpwh.rabbitmq.mq.producer.DelayMessageSender;
import com.codrpwh.rabbitmq.pojo.Mail;
import com.codrpwh.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {


    @Resource
    private TestService testService;

    @Resource
    private BusinessMessageSender sender;

    @Resource
    private DelayMessageSender delayMessageSender;


    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ServerResponse sendMail(@Validated @RequestBody Mail mail, Errors errors) {

        if (errors.hasErrors()) {
            String msg = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(msg);
        }
        return testService.send(mail);
    }


    @RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
    public void sendMsg(String msg) {
        sender.sendMsg(msg);
    }


    @RequestMapping(value = "/delay/msg", method = RequestMethod.GET)
    public void sendMesg(String msg, Integer delayType) {
        log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);
        delayMessageSender.sendMsg(msg, delayType);
}


    @RequestMapping(value = "/message", method = RequestMethod.GET)
    public void sendMessage(String message, Integer delayType) {
        log.info("当前时间为:{},收到请求,msg:{} delayType:{}", new Date(),message, delayType);
        delayMessageSender.sendMsgs(message, delayType);
    }


}

启动服务,调用接口,localhost:8088/test/delay/msg?msg=hello&delayType=60

测试结果如下:

type为10 则延时6秒。6秒后,消息被消费者A被消费掉

type为60 则延时60秒。 60秒后,消费者B被消费掉

上面是针对A,B 两个进行了延时消费,一般开发中不能够复用,不具有共用性。在列举一列来实现,流程图跟上面是一样的,可以自行脑补。

RabbitMQ的配置,这回没有队列里面设置TTL了。

package com.codrpwh.rabbitmq.config;

import com.sun.tools.internal.xjc.reader.xmlschema.BindGreen;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Handler;

/**
 * @author coderpwh
 */
@Configuration
public class RabbitMQDelayCConfig {


    public static final String DELAY_EXCHANGE_NAME = "c-delay.exchange";

    public static final String DELAY_QUEUEC_NAME = "c-delay.queue";

    public static final String DELAY_QUEUEC_ROUTING_KEY = "c-delay.routingkey";


    /***
     *  死信队列,交换机,路由
     */
    public static final String DEAD_LETTER_EXCHANGE = "c-letter.exchange";

    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "c-letter.routingkey";

    public static final String DEAD_LETTER_QUEUEC_NAME = "c-letter.queue";


    /***
     *  延迟交换机
     * @return
     */

    @Bean("c-delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }


    /***
     *  死信交换机
     * @return
     */

    @Bean("c-deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }


    /***
     * 延迟队列
     *
     * @return
     */

    @Bean("c-delayQueueC")
    public Queue delayQueueC() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }


    /***
     *  死信队列
     * @return
     */

    @Bean("c-deadLetterQueueC")
    public Queue deadLetterQueueC() {
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }


    /***
     *  延迟队列C与交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */

    @Bean
    public Binding delayBindingC(@Qualifier("c-delayQueueC") Queue queue, @Qualifier("c-delayExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    /***
     *   死信队列
     * @param queue
     * @param exchange
     * @return
     */

    @Bean
    public Binding deadLetterBinding(@Qualifier("c-deadLetterQueueC") Queue queue, @Qualifier("c-deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }


}

消费方C ,最下面的方法 receiveC

package com.codrpwh.rabbitmq.mq.consumer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;

import java.io.IOException;
import java.util.Date;

import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEB_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig.DEAD_LETTER_QUEUEC_NAME;

@Slf4j
@Component
public class DeadLetterQueueConsumer {


    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列A收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }


    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列B收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列C收到消息为:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }


}

生产方的代码,下面的sendMsgs方法

package com.codrpwh.rabbitmq.mq.producer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DelayMessageSender {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, Integer type) {

        switch (type) {
            case 10:
                log.info("已进入10");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case 60:
                log.info("已进入60");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }

    }

    public void sendMsgs(String msg, Integer delayTime) {
        log.info("消息生产者");
        rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
            a.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return a;
        });
    }


}

测试代码,Controller控制层 ,sendMessage 方法

package com.codrpwh.rabbitmq.controller;

import com.codrpwh.rabbitmq.common.ServerResponse;
import com.codrpwh.rabbitmq.mq.producer.BusinessMessageSender;
import com.codrpwh.rabbitmq.mq.producer.DelayMessageSender;
import com.codrpwh.rabbitmq.pojo.Mail;
import com.codrpwh.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {

    @Resource
    private TestService testService;

    @Resource
    private BusinessMessageSender sender;

    @Resource
    private DelayMessageSender delayMessageSender;


    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ServerResponse sendMail(@Validated @RequestBody Mail mail, Errors errors) {

        if (errors.hasErrors()) {
            String msg = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(msg);
        }
        return testService.send(mail);
    }


    @RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
    public void sendMsg(String msg) {
        sender.sendMsg(msg);
    }


    @RequestMapping(value = "/delay/msg", method = RequestMethod.GET)
    public void sendMesg(String msg, Integer delayType) {
        log.info("当前时间:{},收到请求,msg:{},delayType:{}", new Date(), msg, delayType);
        delayMessageSender.sendMsg(msg, delayType);
    }


    @RequestMapping(value = "/message", method = RequestMethod.GET)
    public void sendMessage(String message, Integer delayType) {
        log.info("当前时间为:{},收到请求,msg:{} delayType:{}", new Date(), message, delayType);
        elayMessageSender.sendMsgs(message, delayType);
    }


}

请求接口:localhost:8088/test/message?message=hello world&delayType=8000


测试结果如下:


结合上面生产者的代码,这样延时的时间就是可以灵活,当做参数来传。postman里面的传的是8000,消息延时8秒则被消费,过期则丢失消息了。

这只是RabbitMQ的demo,在项目中依旧是要结合业务场景来进行使用。在电商中用延时的队列的场景是比较多,RabbitMQ 也是消息中间件中的一种,
有精力可以把其他的中间件,kafka,RocketMQ都尝试一遍!


文章作者: coderpwh
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 coderpwh !
  目录