分布式环境下动态管理RabbitMq队列及常见问题

一、应用场景:

  • 根据实际业务动态创建队列发送消息并消费,队列名称无法提前确定
  • 业务处理完成后删除对应队列,避免资源占用
  • 分布式部署时,需要自动拉起其它节点的消费队列进行业务处理

二、功能设计及实现:

(1)设计流程
开始创建队列时首先判断队列是否存在,后续将通知发送至Notice队列通知其它节点此队列已创建开启
监听,消费完毕后删除队列,并发送队列删除信息至其它节点,停止监听。对于整个生产、监听过程中
产生的失败消息通过死信队列进行处理,后续对失败消息进行再次处理或入库记录

在这里插入图片描述
具体项目代码目录结构可按如下结构进行设计,描述信息如下:

  • config:配置RabbitMq相关配置信息(预取数量、自动重连时间等等)
  • controller:接口层(提供对外API接口)
  • model:模型类(业务定义模型类)
  • sender:mq发送消息业务(发送消息)
  • handler:mq消费处理消息业务
  • service:业务类

(2)核心方法
RabbitMqConfig完成消息队列相关信息配置。其主要配置项如下:
(1)设置预取数量、自动重连时间间隔

@Configuration
public class RabbitMqConfig {
    @Bean
    @Primary
    public SimpleRabbitListenerContainerFactory proxyRabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPrefetchCount(5); // 设置预取数量
        factory.setRecoveryInterval(5000L); // 自动重连间隔时间
        return factory;
    }
}

(2)设置队列与消费处理类绑定

@Bean
public SimpleMessageListenerContainer userMessageListenerContainer(ConnectionFactory connectionFactory,
                                                                   MessageListenerAdapter userListenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(USER_QUEUE_NAME); // 设置要监听的队列名称
    container.setMessageListener(userListenerAdapter);
    container.setPrefetchCount(5);
    container.setRecoveryInterval(5000L);
    return container;
}

@Bean
public MessageListenerAdapter userListenerAdapter(UserHandlerWithMessage handler) {
    return new MessageListenerAdapter(handler, "handleMessage");
}

(3)配置死信队列

// 定义死信交换机
@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 定义死信队列
@Bean
public Queue deadLetterQueue() {
    return new Queue(DEAD_LETTER_QUEUE);
}
// 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
}
// 配置普通队列,绑定到死信交换机
@Bean
public Queue userQueue() {
    return QueueBuilder.durable(USER_QUEUE_NAME)
            .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 设置死信交换机
            .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) // 设置死信路由键
            .build();
}

(4)死信队列消息处理,此处不详细给出错误消息的代码逻辑,可根据业务需求进行处理,如:存入关系数据库进行记录,或创建消费者对队列信息进行消费都可

(5)配置并启动通知队列,确保分布式环境下所有节点都能进行消息消费,在每次队列创建或删除时通过Notice主题广播消息,通知其它节点开始消费或停止消费

public void broadcastQueue(String queueName, String routingKey, boolean isCreated) {
        String message = queueName + ":" + routingKey + ":" + (isCreated ? CommonConstants.PROXY_CREATED : CommonConstants.PROXY_DELETED);
        rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);
        LogUtils.logInfo("Broadcasted new queue creation: " + message);
 }

(6)队列管控业务类,此类主要提供队列的创建、删除、开启监听等操作,这里的RabbitAdmin、SimpleRabbitListenerContainerFactory等参数可根据业务需求自定义配置,使用时需要设置为自定义的参数,如:proxyRabbitListenerContainerFactory

