RabbitMQ幂等性


在微服务中,一般会用到不同的消息中间件。但在使用这些中间件的同时,有的需要结合业场景去实现,有的则需要靠MQ本身去实现。
MQ是如何保证消息的幂等性?MQ是如何保证消息不被重复消费?MQ又是如何保证消息可靠投递?

本文参考了沈大师分享的
MQ,究竟如何保证消息幂等,我们就在该基础上用SpirngBoot+RabbitMQ来实现消息的幂等性。

MQ的幂等性,由两部分构成:

1.MQ发送端,到MQ-server的幂等性(上半场)

2.MQ-server,到接收端的幂等性(下半场)

我们先来看看MQ发送消息到上半场,上图中的1-3部分

  1. MQ-client将消息发送到MQ-server
  2. MQ-server将详细落地,写入db等
  3. MQ-server回消息给MQ-client

如果3丢失,发送端MQ-client超时后会重发消息,可能导致服务端MQ-server收到重复消息

此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免步骤2落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:

  1. 全局唯一
  2. MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽

有了这个inner-msg-id,就能保证上半场重发,也只有1条消息落到MQ-server的DB中,实现上半场幂等。

实现代码

SpringBoot的配置文件

server.port=8088
spring.resources.static-locations=classpath:/templates/,classpath:/static/
spring.mvc.view.suffix=.html

# redis
spring.redis.host= 
spring.redis.port= 
spring.redis.password= 
spring.redis.jedis.pool.max-idle=32
spring.redis.jedis.pool.max-wait=-1
spring.redis.jedis.pool.min-idle=0
spring.redis.timeout=0

# mysql
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&autoR&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# show sql in console
logging.level.com.wangzaiplus.test.mapper=debug

# rabbitmq
spring.rabbitmq.host=192.168.36.2
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# ����confirms�ص� P -> Exchange
spring.rabbitmq.publisher-confirms=true
# ����returnedMessage�ص� Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# �����ֶ�ȷ��(ack) Queue -> C
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#spring.rabbitmq.listener.simple.prefetch=100


# mail
spring.mail.host=  
spring.mail.username=
spring.mail.password= 
spring.mail.from=c 
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# ���ӳ�������Ŀ������ӵ��������
ok.http.max-idle-connections=200
# ���ӿ���ʱ�����Ϊ 300 ��
ok.http.keep-alive-duration=300

下面2个是分别开启的RabbitMQ的消息确认和回退

  • spring.rabbitmq.publisher-returns=true
  • spring.rabbitmq.publisher-confirms=true

RabbitMQ关于队列,交换机,路由及死信队列相关的配置。

package com.codrpwh.rabbitmq.config;

import com.sun.tools.internal.xjc.reader.xmlschema.BindGreen;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Handler;

/**
 * @author coderpwh
 */
@Configuration
public class RabbitMQDelayCConfig {


    public static final String DELAY_EXCHANGE_NAME = "cc-delay.exchange";

    public static final String DELAY_QUEUEC_NAME = "cc-delay.queue";

    public static final String DELAY_QUEUEC_ROUTING_KEY = "cc-delay.routingkey";


    /***
     *  死信队列,交换机,路由
     */
    public static final String DEAD_LETTER_EXCHANGE = "cc-letter.exchange";

    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "cc-letter.routingkey";

    public static final String DEAD_LETTER_QUEUEC_NAME = "cc-letter.queue";


    /***
     *  延迟交换机
     * @return
     */

