SpringBoot 3.4.3 整合 RabbitMQ 4.0.7 实战指南

1. 概述

消息队列是分布式系统中不可或缺的组件,用于解耦、异步处理、削峰填谷等场景。RabbitMQ 作为一种高性能、可靠的消息队列中间件,广泛应用于企业生产环境。本文将详细介绍如何在 SpringBoot 3.4.3 项目中整合 RabbitMQ 4.0.7,并实现三种交换机类型(Direct、Fanout、Topic)的完整示例。

2. RabbitMQ 核心概念

在深入实现之前,先简单回顾 RabbitMQ 的几个核心概念:

  • Producer:生产者,发送消息的应用程序
  • Consumer:消费者,接收和处理消息的应用程序
  • Queue:队列,存储消息的缓冲区
  • Exchange:交换机,接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列
  • Binding:绑定,Exchange 和 Queue 之间的关联关系
  • Routing Key:路由键,Exchange 根据路由键将消息路由到队列

RabbitMQ 支持多种类型的交换机,本文将重点介绍三种:

  1. Direct Exchange:直接交换机,根据精确的 Routing Key 匹配将消息发送到指定队列
  2. Fanout Exchange:扇形交换机,将消息广播到所有绑定的队列,忽略 Routing Key
  3. Topic Exchange:主题交换机,根据通配符匹配的 Routing Key 将消息发送到相应队列

3. 环境准备

  • SpringBoot 3.4.3
  • RabbitMQ 4.0.7
  • JDK 17
  • Maven 3.8+

4. 项目配置

4.1 添加依赖

首先在 pom.xml 文件中添加 RabbitMQ 相关依赖:

1
2
3
4
5
<!-- RabbitMQ 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在 SpringBoot 项目中,无需指定版本号,因为 SpringBoot 父 POM 已经管理了依赖版本。

4.2 RabbitMQ 配置

application-rabbitmq.yml 中添加 RabbitMQ 相关配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 消息确认机制
publisher-confirm-type: correlated # 开启发布确认
publisher-returns: true # 开启发布返回
template:
mandatory: true # 设置为true表示交换机找不到队列时会将消息返回给生产者
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 限制每次发送一条数据给消费者
concurrency: 3 # 消费者最小实例数
max-concurrency: 10 # 消费者最大实例数
# 重试相关配置
retry:
enabled: true # 开启重试
max-attempts: 3 # 最大重试次数
initial-interval: 1000 # 重试间隔(毫秒)
multiplier: 1.0 # 间隔乘数
max-interval: 10000 # 最大重试间隔
default-requeue-rejected: false # 重试次数超过上限后是否重新入队

在主配置文件 application.yml 中启用 RabbitMQ 配置文件:

1
2
3
spring:
profiles:
active: dev,rabbitmq

5. 实现 RabbitMQ 配置类

创建 RabbitMQ 配置类,定义交换机、队列和绑定关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.xingcy.app.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ配置类
*
* @author xingcy
*/
@Configuration
public class RabbitMQConfig {

// ------------- Direct Exchange -------------
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE = "direct.queue";
public static final String DIRECT_ROUTING_KEY = "direct.routing.key";

// ------------- Fanout Exchange -------------
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE_A = "fanout.queue.a";
public static final String FANOUT_QUEUE_B = "fanout.queue.b";

// ------------- Topic Exchange -------------
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE_A = "topic.queue.a";
public static final String TOPIC_QUEUE_B = "topic.queue.b";
public static final String TOPIC_ROUTING_KEY_A = "topic.routing.key.a.*"; // 匹配topic.routing.key.a.任意一个词
public static final String TOPIC_ROUTING_KEY_B = "topic.routing.key.b.#"; // 匹配topic.routing.key.b.任意多个词

/**
* 配置消息转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 配置RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}

// ------------------ Direct Exchange ------------------
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}

@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}

@Bean
public Binding directBinding(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
}

// ------------------ Fanout Exchange ------------------
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

@Bean
public Queue fanoutQueueA() {
return new Queue(FANOUT_QUEUE_A);
}

@Bean
public Queue fanoutQueueB() {
return new Queue(FANOUT_QUEUE_B);
}

@Bean
public Binding fanoutBindingA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}

@Bean
public Binding fanoutBindingB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}

// ------------------ Topic Exchange ------------------
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean
public Queue topicQueueA() {
return new Queue(TOPIC_QUEUE_A);
}

@Bean
public Queue topicQueueB() {
return new Queue(TOPIC_QUEUE_B);
}

@Bean
public Binding topicBindingA(Queue topicQueueA, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueA).to(topicExchange).with(TOPIC_ROUTING_KEY_A);
}

@Bean
public Binding topicBindingB(Queue topicQueueB, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueB).to(topicExchange).with(TOPIC_ROUTING_KEY_B);
}
}

6. 创建消息模型

定义消息传输对象(DTO)用于生产者和消费者之间传递消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.xingcy.app.mq.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
* 消息传输对象
*
* @author xingcy
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDto implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 消息ID
*/
private String id;

/**
* 消息内容
*/
private String content;

/**
* 交换机类型
*/
private String exchangeType;

/**
* 发送时间
*/
private Date sendTime;

/**
* 额外数据
*/
private Object data;
}

