在微服务中,一般会用到不同的消息中间件。但在使用这些中间件的同时,有的需要结合业场景去实现,有的则需要靠MQ本身去实现。
MQ是如何保证消息的幂等性?MQ是如何保证消息不被重复消费?MQ又是如何保证消息可靠投递?
本文参考了沈大师分享的
MQ,究竟如何保证消息幂等,我们就在该基础上用SpirngBoot+RabbitMQ来实现消息的幂等性。

MQ的幂等性,由两部分构成:
1.MQ发送端,到MQ-server的幂等性(上半场)
2.MQ-server,到接收端的幂等性(下半场)
我们先来看看MQ发送消息到上半场,上图中的1-3部分
- MQ-client将消息发送到MQ-server
- MQ-server将详细落地,写入db等
- MQ-server回消息给MQ-client
如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
- 全局唯一
- MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽
有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。
实现代码
SpringBoot的配置文件
server.port=8088
spring.resources.static-locations=classpath:/templates/,classpath:/static/
spring.mvc.view.suffix=.html
# redis
spring.redis.host=
spring.redis.port=
spring.redis.password=
spring.redis.jedis.pool.max-idle=32
spring.redis.jedis.pool.max-wait=-1
spring.redis.jedis.pool.min-idle=0
spring.redis.timeout=0
# mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&autoR&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# show sql in console
logging.level.com.wangzaiplus.test.mapper=debug
# rabbitmq
spring.rabbitmq.host=192.168.36.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# ����confirms�ص� P -> Exchange
spring.rabbitmq.publisher-confirms=true
# ����returnedMessage�ص� Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# �����ֶ�ȷ��(ack) Queue -> C
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#spring.rabbitmq.listener.simple.prefetch=100
# mail
spring.mail.host=
spring.mail.username=
spring.mail.password=
spring.mail.from=c
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# ���ӳ�������Ŀ������ӵ��������
ok.http.max-idle-connections=200
# ���ӿ���ʱ�����Ϊ 300 ��
ok.http.keep-alive-duration=300
下面2个是分别开启的RabbitMQ的消息确认和回退
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.publisher-confirms=true
RabbitMQ关于队列,交换机,路由及死信队列相关的配置。
package com.codrpwh.rabbitmq.config;
import com.sun.tools.internal.xjc.reader.xmlschema.BindGreen;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Handler;
/**
* @author coderpwh
*/
@Configuration
public class RabbitMQDelayCConfig {
public static final String DELAY_EXCHANGE_NAME = "cc-delay.exchange";
public static final String DELAY_QUEUEC_NAME = "cc-delay.queue";
public static final String DELAY_QUEUEC_ROUTING_KEY = "cc-delay.routingkey";
/***
* 死信队列,交换机,路由
*/
public static final String DEAD_LETTER_EXCHANGE = "cc-letter.exchange";
public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "cc-letter.routingkey";
public static final String DEAD_LETTER_QUEUEC_NAME = "cc-letter.queue";
/***
* 延迟交换机
* @return
*/
@Bean("c-delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
/***
* 死信交换机
* @return
*/
@Bean("c-deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
/***
* 延迟队列
*
* @return
*/
@Bean("c-delayQueueC")
public Queue delayQueueC() {
Map<String, Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}
/***
* 死信队列
* @return
*/
@Bean("c-deadLetterQueueC")
public Queue deadLetterQueueC() {
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}
/***
* 延迟队列C与交换机进行绑定
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding delayBindingC(@Qualifier("c-delayQueueC") Queue queue, @Qualifier("c-delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}
/***
* 死信队列
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding deadLetterBinding(@Qualifier("c-deadLetterQueueC") Queue queue, @Qualifier("c-deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
}
}
队列与交换机与路由及死信队列及交换机的流程图如下,是基于延时队列来实现的。
也是Rabbitmq的配置之一,主要是用来封装邮箱相关的队列交换机及路由。但我们重点看里面的对 RabbitTemplate 这个bean封装
- 重新对RabbitTemplate的封装
- RabbitMQ的消息确认机制,实现ConfirmCallback接口的confim方法,ack为true则投递成功,更新对应的db,false则相反
- RabbitMQ的回退机制, rabbitTemplate.setMandatory(true),ReturnCallback,发送失败的消息则用日志记录。
- 这里实现流程图中第三步,如果MQ-server回消息给MQ-client,server确认后再去更新的DB的状态
package com.codrpwh.rabbitmq.config;
import com.codrpwh.rabbitmq.common.Constant;
import com.codrpwh.rabbitmq.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
/**
* @author coderpwh
*/
@Configuration
@Slf4j
public class RabbitConfig {
@Resource
private CachingConnectionFactory connectionFactory;
@Resource
private MsgLogService msgLogService;
/***
* 登录 队列名称
*/
public static final String LOGIN_LOG_QUEUE_NAME = "login.log.queue";
/***
* 登录 交换机名称
*/
public static final String LOGIN_LOG_EXCHANGE_NAME = "login.log.exchange";
/****
* 登录 路由名称
*/
public static final String LOGIN_LOG_ROUTING_KEY_NAME = "login.log.routing.key";
/***
* 邮箱 队列名称
*/
public static final String MAIL_QUEUE_NAME = "mail.queue";
/***
*
* 邮箱 交换机名称
*
*/
public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
/****
* 邮箱 路由名称
*
*/
public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
/***
* 邮箱队列Bean
* @return
*/
@Bean
public Queue mailQueue() {
return new Queue(MAIL_QUEUE_NAME, true);
}
/***
* 邮箱交换机Bean
* @return
*/
@Bean
public DirectExchange mailExchange() {
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
}
/***
* 邮箱 Binding 将邮箱队列,交换机都绑定到路由上
* @return
*/
@Bean
public Binding mailBinding() {
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
}
/**
* 队列
*
* @return
*/
@Bean
public Queue logUserQUeUe() {
return new Queue(LOGIN_LOG_QUEUE_NAME, true);
}
/**
* 交换机
*
* @return
*/
@Bean
public DirectExchange logUserExchanage() {
return new DirectExchange(LOGIN_LOG_EXCHANGE_NAME, true, false);
}
/**
* 将队列通过交换机绑定在路由上
*
* @return
*/
@Bean
public Binding logUserBinding() {
return BindingBuilder.bind(logUserQUeUe()).to(logUserExchanage()).with(LOGIN_LOG_ROUTING_KEY_NAME);
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
/***
* RabbitMQ的消息确认机制
* 生产者-->服务端
* client-->Server
*
*/
rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
if (ack) {
log.info("消息确认机制,消息成功发送对应的交换机中........");
String msgId = correlationData.getId();
/***
* 消息记录,更新db中!
*/
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
} else {
log.error("消息发送到Exchange失败:{},原因:{}", correlationData, cause);
}
}));
// 设置了returnCallback 就强制回退
rabbitTemplate.setMandatory(true);
/***
* RabbitMQ的回退机制
* 用日志记录RabbitMQ的信息
*
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
/***
*
* 配置启用rabbitmq事务
*
* @param cachingConnectionFactory
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
return new RabbitTransactionManager(cachingConnectionFactory);
}
}
生产方代码,主要是下面的sendMsgs方法,CorrelationData 是 org.springframework.amqp.rabbit.connection下提供的类
有id,Message属性,其中id是不能为空的,必传,这时代码生产一个uuid。将CorrelationData 对象一并发送到队列中去,同时往数据库里面插入记录,
用来记录该条消息。 状态(0投递中 1投递成功 2投递失败 3已消费)为0,如果成功发送到MQ-server则通过上面的id更新转态为1,如果被消费者消费则将消息更新为3。这样用db来实现的消息的幂等性,当然也可以用redis来实现。这里主要实现了上面图中的第一步与第二步,第三步,已经在RabbitMQ配置文件中实现。
package com.codrpwh.rabbitmq.mq.producer;
import com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.codrpwh.rabbitmq.mapper.MsgLogMapper;
import com.codrpwh.rabbitmq.pojo.MsgLog;
import com.codrpwh.rabbitmq.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Component
@Slf4j
public class DelayMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Resource
private MsgLogMapper msgLogMapper;
public void sendMsg(String msg, Integer type) {
switch (type) {
case 10:
log.info("已进入10");
rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
break;
case 60:
log.info("已进入60");
rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
break;
}
}
public void sendMsgs(String msg, Integer delayTime) {
log.info("进入消息生产者......... msg:{},delayTime:{}", msg, delayTime);
String msgId = RandomUtil.UUID32();
CorrelationData correlationData = new CorrelationData(msgId);
/***
* 生产消息入库
*/
MsgLog msgLog = new MsgLog(msgId, msg, RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY);
msgLogMapper.insert(msgLog);
rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
a.getMessageProperties().setExpiration(String.valueOf(delayTime));
return a;
}, correlationData);
}
}
MQ消息发送下半场,最上面图中的4-6
- 服务端MQ-server将消息发给接收端MQ-client;
- 接收端MQ-client回ACK给服务端;
- 服务端MQ-server将落地消息删除;
接收端MQ-client回ACK给服务端MQ-server,是消息消费业务方的主动调用行为,不能由MQ-client自动发起,
因为MQ系统不知道消费方什么时候真正消费成功。
如果5丢失,服务端MQ-server超时后会重发消息,可能导致MQ-client收到重复的消息。
上面是生产方的代码,下面来介绍消费方的代码,消费方的代码稍微复杂点,用代理类来实现的,也是为了后面代码的扩展性
消费代码的入口,receiveC方法,这里是用了BaseConsumerProxy代理类来对RabbtiMQ的消费逻辑代码进行了封装
msgLogService 是对消息的逻辑处理,DeadLetterConsumer 是对消费者对象处理对应业务逻辑
package com.codrpwh.rabbitmq.mq.consumer;
import com.alibaba.druid.support.json.JSONUtils;
import com.codrpwh.rabbitmq.config.RabbitConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.codrpwh.rabbitmq.mq.BaseConsumer;
import com.codrpwh.rabbitmq.mq.BaseConsumerProxy;
import com.codrpwh.rabbitmq.service.MsgLogService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEB_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig.DEAD_LETTER_QUEUEC_NAME;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@Resource
private MsgLogService msgLogService;
@Resource
private DeadLetterConsumer deadLetterConsumer;
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列A收到的消息为:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列B收到的消息为:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/* @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},死信队列C收到消息为:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}*/
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
log.info("已经进入C队列........ 参数:message为:{},channel:{}", message, channel);
BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(deadLetterConsumer, msgLogService);
BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
if (proxy != null) {
log.info("生产者开始发送消息了....... 参数:message为:{},channel:{}", message, channel);
proxy.consume(message, channel);
}
}
}
BaseConsumerProxy 代理实现类具体代码,代理类中有 2个私有方法一个是获取getCorrelationId,另一个则是更具correlationId来判断消息是否已经
消费掉,在 getProxy()代理方法中,处理对消费消息进行处理,消费者消费该消息则更新db的状态为3(已消费),同时进行手动确认
channel.basicAck(tag, false);这里完成的最上面的流程图中5,6步
package com.codrpwh.rabbitmq.mq;
import com.codrpwh.rabbitmq.common.Constant;
import com.codrpwh.rabbitmq.pojo.MsgLog;
import com.codrpwh.rabbitmq.service.MsgLogService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import java.lang.reflect.Proxy;
import java.util.Map;
@Slf4j
public class BaseConsumerProxy {
private Object target;
private MsgLogService msgLogService;
public BaseConsumerProxy(Object target, MsgLogService msgLogService) {
this.target = target;
this.msgLogService = msgLogService;
}
public Object getProxy() {
ClassLoader classLoader = target.getClass().getClassLoader();
Class[] interfaces = target.getClass().getInterfaces();
Object proxy = Proxy.newProxyInstance(classLoader, interfaces, (proxy1, method, args) -> {
Message message = (Message) args[0];
Channel channel = (Channel) args[1];
/***
* 获取 correlationId
*/
String correlationId = getCorrelationId(message);
/***
* 判断消息是否已经被消费
*/
if (isConsumed(correlationId)) {
log.info("消息已经被消费,correlationId:{}", correlationId);
return null;
}
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
try {
Object result = method.invoke(target, args);
/****
* 更新db
*/
msgLogService.updateStatus(correlationId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
/****
* 手动进行消息的确认
*/
channel.basicAck(tag, false);
return result;
} catch (Exception e) {
log.error("getProxy error", e);
channel.basicNack(tag, false, true);
return null;
}
});
return proxy;
}
/***
* 获取CorrelationId
* @param message
* @return
*/
public String getCorrelationId(Message message) {
String correlationId = null;
log.info("消费方开始接受消息...message:{}", message);
MessageProperties properties = message.getMessageProperties();
Map<String, Object> headers = properties.getHeaders();
log.info("消费方开始接受消息..... header:{}", headers);
for (Map.Entry entry : headers.entrySet()) {
System.out.println("key:" + entry.getKey() + " ,value:" + entry.getValue());
String key = (String) entry.getKey();
Object value = entry.getValue();
if (key.equals("spring_returned_message_correlation")) {
log.info("value:{}" + value);
correlationId = (String) value;
}
}
return correlationId;
}
/***
* 消息是否已经被消费
* @param correlationId
* @return
*/
private boolean isConsumed(String correlationId) {
MsgLog msgLog = msgLogService.selectByMsgId(correlationId);
if (msgLog == null || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {
return true;
}
return false;
}
}

运行结果图如下:
已经基本上实现RabbitMQ的消息的幂等性,在MQ的上半场,通过消息确认机制,确认消息是否已发送并插入数据库
在下半场获取唯一的correlationId,并更新数据库状态。整个RabbitMQ的消息幂等性由MQ本身和业务来实现的。更新db,可以换成redis或者其他方式.
代码的实现已上传到github