    @Bean("c-delayExchange")
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }


    /***
     *  死信交换机
     * @return
     */

    @Bean("c-deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }


    /***
     * 延迟队列
     *
     * @return
     */

    @Bean("c-delayQueueC")
    public Queue delayQueueC() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }


    /***
     *  死信队列
     * @return
     */

    @Bean("c-deadLetterQueueC")
    public Queue deadLetterQueueC() {
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }


    /***
     *  延迟队列C与交换机进行绑定
     * @param queue
     * @param exchange
     * @return
     */

    @Bean
    public Binding delayBindingC(@Qualifier("c-delayQueueC") Queue queue, @Qualifier("c-delayExchange") DirectExchange exchange) {

        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    /***
     *   死信队列
     * @param queue
     * @param exchange
     * @return
     */

    @Bean
    public Binding deadLetterBinding(@Qualifier("c-deadLetterQueueC") Queue queue, @Qualifier("c-deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }


}

队列与交换机与路由及死信队列及交换机的流程图如下,是基于延时队列来实现的。

也是Rabbitmq的配置之一,主要是用来封装邮箱相关的队列交换机及路由。但我们重点看里面的对 RabbitTemplate 这个bean封装

  • 重新对RabbitTemplate的封装
  • RabbitMQ的消息确认机制,实现ConfirmCallback接口的confim方法,ack为true则投递成功,更新对应的db,false则相反
  • RabbitMQ的回退机制, rabbitTemplate.setMandatory(true),ReturnCallback,发送失败的消息则用日志记录。
  • 这里实现流程图中第三步,如果MQ-server回消息给MQ-client,server确认后再去更新的DB的状态
package com.codrpwh.rabbitmq.config;

import com.codrpwh.rabbitmq.common.Constant;
import com.codrpwh.rabbitmq.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import javax.annotation.Resource;

/**
 * @author coderpwh
 */
@Configuration
@Slf4j
public class RabbitConfig {


    @Resource
    private CachingConnectionFactory connectionFactory;


    @Resource
    private MsgLogService msgLogService;


    /***
     *  登录 队列名称
     */
    public static final String LOGIN_LOG_QUEUE_NAME = "login.log.queue";

    /***
     *    登录 交换机名称
     */

    public static final String LOGIN_LOG_EXCHANGE_NAME = "login.log.exchange";

    /****
     *   登录 路由名称
     */

    public static final String LOGIN_LOG_ROUTING_KEY_NAME = "login.log.routing.key";


    /***
     *  邮箱 队列名称
     */
    public static final String MAIL_QUEUE_NAME = "mail.queue";


    /***
     *
     *   邮箱 交换机名称
     *
     */
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";

    /****
     *   邮箱 路由名称
     *
     */
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";


    /***
     *  邮箱队列Bean
     * @return
     */
    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    /***
     *   邮箱交换机Bean
     * @return
     */
    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }


    /***
     *  邮箱 Binding 将邮箱队列,交换机都绑定到路由上
     * @return
     */
    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }


    /**
     * 队列
     *
     * @return
     */
    @Bean
    public Queue logUserQUeUe() {
        return new Queue(LOGIN_LOG_QUEUE_NAME, true);
    }


    /**
     * 交换机
     *
     * @return
     */
    @Bean
    public DirectExchange logUserExchanage() {
        return new DirectExchange(LOGIN_LOG_EXCHANGE_NAME, true, false);
    }

    /**
     * 将队列通过交换机绑定在路由上
     *
     * @return
     */
    @Bean
    public Binding logUserBinding() {
        return BindingBuilder.bind(logUserQUeUe()).to(logUserExchanage()).with(LOGIN_LOG_ROUTING_KEY_NAME);
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());


        /***
         *   RabbitMQ的消息确认机制
         *   生产者-->服务端
         *   client-->Server
         *
         */
        rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {

            if (ack) {
                log.info("消息确认机制,消息成功发送对应的交换机中........");
                String msgId = correlationData.getId();
                /***
                 *  消息记录,更新db中!
                 */
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.error("消息发送到Exchange失败:{},原因:{}", correlationData, cause);
            }

        }));


        // 设置了returnCallback 就强制回退
        rabbitTemplate.setMandatory(true);

        /***
         *   RabbitMQ的回退机制
         *    用日志记录RabbitMQ的信息
         *
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });
        return rabbitTemplate;

    }


    /***
     *
     *   配置启用rabbitmq事务
     *
     * @param cachingConnectionFactory
     * @return
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
        return new RabbitTransactionManager(cachingConnectionFactory);
    }


}

生产方代码,主要是下面的sendMsgs方法,CorrelationData 是 org.springframework.amqp.rabbit.connection下提供的类
id,Message属性,其中id是不能为空的,必传,这时代码生产一个uuid。将CorrelationData 对象一并发送到队列中去,同时往数据库里面插入记录,
用来记录该条消息。 状态(0投递中 1投递成功 2投递失败 3已消费)0,如果成功发送到MQ-server则通过上面的id更新转态为1,如果被消费者消费则将消息更新为3。这样用db来实现的消息的幂等性,当然也可以用redis来实现。这里主要实现了上面图中的第一步与第二步,第三步,已经在RabbitMQ配置文件中实现。

package com.codrpwh.rabbitmq.mq.producer;

import com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.codrpwh.rabbitmq.mapper.MsgLogMapper;
import com.codrpwh.rabbitmq.pojo.MsgLog;
import com.codrpwh.rabbitmq.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

@Component
@Slf4j
public class DelayMessageSender {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Resource
    private MsgLogMapper msgLogMapper;

    public void sendMsg(String msg, Integer type) {

        switch (type) {
            case 10:
                log.info("已进入10");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case 60:
                log.info("已进入60");
                rabbitTemplate.convertAndSend(RabbitMQDelayConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayConfig.DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }

    }


    public void sendMsgs(String msg, Integer delayTime) {

        log.info("进入消息生产者......... msg:{},delayTime:{}", msg, delayTime);
        String msgId = RandomUtil.UUID32();
        CorrelationData correlationData = new CorrelationData(msgId);

        /***
         *  生产消息入库
         */
        MsgLog msgLog = new MsgLog(msgId, msg, RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY);
        msgLogMapper.insert(msgLog);


        rabbitTemplate.convertAndSend(RabbitMQDelayCConfig.DELAY_EXCHANGE_NAME, RabbitMQDelayCConfig.DELAY_QUEUEC_ROUTING_KEY, msg, a -> {
            a.getMessageProperties().setExpiration(String.valueOf(delayTime));
            return a;
        }, correlationData);

    }


}

