一、应用场景:
- 根据实际业务动态创建队列发送消息并消费,队列名称无法提前确定
- 业务处理完成后删除对应队列,避免资源占用
- 分布式部署时,需要自动拉起其它节点的消费队列进行业务处理
二、功能设计及实现:
(1)设计流程
开始创建队列时首先判断队列是否存在,后续将通知发送至Notice队列通知其它节点此队列已创建开启
监听,消费完毕后删除队列,并发送队列删除信息至其它节点,停止监听。对于整个生产、监听过程中
产生的失败消息通过死信队列进行处理,后续对失败消息进行再次处理或入库记录
具体项目代码目录结构可按如下结构进行设计,描述信息如下:
- config:配置RabbitMq相关配置信息(预取数量、自动重连时间等等)
- controller:接口层(提供对外API接口)
- model:模型类(业务定义模型类)
- sender:mq发送消息业务(发送消息)
- handler:mq消费处理消息业务
- service:业务类
(2)核心方法
RabbitMqConfig完成消息队列相关信息配置。其主要配置项如下:
(1)设置预取数量、自动重连时间间隔
@Configuration
public class RabbitMqConfig {
@Bean
@Primary
public SimpleRabbitListenerContainerFactory proxyRabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setPrefetchCount(5); // 设置预取数量
factory.setRecoveryInterval(5000L); // 自动重连间隔时间
return factory;
}
}
(2)设置队列与消费处理类绑定
@Bean
public SimpleMessageListenerContainer userMessageListenerContainer(ConnectionFactory connectionFactory,
MessageListenerAdapter userListenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(USER_QUEUE_NAME); // 设置要监听的队列名称
container.setMessageListener(userListenerAdapter);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L);
return container;
}
@Bean
public MessageListenerAdapter userListenerAdapter(UserHandlerWithMessage handler) {
return new MessageListenerAdapter(handler, "handleMessage");
}
(3)配置死信队列
// 定义死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 定义死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
// 绑定死信队列到死信交换机
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);
}
// 配置普通队列,绑定到死信交换机
@Bean
public Queue userQueue() {
return QueueBuilder.durable(USER_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) // 设置死信交换机
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) // 设置死信路由键
.build();
}
(4)死信队列消息处理,此处不详细给出错误消息的代码逻辑,可根据业务需求进行处理,如:存入关系数据库进行记录,或创建消费者对队列信息进行消费都可
(5)配置并启动通知队列,确保分布式环境下所有节点都能进行消息消费,在每次队列创建或删除时通过Notice主题广播消息,通知其它节点开始消费或停止消费
public void broadcastQueue(String queueName, String routingKey, boolean isCreated) {
String message = queueName + ":" + routingKey + ":" + (isCreated ? CommonConstants.PROXY_CREATED : CommonConstants.PROXY_DELETED);
rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);
LogUtils.logInfo("Broadcasted new queue creation: " + message);
}
(6)队列管控业务类,此类主要提供队列的创建、删除、开启监听等操作,这里的RabbitAdmin、SimpleRabbitListenerContainerFactory等参数可根据业务需求自定义配置,使用时需要设置为自定义的参数,如:proxyRabbitListenerContainerFactory
public DynamicQueueService(ConnectionFactory connectionFactory,
@Qualifier("proxyRabbitAdmin") RabbitAdmin rabbitAdmin,
@Qualifier("proxyRabbitListenerContainerFactory") SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
RabbitTemplate rabbitTemplate,
TopicExchange exchange,
FanoutExchange broadcastExchange) {
this.connectionFactory = connectionFactory;
this.rabbitAdmin = rabbitAdmin;
this.exchange = exchange;
this.broadcastExchange = broadcastExchange;
this.rabbitListenerContainerFactory = rabbitListenerContainerFactory;
this.rabbitTemplate = rabbitTemplate;
}
/**
* 判断队列是否存在
* @param queueName 队列名称
* @return
*/
public boolean isQueueExists(String queueName) {
return Boolean.TRUE.equals(rabbitTemplate.execute(channel -> {
try {
channel.queueDeclarePassive(queueName);
return true;
} catch (Exception e) {
return false;
}
}));
}
/**
* 创建队列并开启监听
* @param queueName 队列名称
* @param routingKey 路由键
* @param handlerBeanName 消费beanName
*/
public void createQueueAndStartListener(String queueName, String routingKey, String handlerBeanName) {
// 创建持久化队列,如果队列已经存在则不会重复创建
Queue queue = new Queue(queueName, true);
if (!isQueueExists(queueName)) {
rabbitAdmin.declareQueue(queue);
log.info("Declared queue: " + queueName);
// 绑定队列到交换器并指定路由键
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
rabbitAdmin.declareBinding(binding);
log.info("Bound queue: " + queueName + " to exchange: " + exchange.getName() + " with routing key: " + routingKey);
} else {
log.info("Queue " + queueName + " already exists.");
}
// 若队列名称不是接收广播队列,则启动监听
if (!queueName.equals(CommonConstants.BROADCAST_EXCHANGE)) {
// 启动监听容器
startListener(queueName, handlerBeanName);
}
}
/**
* 开启监听
* @param queueName 队列名称
* @param handlerBeanName 消费处理bean名称
*/
public void startListener(String queueName, String handlerBeanName) {
// 从 Spring 容器中获取 MessageHandler 实例
Object messageHandler = applicationContext.getBean(handlerBeanName);
// 创建消息监听容器
SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(new MessageListenerAdapter(messageHandler));
// 设置其他参数(例如并发消费者数量,预取数量等)
container.setConcurrentConsumers(concurrency);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L); // 自动重连间隔时间
// 启动监听容器
try {
container.start();
log.info("Started listener container for queue: " + queueName);
// 检查容器是否正在运行
if (container.isRunning()) {
log.info("Listener container is running.");
} else {
log.warn("Listener container did not start properly.");
}
} catch (Exception e) {
log.error("Failed to start listener container for queue: " + queueName, e);
// 处理启动失败的情况
}
log.info("Started listener container for queue: " + queueName);
// 存储监听容器,以便后续管理
listenerContainers.put(queueName, container);
}
/**
* 删除队列停止监听
* @param queueName 队列名称
*/
public void deleteQueueAndStopListener(String queueName) {
// 停止监听容器
SimpleMessageListenerContainer container = listenerContainers.get(queueName);
if (container != null) {
container.stop();
listenerContainers.remove(queueName);
}
// 删除队列
rabbitAdmin.deleteQueue(queueName);
}
/**
* 广播消息
* @param queueName 队列名
* @param routingKey 路由键
* @param isCreated 创建/删除 (true/false)
*/
public void broadcastQueue(String queueName, String routingKey, boolean isCreated) {
String message = queueName + ":" + routingKey + ":" + (isCreated ? CommonConstants.QUEUE_CREATE : CommonConstants.QUEUE_DELETE);
rabbitTemplate.convertAndSend(broadcastExchange.getName(), routingKey, message);
log.info("Broadcasted new queue creation: " + message);
}
注意事项:
上文所介绍的消费处理业务与队列绑定是通过在配置类中进行实现,针对当前队列设置单独container,
给container设置MessageListenerAdapter进行消费处理类绑定,还有另一种直接在队列创建启动时通
过借助spring的上下文进行实现,底层逻辑相同。
使用applicationContext,在启动队列监听时将具体处理消息的bean名称传入,通过spring容器获取实
例,调用SimpleMessageListenerContainer的setMessageListener方法来配置消息处理类
public void startListener(String queueName, String handlerBeanName) {
// 从 Spring 容器中获取 MessageHandler 实例
Object messageHandler = applicationContext.getBean(handlerBeanName);
// 创建消息监听容器
SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(new MessageListenerAdapter(messageHandler));
// 设置其他参数(例如并发消费者数量,预取数量等)
container.setConcurrentConsumers(concurrency);
container.setPrefetchCount(5);
container.setRecoveryInterval(5000L); // 自动重连间隔时间
// 启动监听容器
try {
container.start();
log.info("Started listener container for queue: " + queueName);
// 检查容器是否正在运行
if (container.isRunning()) {
log.info("Listener container is running.");
} else {
log.warn("Listener container did not start properly.");
}
} catch (Exception e) {
log.error("Failed to start listener container for queue: " + queueName, e);
// 处理启动失败的情况
}
log.info("Started listener container for queue: " + queueName);
}
三、常见问题及解决方式
(1)如何保证消息不丢失
问题描述
从生产者到 mq,再从 mq 到消费者,都有可能因为网络,服务宕机等原因丢失消息。
解决方案
发布确认:保证生产者到交换机,交换机到队列。这里可以使用消息入库方案,将要发送的消息保存
到数据库中。
发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确
认;收到确认后将status设为1,表示RabbitMQ已收到消息。使用定时检索消息表,将status=0并且超
过固定时间后(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以
要设置时间)还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性
处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。
持久化:交换机、队列、消息持久化,确保mq上消息不会因为宕机丢失。
消息确认:ack模式,每条消息都要确认,保证消费端不丢失。
(2)保证消息幂等性
问题描述
MQ消息的幂等性问题是指在消息传输和处理过程中,确保对同一消息的重复处理不会导致不同的
结果或副作用。幂等性在分布式系统中非常重要,因为消息传递过程中可能会发生重复发送或处理
的情况。
产生原因
消息重复发送:由于网络故障或重试机制,生产者可能会发送相同的消息多次。
消息重复消费:消费者在处理消息时,如果在处理过程中发生故障,消息可能会被重新消费。
网络分区:在网络分区恢复后,可能会有重复的消息传递。
解决方案
使用唯一标识符(ID):给每条消息分配一个唯一的标识符(ID),并在消费时记录已处理过的
ID。对于重复的ID,直接跳过处理。
使用去重表:使用数据库的去重表(如MySQL的唯一索引)来记录已处理的消息ID,在插入消息ID
时,如果已经存在则忽略。
使用乐观锁:在更新数据时,确保数据未被修改过。如果数据被修改过,则重试或跳过处理。
使用分布式锁:使用分布式锁(如Redis的分布式锁)来保证同一时刻只有一个消费者在处理相同的
消息。
(3)保证消息有序性(部分业务需求)
使用单一消费者:在队列上只配置一个消费者,这样可以保证消息按顺序被消费
使用事务:使用事务来确保消息的顺序消费。确保每个消息处理操作在一个事务中完成,只有在事
务成功提交后,消息才被确认。
消息分区:将消息分区,每个分区由一个消费者处理,从而在每个分区内部保持顺序,但允许并行
处理不同分区的消息。
public void sendMessage(String message, String partitionKey) {
String routingKey = getRoutingKey(partitionKey);
rabbitTemplate.convertAndSend("partitionExchange", routingKey, message);
}
private String getRoutingKey(String partitionKey) {
// 根据分区键计算路由键
return "partition" + (partitionKey.hashCode() % 2 + 1);
}
注意:对于rabbitmq实现消息有序性往往需要使用事务、单一消费者,会丧失很多消费性能,建议如果
业务要求消息必须有序可以使用kafka来进行替换
详细demo示例路径:
https://github.com/Itsuming/spring-rabbitmq-demo