7. 实现消息生产者

创建消息生产者类,负责发送消息到不同类型的交换机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.xingcy.app.mq.producer;

import com.xingcy.app.mq.config.RabbitMQConfig;
import com.xingcy.app.mq.model.MessageDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

/**
* 消息生产者
*
* @author xingcy
*/
@Slf4j
@Component
public class MessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送Direct类型消息
*/
public String sendDirectMessage(String content, Object data) {
String messageId = UUID.randomUUID().toString();
MessageDto message = new MessageDto(
messageId,
content,
"Direct",
new Date(),
data
);

log.info("发送Direct消息,id: {}, content: {}", messageId, content);
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTING_KEY, message);
return messageId;
}

/**
* 发送Fanout类型消息
*/
public String sendFanoutMessage(String content, Object data) {
String messageId = UUID.randomUUID().toString();
MessageDto message = new MessageDto(
messageId,
content,
"Fanout",
new Date(),
data
);

log.info("发送Fanout消息,id: {}, content: {}", messageId, content);
// Fanout类型交换机忽略routingKey,这里传空字符串
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, "", message);
return messageId;
}

/**
* 发送Topic类型消息到队列A
*/
public String sendTopicMessageToA(String content, Object data) {
String messageId = UUID.randomUUID().toString();
MessageDto message = new MessageDto(
messageId,
content,
"Topic-A",
new Date(),
data
);

log.info("发送Topic消息到队列A,id: {}, content: {}", messageId, content);
// 发送到topic.routing.key.a.anything,匹配队列A
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.routing.key.a.test", message);
return messageId;
}

/**
* 发送Topic类型消息到队列B
*/
public String sendTopicMessageToB(String content, Object data) {
String messageId = UUID.randomUUID().toString();
MessageDto message = new MessageDto(
messageId,
content,
"Topic-B",
new Date(),
data
);

log.info("发送Topic消息到队列B,id: {}, content: {}", messageId, content);
// 发送到topic.routing.key.b.anything.else,匹配队列B
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.routing.key.b.test.any", message);
return messageId;
}

/**
* 发送Topic类型消息到所有队列
*/
public String sendTopicMessageToAll(String content, Object data) {
String messageId = UUID.randomUUID().toString();
MessageDto message = new MessageDto(
messageId,
content,
"Topic-All",
new Date(),
data
);

log.info("发送Topic消息到所有队列,id: {}, content: {}", messageId, content);
// 这个方法会分别发送到队列A和队列B,使用不同的routingKey
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.routing.key.a.all", message);

// 需要克隆一个新的消息对象,否则消息ID会相同
MessageDto newMessage = new MessageDto(
messageId,
content,
"Topic-All",
new Date(),
data
);
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.routing.key.b.all.test", newMessage);

return messageId;
}
}

8. 实现消息消费者

创建消息消费者类,负责消费来自不同队列的消息,并支持手动确认和消息重试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.xingcy.app.mq.consumer;