MQ消息发送下半场,最上面图中的4-6

  1. 服务端MQ-server将消息发给接收端MQ-client
  2. 接收端MQ-client回ACK给服务端
  3. 服务端MQ-server将落地消息删除

接收端MQ-client回ACK给服务端MQ-server,是消息消费业务方的主动调用行为,不能由MQ-client自动发起,
因为MQ系统不知道消费方什么时候真正消费成功。
如果5丢失,服务端MQ-server超时后会重发消息,可能导致MQ-client收到重复的消息。

上面是生产方的代码,下面来介绍消费方的代码,消费方的代码稍微复杂点,用代理类来实现的,也是为了后面代码的扩展性
消费代码的入口,receiveC方法,这里是用了BaseConsumerProxy代理类来对RabbtiMQ的消费逻辑代码进行了封装
msgLogService 是对消息的逻辑处理,DeadLetterConsumer 是对消费者对象处理对应业务逻辑

package com.codrpwh.rabbitmq.mq.consumer;

import com.alibaba.druid.support.json.JSONUtils;
import com.codrpwh.rabbitmq.config.RabbitConfig;
import com.codrpwh.rabbitmq.config.RabbitMQDelayConfig;
import com.codrpwh.rabbitmq.mq.BaseConsumer;
import com.codrpwh.rabbitmq.mq.BaseConsumerProxy;
import com.codrpwh.rabbitmq.service.MsgLogService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;

import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEA_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayConfig.DEAD_LETTER_QUEUEB_NAME;
import static com.codrpwh.rabbitmq.config.RabbitMQDelayCConfig.DEAD_LETTER_QUEUEC_NAME;

@Slf4j
@Component
public class DeadLetterQueueConsumer {


    @Resource
    private MsgLogService msgLogService;