public DynamicQueueService(ConnectionFactory connectionFactory,
                               @Qualifier("proxyRabbitAdmin") RabbitAdmin rabbitAdmin,
                               @Qualifier("proxyRabbitListenerContainerFactory") SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                               RabbitTemplate rabbitTemplate,
                               TopicExchange exchange,
                               FanoutExchange broadcastExchange) {
        this.connectionFactory = connectionFactory;
        this.rabbitAdmin = rabbitAdmin;
        this.exchange = exchange;
        this.broadcastExchange = broadcastExchange;
        this.rabbitListenerContainerFactory = rabbitListenerContainerFactory;
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * 判断队列是否存在
     * @param queueName 队列名称
     * @return
     */
    public boolean isQueueExists(String queueName) {
        return Boolean.TRUE.equals(rabbitTemplate.execute(channel -> {
            try {
                channel.queueDeclarePassive(queueName);
                return true;
            } catch (Exception e) {
                return false;
            }
        }));
    }

    /**
     * 创建队列并开启监听
     * @param queueName 队列名称
     * @param routingKey 路由键
     * @param handlerBeanName 消费beanName
     */
    public void createQueueAndStartListener(String queueName, String routingKey, String handlerBeanName) {
        // 创建持久化队列,如果队列已经存在则不会重复创建
        Queue queue = new Queue(queueName, true);
        if (!isQueueExists(queueName)) {
            rabbitAdmin.declareQueue(queue);
            log.info("Declared queue: " + queueName);

            // 绑定队列到交换器并指定路由键
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
            rabbitAdmin.declareBinding(binding);
            log.info("Bound queue: " + queueName + " to exchange: " + exchange.getName() + " with routing key: " + routingKey);
        } else {
            log.info("Queue " + queueName + " already exists.");
        }
        // 若队列名称不是接收广播队列,则启动监听
        if (!queueName.equals(CommonConstants.BROADCAST_EXCHANGE)) {
            // 启动监听容器
            startListener(queueName, handlerBeanName);
        }
    }

    /**
     * 开启监听
     * @param queueName 队列名称
     * @param handlerBeanName 消费处理bean名称
     */
    public void startListener(String queueName, String handlerBeanName) {
        // 从 Spring 容器中获取 MessageHandler 实例
        Object messageHandler = applicationContext.getBean(handlerBeanName);
        // 创建消息监听容器
        SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(new MessageListenerAdapter(messageHandler));

        // 设置其他参数(例如并发消费者数量,预取数量等)
        container.setConcurrentConsumers(concurrency);
        container.setPrefetchCount(5);
        container.setRecoveryInterval(5000L); // 自动重连间隔时间

        // 启动监听容器
        try {
            container.start();
            log.info("Started listener container for queue: " + queueName);
            // 检查容器是否正在运行
            if (container.isRunning()) {
                log.info("Listener container is running.");
            } else {
                log.warn("Listener container did not start properly.");
            }
        } catch (Exception e) {
            log.error("Failed to start listener container for queue: " + queueName, e);
            // 处理启动失败的情况
        }
        log.info("Started listener container for queue: " + queueName);

        // 存储监听容器,以便后续管理
        listenerContainers.put(queueName, container);
    }

    /**
     * 删除队列停止监听
     * @param queueName 队列名称
     */
    public void deleteQueueAndStopListener(String queueName) {
        // 停止监听容器
        SimpleMessageListenerContainer container = listenerContainers.get(queueName);
        if (container != null) {
            container.stop();
            listenerContainers.remove(queueName);
        }

        // 删除队列
        rabbitAdmin.deleteQueue(queueName);
    }

    /**
     * 广播消息
     * @param queueName 队列名
     * @param routingKey 路由键
     * @param isCreated 创建/删除 (true/false)
     */
    public void broadcastQueue(String queueName, String routingKey, boolean isCreated) {
        String message = queueName + ":" + routingKey + ":" + (isCreated ? CommonConstants.QUEUE_CREATE : CommonConstants.QUEUE_DELETE);
        rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);
        log.info("Broadcasted new queue creation: " + message);
    }

注意事项
上文所介绍的消费处理业务与队列绑定是通过在配置类中进行实现,针对当前队列设置单独container,
给container设置MessageListenerAdapter进行消费处理类绑定,还有另一种直接在队列创建启动时通
过借助spring的上下文进行实现,底层逻辑相同。
使用applicationContext,在启动队列监听时将具体处理消息的bean名称传入,通过spring容器获取实
例,调用SimpleMessageListenerContainer的setMessageListener方法来配置消息处理类

public void startListener(String queueName, String handlerBeanName) {
        // 从 Spring 容器中获取 MessageHandler 实例
        Object messageHandler = applicationContext.getBean(handlerBeanName);
        // 创建消息监听容器
        SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(new MessageListenerAdapter(messageHandler));

        // 设置其他参数(例如并发消费者数量,预取数量等)
        container.setConcurrentConsumers(concurrency);
        container.setPrefetchCount(5);
        container.setRecoveryInterval(5000L); // 自动重连间隔时间

        // 启动监听容器
        try {
            container.start();
            log.info("Started listener container for queue: " + queueName);
            // 检查容器是否正在运行
            if (container.isRunning()) {
                log.info("Listener container is running.");
            } else {
                log.warn("Listener container did not start properly.");
            }
        } catch (Exception e) {
            log.error("Failed to start listener container for queue: " + queueName, e);
            // 处理启动失败的情况
        }
        log.info("Started listener container for queue: " + queueName);
    }

