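RabbitMQ delay queues build on RabbitMQ dead-letter queues, so study dead-letter queues first. This post covers three questions:
- What is a delay queue?
- Why use a delay queue?
- How do you use a delay queue?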
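1: What is a delay queue?
A delay queue is an ordered, directional queue: messages enter at one end and leave at the other. Its defining property is time. Messages in an ordinary queue simply wait for a consumer, whereas messages in a delay queue are consumed only once a specified point in time has been reached. In RabbitMQ, a delay queue is implemented on top of a dead-letter queue, with a time attribute added.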
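As an analogy only, the JDK's in-memory java.util.concurrent.DelayQueue shows the same "not consumable until its time has come" behavior. This is a minimal sketch and not how RabbitMQ implements delays; DelayedMessage and DelayQueueSketch are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory message that becomes available after a delay.
class DelayedMessage implements Delayed {
    final String body;
    final long readyAtNanos;

    DelayedMessage(String body, long delayMillis) {
        this.body = body;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the message may be taken.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueSketch {
    // Puts "slow" (300 ms) before "fast" (100 ms) and returns the order in which they come out.
    static List<String> drainInOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("slow", 300));
        queue.put(new DelayedMessage("fast", 100));
        List<String> out = new ArrayList<>();
        try {
            out.add(queue.take().body); // blocks until the earliest delay has elapsed
            out.add(queue.take().body);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder()); // prints [fast, slow]
    }
}
```

The message with the shorter delay comes out first even though it was enqueued second, because availability is driven by time rather than by insertion order.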
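2: Why use a delay queue?
The main reason to use a delay queue is its time attribute; getting the same behavior out of other kinds of queues costs more resources and performance. Whether to use a delay queue or an ordinary queue depends on the business scenario. Scheduled jobs (for example xx-job) can cover some of these cases, but only some, and when too many jobs run per unit of time or the data volume is large, scheduled jobs lose performance and waste resources. The following scenarios are typically a good fit for a delay queue:
- E-commerce orders: automatically cancel an order that is still unpaid after 30 minutes
- E-commerce payments: credit the user's points within 24 hours after payment completes
- Promotions: push relevant products to users within a specified time window
- Livestream reservations: remind users when the stream starts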
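3: How do you use a delay queue?
- The defining trait of a delay queue is its delay, which relies on an advanced RabbitMQ feature, TTL (Time To Live).
TTL specifies how long a message may live in a queue, in milliseconds.
The delay queue declared below sets a message TTL of 6 seconds. When the TTL expires, the message is removed from this queue; because the queue also declares a dead-letter exchange, the expired message is republished there instead of simply being lost. If no TTL is set, messages never expire.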
public Queue delayQueueA() {
    Map<String, Object> args = new HashMap<>();
    // Expired messages are republished to this exchange with this routing key.
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
    // Message TTL in milliseconds (6 seconds).
    args.put("x-message-ttl", 6000);
    return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
}
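- Flow diagram of a delayed message
The producer publishes a delayed message and, depending on the required delay, uses a different routing key to route it to a different delay queue. Each delay queue has its own TTL, and all of them use the same dead-letter exchange. When a message expires, it is routed by its dead-letter routing key to the corresponding dead-letter queue, and the consumer only needs to listen on that dead-letter queue.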
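The routing path described above can be traced with plain Java maps. This is a rough model, not RabbitMQ or Spring AMQP code: DelayRoutingSketch, DelayQueueSpec, and the shortened key and queue names are hypothetical stand-ins for the constants in the configuration.

```java
import java.util.HashMap;
import java.util.Map;

class DelayRoutingSketch {
    // A delay queue: its TTL plus the routing key used when a message is dead-lettered.
    static final class DelayQueueSpec {
        final long ttlMillis;
        final String deadLetterRoutingKey;

        DelayQueueSpec(long ttlMillis, String deadLetterRoutingKey) {
            this.ttlMillis = ttlMillis;
            this.deadLetterRoutingKey = deadLetterRoutingKey;
        }
    }

    // Delay exchange: producer routing key -> delay queue.
    static final Map<String, DelayQueueSpec> DELAY_EXCHANGE = new HashMap<>();
    // Dead-letter exchange: dead-letter routing key -> dead-letter queue name.
    static final Map<String, String> DEAD_LETTER_EXCHANGE = new HashMap<>();

    static {
        DELAY_EXCHANGE.put("queuea.routingkey", new DelayQueueSpec(6000, "deadletter.a"));
        DELAY_EXCHANGE.put("queueb.routingkey", new DelayQueueSpec(60000, "deadletter.b"));
        DEAD_LETTER_EXCHANGE.put("deadletter.a", "deadletter.queuea");
        DEAD_LETTER_EXCHANGE.put("deadletter.b", "deadletter.queueb");
    }

    // Describes where a message published with the given routing key ends up, and after how long.
    static String trace(String routingKey) {
        DelayQueueSpec spec = DELAY_EXCHANGE.get(routingKey);
        if (spec == null) {
            throw new IllegalArgumentException("no delay queue bound for " + routingKey);
        }
        String target = DEAD_LETTER_EXCHANGE.get(spec.deadLetterRoutingKey);
        return target + " after " + spec.ttlMillis + " ms";
    }

    public static void main(String[] args) {
        System.out.println(trace("queuea.routingkey")); // prints deadletter.queuea after 6000 ms
        System.out.println(trace("queueb.routingkey")); // prints deadletter.queueb after 60000 ms
    }
}
```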
- Implementation
RabbitMQ configuration
package com.codrpwh.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQDelayConfig {

    // Business (delay) exchange, its two delay queues, and their routing keys
    public static final String DELAY_EXCHANGE_NAME = "coderpwh-delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEA_NAME = "coderpwh-delay.queue.demo.business.queuea";
    public static final String DELAY_QUEUEB_NAME = "coderpwh-delay.queue.demo.business.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "coderpwh-delay.queue.demo.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "coderpwh-delay.queue.demo.business.queueb.routingkey";

    // Dead-letter exchange, its routing keys, and the dead-letter queues.
    // Note: the A key is named delay_10s, although delay queue A below uses a 6-second TTL.
    public static final String DEAD_LETTER_EXCHANGE = "coderpwh-delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "coderpwh-delay.queue.demo.deadletter.delay_10s.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "coderpwh-delay.queue.demo.deadletter.delay_60s.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "coderpwh-delay.queue.demo.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "coderpwh-delay.queue.demo.deadletter.queueb";

    /** Exchange that producers publish delayed messages to. */
    @Bean("coderpwh-delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /** Dead-letter exchange that expired messages are republished to. */
    @Bean("coderpwh-deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /** Delay queue A: messages expire after 6 seconds and are dead-lettered. */
    @Bean("coderpwh-delayQueueA")
    public Queue delayQueueA() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        args.put("x-message-ttl", 6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    /** Delay queue B: messages expire after 60 seconds and are dead-lettered. */
    @Bean("coderpwh-delayQueueB")
    public Queue delayQueueB() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    /** Dead-letter queue A, consumed by receiveA. */
    @Bean("coderpwh-deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    /** Dead-letter queue B, consumed by receiveB. */
    @Bean("coderpwh-deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    /** Binds delay queue A to the delay exchange. */
    @Bean
    public Binding delayBindingA(@Qualifier("coderpwh-delayQueueA") Queue queue,
                                 @Qualifier("coderpwh-delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    /** Binds delay queue B to the delay exchange. */
    @Bean
    public Binding delayBindingB(@Qualifier("coderpwh-delayQueueB") Queue queue,
                                 @Qualifier("coderpwh-delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    /** Binds dead-letter queue A to the dead-letter exchange. */
    @Bean
    public Binding coderpwhdeadLetterBindingA(@Qualifier("coderpwh-deadLetterQueueA") Queue queue,
                                              @Qualifier("coderpwh-deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    /** Binds dead-letter queue B to the dead-letter exchange. */
    @Bean
    public Binding coderpwhdeadLetterBindingB(@Qualifier("coderpwh-deadLetterQueueB") Queue queue,
                                              @Qualifier("coderpwh-deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}
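Consumer code. Consumers A and B share one dead-letter exchange. Consumer C uses a different exchange and does not appear in the flow diagram above.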
package com.codrpwh.rabbitmq.mq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;

import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEB_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig.DEAD_LETTER_QUEUEC_NAME;

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // Each listener acks manually; this assumes the listener container uses manual acknowledge mode.
    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Current time: {}, dead-letter queue A received: {}", new Date(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Current time: {}, dead-letter queue B received: {}", new Date(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Current time: {}, dead-letter queue C received: {}", new Date(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
Producer code. sendMsg corresponds to A and B in the diagram above.
package com.codrpwh.rabbitmq.mq.producer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Routes the message to delay queue A (type 10) or delay queue B (type 60); the queue's TTL sets the delay.
    public void sendMsg(String msg, Integer type) {
        switch (type) {
            case 10:
                log.info("Entered case 10");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case 60:
                log.info("Entered case 60");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
            default:
                // No delay queue matches this type, so nothing is sent.
                log.warn("Unsupported delay type: {}", type);
                break;
        }
    }

    // Sends to delay queue C with a per-message TTL; delayTime is in milliseconds.
    public void sendMsgs(String msg, Integer delayTime) {
        log.info("Message producer");
        rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
            // The expiration property is a string holding the TTL in milliseconds.
            a.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return a;
        });
    }
}
Controller layer
package com.codrpwh.rabbitmq.controller;

import com.codrpwh.rabbitmq.common.ServerResponse;
import com.codrpwh.rabbitmq.mq.producer.BusinessMessageSender;
import com.codrpwh.rabbitmq.mq.producer.DelayMessageSender;
import com.codrpwh.rabbitmq.pojo.Mail;
import com.codrpwh.rabbitmq.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@RequestMapping(value = "/test")
@Slf4j
public class TestController {

    @Resource
    private TestService testService;

    @Resource
    private BusinessMessageSender sender;

    @Resource
    private DelayMessageSender delayMessageSender;

    @RequestMapping(value = "/send", method = RequestMethod.POST)
    public ServerResponse sendMail(@Validated @RequestBody Mail mail, Errors errors) {
        if (errors.hasErrors()) {
            String msg = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(msg);
        }
        return testService.send(mail);
    }

    @RequestMapping(value = "/sendmsg", method = RequestMethod.GET)
    public void sendMsg(String msg) {
        sender.sendMsg(msg);
    }

    // Fixed delays: delayType 10 goes to delay queue A, delayType 60 to delay queue B.
    @RequestMapping(value = "/delay/msg", method = RequestMethod.GET)
    public void sendMesg(String msg, Integer delayType) {
        log.info("Current time: {}, received request, msg: {}, delayType: {}", new Date(), msg, delayType);
        delayMessageSender.sendMsg(msg, delayType);
    }

    // Flexible delay: delayType is passed through as the per-message TTL in milliseconds.
    @RequestMapping(value = "/message", method = RequestMethod.GET)
    public void sendMessage(String message, Integer delayType) {
        log.info("Current time: {}, received request, msg: {}, delayType: {}", new Date(), message, delayType);
        delayMessageSender.sendMsgs(message, delayType);
    }
}
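Start the service and call the endpoint: localhost:8088/test/delay/msg?msg=hello&delayType=60
The test results are as follows:
With type 10 the delay is 6 seconds: after 6 seconds the message is consumed by consumer A.
With type 60 the delay is 60 seconds: after 60 seconds the message is consumed by consumer B.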
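The mapping between delayType and the observed delay can be made explicit. The following is a minimal sketch and not part of the original project: DelayTypeSketch and ttlMillisFor are hypothetical names. It mirrors the two cases handled by DelayMessageSender.sendMsg and rejects any other value instead of silently doing nothing.

```java
class DelayTypeSketch {
    // Returns the TTL in milliseconds of the delay queue that the given delayType is routed to.
    static long ttlMillisFor(Integer delayType) {
        if (delayType == null) {
            throw new IllegalArgumentException("delayType is required");
        }
        switch (delayType) {
            case 10:
                return 6000L;  // delay queue A declares x-message-ttl = 6000
            case 60:
                return 60000L; // delay queue B declares x-message-ttl = 60000
            default:
                throw new IllegalArgumentException("unsupported delayType: " + delayType);
        }
    }

    public static void main(String[] args) {
        System.out.println(ttlMillisFor(10)); // prints 6000
        System.out.println(ttlMillisFor(60)); // prints 60000
    }
}
```

Note that type 10 yields a 6-second delay, not 10 seconds, because that is the TTL configured on delay queue A.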
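The setup above delays consumption through two fixed queues, A and B. It is not reusable in general, because every new delay value would need its own queue. Here is a second example; the flow is the same as in the diagram above, so you can picture it yourself.
RabbitMQ configuration. This time no TTL is set on the queue.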
package com.codrpwh.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author coderpwh
 */
@Configuration
public class RabbitMQDelayCConfig {

    // Delay exchange, delay queue, and routing key
    public static final String DELAY_EXCHANGE_NAME = "c-delay.exchange";
    public static final String DELAY_QUEUEC_NAME = "c-delay.queue";
    public static final String DELAY_QUEUEC_ROUTING_KEY = "c-delay.routingkey";

    // Dead-letter exchange, routing key, and queue
    public static final String DEAD_LETTER_EXCHANGE = "c-letter.exchange";
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "c-letter.routingkey";
    public static final String DEAD_LETTER_QUEUEC_NAME = "c-letter.queue";

    /** Delay exchange. */
    @Bean("c-delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    /** Dead-letter exchange. */
    @Bean("c-deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /** Delay queue C: no x-message-ttl here; the producer sets a per-message expiration instead. */
    @Bean("c-delayQueueC")
    public Queue delayQueueC() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }

    /** Dead-letter queue C, consumed by receiveC. */
    @Bean("c-deadLetterQueueC")
    public Queue deadLetterQueueC() {
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }

    /** Binds delay queue C to the delay exchange. */
    @Bean
    public Binding delayBindingC(@Qualifier("c-delayQueueC") Queue queue,
                                 @Qualifier("c-delayExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    /** Binds dead-letter queue C to the dead-letter exchange. */
    @Bean
    public Binding deadLetterBinding(@Qualifier("c-deadLetterQueueC") Queue queue,
                                     @Qualifier("c-deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }
}
Consumer C: the receiveC method at the bottom of DeadLetterQueueConsumer.
// Excerpt from DeadLetterQueueConsumer; the rest of the class is the same as in the consumer listing above.
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("Current time: {}, dead-letter queue C received: {}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
Producer code: the sendMsgs method of DelayMessageSender.
// Excerpt from DelayMessageSender; the rest of the class is the same as in the producer listing above.
public void sendMsgs(String msg, Integer delayTime) {
    log.info("Message producer");
    rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
        // Per-message TTL in milliseconds, passed as a string.
        a.getMessageProperties().setExpiration(String.valueOf(delayTime));
        return a;
    });
}
Test code in the controller layer: the sendMessage method of TestController.
// Excerpt from TestController; the rest of the class is the same as in the controller listing above.
@RequestMapping(value = "/message", method = RequestMethod.GET)
public void sendMessage(String message, Integer delayType) {
    log.info("Current time: {}, received request, msg: {}, delayType: {}", new Date(), message, delayType);
    delayMessageSender.sendMsgs(message, delayType);
}
Request the endpoint: localhost:8088/test/message?message=hello world&delayType=8000
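The request above contains a space in the message value. Postman or a browser will typically encode it for you, but a hand-built URL should encode the query value itself. A minimal sketch, assuming an http scheme; RequestUrlSketch and messageUrl are hypothetical names that are not in the original project:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class RequestUrlSketch {
    // Builds the /test/message URL with the message value form-encoded.
    static String messageUrl(String message, int delayMillis) {
        try {
            String encoded = URLEncoder.encode(message, "UTF-8"); // a space becomes '+'
            return "http://localhost:8088/test/message?message=" + encoded + "&delayType=" + delayMillis;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // prints http://localhost:8088/test/message?message=hello+world&delayType=8000
        System.out.println(messageUrl("hello world", 8000));
    }
}
```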
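The test results are as follows:
Combined with the producer code above, the delay is now flexible and can be passed as a parameter. The value passed in Postman is 8000, so the message is consumed after an 8-second delay: once it expires in the delay queue, it is dead-lettered to the dead-letter queue and consumed there.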
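One caveat applies to per-message TTL as used here. As the RabbitMQ TTL documentation describes, an expired message is only removed once it reaches the head of the queue, so a message with a short TTL queued behind one with a long TTL is dead-lettered late. The sketch below is a plain-Java model of that behavior, not RabbitMQ code. HeadOfQueueExpirySketch and deadLetterTimes are hypothetical names, and the model assumes all messages are published at time 0 into a FIFO queue that has no consumer.

```java
import java.util.ArrayList;
import java.util.List;

class HeadOfQueueExpirySketch {
    // For messages published at t=0 in the given order, returns the time (ms) at which
    // each one leaves a FIFO queue that only expires the message at its head.
    static List<Long> deadLetterTimes(long... ttlsMillis) {
        List<Long> times = new ArrayList<>();
        long previous = 0;
        for (long ttl : ttlsMillis) {
            // A message cannot leave before the message in front of it has left.
            long leavesAt = Math.max(ttl, previous);
            times.add(leavesAt);
            previous = leavesAt;
        }
        return times;
    }

    public static void main(String[] args) {
        // A 20 s message followed by an 8 s message: the second waits behind the first.
        System.out.println(deadLetterTimes(20000, 8000)); // prints [20000, 20000]
        // In the opposite order both leave on time.
        System.out.println(deadLetterTimes(8000, 20000)); // prints [8000, 20000]
    }
}
```

Under this model, mixing very different delay values in one queue can make short delays inaccurate, which is worth checking against your own broker before relying on it.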
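This is only a RabbitMQ demo; in a real project you still need to fit it to the business scenario. Delay queues come up often in e-commerce. RabbitMQ is just one of several message brokers,
so if you have the energy, try the others too, such as Kafka and RocketMQ!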