import com.xingcy.app.mq.config.RabbitMQConfig;
import com.xingcy.app.mq.model.MessageDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 消息消费者
*
* @author xingcy
*/
@Slf4j
@Component
public class MessageConsumer {

/**
* 消息重试计数器,使用消息ID作为键
*/
private final Map<String, Integer> retryCountMap = new ConcurrentHashMap<>();

/**
* 最大重试次数
*/
private static final int MAX_RETRY_COUNT = 3;

/**
* 消费Direct队列消息
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)
public void consumeDirectMessage(MessageDto message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
processQueueMessage(message, channel, deliveryTag, "Direct队列");
}

/**
* 消费Fanout队列A消息
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_A)
public void consumeFanoutMessageA(MessageDto message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
processQueueMessage(message, channel, deliveryTag, "Fanout队列A");
}

/**
* 消费Fanout队列B消息
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_B)
public void consumeFanoutMessageB(MessageDto message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
processQueueMessage(message, channel, deliveryTag, "Fanout队列B");
}

/**
* 消费Topic队列A消息
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_A)
public void consumeTopicMessageA(MessageDto message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
processQueueMessage(message, channel, deliveryTag, "Topic队列A");
}

/**
* 消费Topic队列B消息
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_B)
public void consumeTopicMessageB(MessageDto message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
processQueueMessage(message, channel, deliveryTag, "Topic队列B");
}

/**
* 统一处理队列消息的方法
*/
private void processQueueMessage(MessageDto message, Channel channel, long deliveryTag, String queueName) throws IOException {
String messageId = message.getId();
try {
log.info("{}接收到消息:id={}, content={}, time={}",
queueName, messageId, message.getContent(), message.getSendTime());

// 业务处理逻辑
processMessage(message);

// 处理成功,清除重试计数
retryCountMap.remove(messageId);

// 手动确认消息已处理
channel.basicAck(deliveryTag, false);
log.info("{}消息{}处理成功", queueName, messageId);
} catch (Exception e) {
handleProcessException(messageId, channel, deliveryTag, queueName, e);
}
}

/**
* 处理消息异常的方法
*/
private void handleProcessException(String messageId, Channel channel, long deliveryTag, String queueName, Exception e) throws IOException {
log.error("{}消息处理异常: {}", queueName, e.getMessage(), e);

// 获取当前重试次数
int retryCount = retryCountMap.getOrDefault(messageId, 0) + 1;
log.info("消息{}当前重试次数: {}", messageId, retryCount);

if (retryCount <= MAX_RETRY_COUNT) {
// 更新重试次数
retryCountMap.put(messageId, retryCount);
// 消息处理失败,拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
log.info("消息{}重新入队,等待第{}次重试", messageId, retryCount);
} else {
// 超过最大重试次数,记录日志,不再重新入队
log.error("消息{}处理失败,已超过最大重试次数{}", messageId, MAX_RETRY_COUNT);
// 清除重试计数
retryCountMap.remove(messageId);
// 拒绝消息且不再重新入队
channel.basicNack(deliveryTag, false, false);
// 也可以选择发送到死信队列进行后续处理
}
}

/**
* 处理消息的通用方法
*/
private void processMessage(MessageDto message) {
// 模拟业务处理
try {
// 模拟处理时间
Thread.sleep(100);
log.info("消息{}处理完成", message.getId());
} catch (InterruptedException e) {
log.error("消息处理异常", e);
Thread.currentThread().interrupt();
}
}
}

9. 创建API接口

最后,创建 Controller 提供给前端调用的接口,用于测试消息发送和消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.xingcy.app.mq.controller;