三、常见问题及解决方式

(1)如何保证消息不丢失
问题描述
从生产者到 mq,再从 mq 到消费者,都有可能因为网络,服务宕机等原因丢失消息。
解决方案
发布确认:保证生产者到交换机,交换机到队列。这里可以使用消息入库方案,将要发送的消息保存
到数据库中。
发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确
认;收到确认后将status设为1,表示RabbitMQ已收到消息。使用定时检索消息表,将status=0并且超
过固定时间后(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以
要设置时间)还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性
处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。
持久化:交换机、队列、消息持久化,确保mq上消息不会因为宕机丢失。
消息确认:ack模式,每条消息都要确认,保证消费端不丢失。
(2)保证消息幂等性
问题描述
MQ消息的幂等性问题是指在消息传输和处理过程中,确保对同一消息的重复处理不会导致不同的
结果或副作用。幂等性在分布式系统中非常重要,因为消息传递过程中可能会发生重复发送或处理
的情况。
产生原因
消息重复发送:由于网络故障或重试机制,生产者可能会发送相同的消息多次。
消息重复消费:消费者在处理消息时,如果在处理过程中发生故障,消息可能会被重新消费。
网络分区:在网络分区恢复后,可能会有重复的消息传递。
解决方案
使用唯一标识符(ID):给每条消息分配一个唯一的标识符(ID),并在消费时记录已处理过的
ID。对于重复的ID,直接跳过处理。
使用去重表:使用数据库的去重表(如MySQL的唯一索引)来记录已处理的消息ID,在插入消息ID
时,如果已经存在则忽略。
使用乐观锁:在更新数据时,确保数据未被修改过。如果数据被修改过,则重试或跳过处理。
使用分布式锁:使用分布式锁(如Redis的分布式锁)来保证同一时刻只有一个消费者在处理相同的
消息。
(3)保证消息有序性(部分业务需求)
使用单一消费者:在队列上只配置一个消费者,这样可以保证消息按顺序被消费
使用事务:使用事务来确保消息的顺序消费。确保每个消息处理操作在一个事务中完成,只有在事
务成功提交后,消息才被确认。
消息分区:将消息分区,每个分区由一个消费者处理,从而在每个分区内部保持顺序,但允许并行
处理不同分区的消息。

public void sendMessage(String message, String partitionKey) {
    String routingKey = getRoutingKey(partitionKey);
    rabbitTemplate.convertAndSend("partitionExchange", routingKey, message);
}

private String getRoutingKey(String partitionKey) {
    // 根据分区键计算路由键
    return "partition" + (partitionKey.hashCode() % 2 + 1);
}

注意:对于rabbitmq实现消息有序性往往需要使用事务、单一消费者,会丧失很多消费性能,建议如果
业务要求消息必须有序可以使用kafka来进行替换

详细demo示例路径:
https://github.com/Itsuming/spring-rabbitmq-demo

相关推荐

  1. RabbitMQ 常见问题

    2024-07-20 15:48:01       67 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-20 15:48:01       172 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-20 15:48:01       190 阅读
  3. 在Django里面运行非项目文件

    2024-07-20 15:48:01       158 阅读
  4. Python语言-面向对象

    2024-07-20 15:48:01       171 阅读

热门阅读

  1. 素数极差

    2024-07-20 15:48:01       30 阅读
  2. 数据结构——栈

    2024-07-20 15:48:01       30 阅读
  3. 量化交易对短期收益的提升效果

    2024-07-20 15:48:01       29 阅读
  4. ArcGIS Pro SDK (九)几何 9 立方贝塞尔线段

    2024-07-20 15:48:01       34 阅读
  5. glibc: getifaddrs_internal 占用大量cpu

    2024-07-20 15:48:01       32 阅读
  6. 【关于使用swoole的知识点整理】

    2024-07-20 15:48:01       33 阅读
  7. 弹框管理类demo

    2024-07-20 15:48:01       32 阅读
  8. 单机 Redission 存在的问题以及怎么解决

    2024-07-20 15:48:01       33 阅读
  9. 力扣(LeetCode)——70. 爬楼梯

    2024-07-20 15:48:01       31 阅读