    @Resource
    private DeadLetterConsumer deadLetterConsumer;

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列A收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }


    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列B收到的消息为:{}", new Date().toString(), msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

  /*  @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());

        log.info("当前时间:{},死信队列C收到消息为:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }*/

    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
    public void receiveC(Message message, Channel channel) throws IOException {

        log.info("已经进入C队列........ 参数:message为:{},channel:{}", message, channel);
        BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(deadLetterConsumer, msgLogService);
        BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();

        if (proxy != null) {
            log.info("生产者开始发送消息了....... 参数:message为:{},channel:{}", message, channel);
            proxy.consume(message, channel);
        }
    }


}

BaseConsumerProxy 代理实现类具体代码,代理类中有 2个私有方法一个是获取getCorrelationId,另一个则是更具correlationId来判断消息是否已经
消费掉,在 getProxy()代理方法中,处理对消费消息进行处理,消费者消费该消息则更新db的状态为3(已消费),同时进行手动确认
channel.basicAck(tag, false);这里完成的最上面的流程图中5,6步

package com.codrpwh.rabbitmq.mq;

import com.codrpwh.rabbitmq.common.Constant;
import com.codrpwh.rabbitmq.pojo.MsgLog;
import com.codrpwh.rabbitmq.service.MsgLogService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;

import java.lang.reflect.Proxy;
import java.util.Map;

@Slf4j
public class BaseConsumerProxy {

    private Object target;

    private MsgLogService msgLogService;

    public BaseConsumerProxy(Object target, MsgLogService msgLogService) {
        this.target = target;
        this.msgLogService = msgLogService;
    }

    public Object getProxy() {

        ClassLoader classLoader = target.getClass().getClassLoader();

        Class[] interfaces = target.getClass().getInterfaces();


        Object proxy = Proxy.newProxyInstance(classLoader, interfaces, (proxy1, method, args) -> {

            Message message = (Message) args[0];

            Channel channel = (Channel) args[1];


            /***
             *  获取 correlationId
             */
            String correlationId = getCorrelationId(message);


            /***
             *  判断消息是否已经被消费
             */
            if (isConsumed(correlationId)) {
                log.info("消息已经被消费,correlationId:{}", correlationId);
                return null;
            }

            MessageProperties properties = message.getMessageProperties();

            long tag = properties.getDeliveryTag();

            try {
                Object result = method.invoke(target, args);
                /****
                 *  更新db
                 */
                msgLogService.updateStatus(correlationId, Constant.MsgLogStatus.CONSUMED_SUCCESS);

                /****
                 *  手动进行消息的确认
                 */
                channel.basicAck(tag, false);
                return result;

            } catch (Exception e) {
                log.error("getProxy error", e);
                channel.basicNack(tag, false, true);
                return null;
            }


        });

        return proxy;


    }


    /***
     *   获取CorrelationId
     * @param message
     * @return
     */
    public String getCorrelationId(Message message) {
        String correlationId = null;

        log.info("消费方开始接受消息...message:{}", message);
        MessageProperties properties = message.getMessageProperties();

        Map<String, Object> headers = properties.getHeaders();
        log.info("消费方开始接受消息..... header:{}", headers);

        for (Map.Entry entry : headers.entrySet()) {
            System.out.println("key:" + entry.getKey() + "  ,value:" + entry.getValue());
            String key = (String) entry.getKey();
            Object value = entry.getValue();
            if (key.equals("spring_returned_message_correlation")) {
                log.info("value:{}" + value);
                correlationId = (String) value;
            }
        }

        return correlationId;

    }


    /***
     *  消息是否已经被消费
     * @param correlationId
     * @return
     */
    private boolean isConsumed(String correlationId) {
        MsgLog msgLog = msgLogService.selectByMsgId(correlationId);

        if (msgLog == null || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {
            return true;
        }
        return false;
    }


}

运行结果图如下:

已经基本上实现RabbitMQ的消息的幂等性,在MQ的上半场,通过消息确认机制,确认消息是否已发送并插入数据库
在下半场获取唯一的correlationId,并更新数据库状态。整个RabbitMQ的消息幂等性由MQ本身和业务来实现的。更新db,可以换成redis或者其他方式.

代码的实现已上传到github


文章作者: coderpwh
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 coderpwh !
  目录