import com.xingcy.app.mq.model.MessageDto;
import com.xingcy.app.mq.producer.MessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
* RabbitMQ测试接口
*
* @author xingcy
*/
@Slf4j
@RestController
@RequestMapping("/api/mq")
public class RabbitMQController {

@Autowired
private MessageProducer messageProducer;

/**
* 发送Direct类型消息
*/
@PostMapping("/direct")
public Map<String, Object> sendDirectMessage(@RequestParam String content) {
log.info("发送Direct消息,内容:{}", content);

Map<String, Object> data = new HashMap<>();
data.put("type", "direct");
data.put("timestamp", System.currentTimeMillis());

String messageId = messageProducer.sendDirectMessage(content, data);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("messageId", messageId);
result.put("type", "Direct");
return result;
}

/**
* 发送Fanout类型消息
*/
@PostMapping("/fanout")
public Map<String, Object> sendFanoutMessage(@RequestParam String content) {
log.info("发送Fanout消息,内容:{}", content);

Map<String, Object> data = new HashMap<>();
data.put("type", "fanout");
data.put("timestamp", System.currentTimeMillis());

String messageId = messageProducer.sendFanoutMessage(content, data);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("messageId", messageId);
result.put("type", "Fanout");
return result;
}

/**
* 发送Topic类型消息到队列A
*/
@PostMapping("/topic/a")
public Map<String, Object> sendTopicMessageToA(@RequestParam String content) {
log.info("发送Topic消息到队列A,内容:{}", content);

Map<String, Object> data = new HashMap<>();
data.put("type", "topic-a");
data.put("timestamp", System.currentTimeMillis());

String messageId = messageProducer.sendTopicMessageToA(content, data);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("messageId", messageId);
result.put("type", "Topic-A");
return result;
}

/**
* 发送Topic类型消息到队列B
*/
@PostMapping("/topic/b")
public Map<String, Object> sendTopicMessageToB(@RequestParam String content) {
log.info("发送Topic消息到队列B,内容:{}", content);

Map<String, Object> data = new HashMap<>();
data.put("type", "topic-b");
data.put("timestamp", System.currentTimeMillis());

String messageId = messageProducer.sendTopicMessageToB(content, data);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("messageId", messageId);
result.put("type", "Topic-B");
return result;
}

/**
* 发送Topic类型消息到所有队列
*/
@PostMapping("/topic/all")
public Map<String, Object> sendTopicMessageToAll(@RequestParam String content) {
log.info("发送Topic消息到所有队列,内容:{}", content);

Map<String, Object> data = new HashMap<>();
data.put("type", "topic-all");
data.put("timestamp", System.currentTimeMillis());

String messageId = messageProducer.sendTopicMessageToAll(content, data);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("messageId", messageId);
result.put("type", "Topic-All");
return result;
}
}

10. 测试接口

项目实现完成后,可以使用以下接口进行测试:

  1. 发送Direct消息:

    1
    POST /api/mq/direct?content=测试Direct消息
  2. 发送Fanout消息:

    1
    POST /api/mq/fanout?content=测试Fanout消息
  3. 发送Topic消息到队列A:

    1
    POST /api/mq/topic/a?content=测试Topic消息A
  4. 发送Topic消息到队列B:

    1
    POST /api/mq/topic/b?content=测试Topic消息B
  5. 发送Topic消息到所有队列:

    1
    POST /api/mq/topic/all?content=测试Topic消息All

11. 总结与最佳实践

在 SpringBoot 项目中整合 RabbitMQ 时,我们需要注意以下几点:

  1. 消息序列化:使用 Jackson2JsonMessageConverter 将 Java 对象转换为 JSON 格式,方便传输和解析。

  2. 消息确认机制:通过手动确认(manual acknowledge)确保消息被正确处理,避免消息丢失。

  3. 异常处理:在消费者端捕获异常,并根据业务需求决定是否重新入队消息。

  4. 消息重试:实现消息重试机制,对于处理失败的消息给予一定次数的重试机会,超过重试次数后再做其他处理。

  5. 代码优化:抽取公共方法,减少重复代码,提高代码可维护性。

  6. 日志记录:全面记录消息的发送、接收和处理过程,便于排查问题。

通过本文的实现,我们不仅完成了 RabbitMQ 的基本整合,还实现了消息的手动确认和重试机制,提高了系统的可靠性和稳定性。在实际生产环境中,还可以根据业务需求扩展更多功能,如消息持久化、死信队列、延迟队列等。

希望本文对你在 SpringBoot 项目中集成 RabbitMQ 有所帮助!

参考资料

  1. Spring AMQP 官方文档
  2. RabbitMQ 官方文档
  3. Spring Boot 官方文档