SpringBoot 实践系列-集成 RocketMQ

RocketMQ 快速开始

RocketMQ 简介:Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可伸缩性。它提供了多种功能,具体参考: https://github.com/apache/rocketmq

官方指导手册快速开始中提到,RocketMQ 安装需要具体以下条件:

  • 64bit OS, 推荐使用 Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • 4g+ free disk for Broker server (这个需要特别关注下)

下载安装和编译

1
2
3
4
5
wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
unzip rocketmq-all-4.7.0-source-release.zip
cd rocketmq-all-4.7.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

1、启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2、启动 Broker

1
2
3
4
> nohup sh bin/mqbroker -n localhost:9876 &
# nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

autoCreateTopicEnable:使用 RocketMQ 进行发消息时,必须要指定 topic,对于 topic 的设置有一个开关 autoCreateTopicEnable,一般在开发测试环境中会使用默认设置 autoCreateTopicEnable = true,但是这样就会导致 topic 的设置不容易规范管理,没有统一的审核等等,所以在正式环境中会在 Broker 启动时设置参数 autoCreateTopicEnable = false。这样当需要增加 topic 时就需要在 web 管理界面上或者通过 admin tools 添加即可

SpringBoot 集成

RocketMQ 目前没有提供集成 SpringBoot 的 starter,因此现在接入都是通过引入客户端进行编程。下面来看下 SpringBoot 集成 RocketMQ 的过程。

引入 RocketMQ 客户端依赖

github 上目前更新的最新版本是 4.7.0 版本,这里就使用最新版本:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

提供生产者的自动配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:17 PM
* @since:
**/
@Configuration
public class MQProducerConfiguration {

public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);

@Value("${rocketmq.producer.groupName}")
private String groupName;

@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;

@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;

@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;

@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;

@Bean
@ConditionalOnMissingBean
public DefaultMQProducer defaultMQProducer() throws RuntimeException {
DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//如果需要同一个 jvm 中不同的 producer 往不同的 mq 集群发送消息,需要设置不同的 instanceName
//producer.setInstanceName(instanceName);
//如果发送消息的最大限制
producer.setMaxMessageSize(this.maxMessageSize);
//如果发送消息超时时间
producer.setSendMsgTimeout(this.sendMsgTimeout);
//如果发送消息失败,设置重试次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
try {
producer.start();
LOGGER.info("producer is started. groupName:{}, namesrvAddr: {}", groupName, namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("failed to start producer.", e);
throw new RuntimeException(e);
}
return producer;
}
}
  • groupName: 发送同一类消息的设置为同一个 group,保证唯一, 默认不需要设置,rocketmq 会使用 ip@pid(pid代表jvm名字) 作为唯一标示。
  • namesrvAddr:Name Server 地址
  • maxMessageSize:消息最大限制,默认 4M
  • sendMsgTimeout:消息发送超时时间,默认 3 秒
  • retryTimesWhenSendFailed:消息发送失败重试次数,默认 2 次

提供消费者的自动配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class MQConsumerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
// 订阅指定的 topic
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;

@Autowired
private MessageListenerHandler mqMessageListenerProcessor;

@Bean
@ConditionalOnMissingBean
public DefaultMQPushConsumer defaultMQPushConsumer() throws RuntimeException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);

// 设置 consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置消费模型,集群还是广播,默认为集群
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置一次消费消息的条数,默认为 1 条
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

try {
// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,使用*;
consumer.subscribe(topics, "*");
// 启动消费
consumer.start();
LOGGER.info("consumer is started. groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr);

} catch (Exception e) {
LOGGER.error("failed to start consumer . groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr,e);
throw new RuntimeException(e);
}
return consumer;
}
}

参数参考上述生产者部分。这里配置只是启动的消费端的监听,具体的消费需要再实现一个 MessageListenerConcurrently 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:21 PM
* @since:
**/
@Component
public class MessageListenerHandler implements MessageListenerConcurrently {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageListenerHandler.class);
private static String TOPIC = "DemoTopic";

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
LOGGER.info("receive blank msgs...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgs.get(0);
String msg = new String(messageExt.getBody());
if (messageExt.getTopic().equals(TOPIC)) {
// mock 消费逻辑
mockConsume(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

private void mockConsume(String msg){
LOGGER.info("receive msg: {}.", msg);
}
}

使用客户端发送消息

使用客户端发送消息的逻辑比较简单,就是拿到 DefaultMQProducer 对象,调用 send 方法,支持同步、异步、oneway 等多种调用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
public class TestController {

private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);

private static String TOPIC = "DemoTopic";
private static String TAGS = "glmapperTags";

@Autowired
private DefaultMQProducer defaultMQProducer;

@RequestMapping("send")
public String test() throws Throwable {
Message msg = new Message(TOPIC, TAGS, ("Say Hello RocketMQ to Glmapper").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用客户端发送消息
SendResult sendResult = defaultMQProducer.send(msg);
LOGGER.info("sendResult: {}.",sendResult);
return "SUCCESS";
}
}

测试

这里的测试应用是将生产端和消费端放在一起的,所以配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring.application.name=test-rocket
server.port=8008
#producer
rocketmq.producer.isOnOff=on #该应用是否启用生产者
rocketmq.producer.groupName=${spring.application.name}
rocketmq.producer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.producer.maxMessageSize=4096
rocketmq.producer.sendMsgTimeout=3000
rocketmq.producer.retryTimesWhenSendFailed=2

#consumer
rocketmq.consumer.isOnOff=on #该应用是否启用消费者
rocketmq.consumer.groupName=${spring.application.name}
rocketmq.consumer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.consumer.topics=DemoTopic
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
rocketmq.consumer.consumeMessageBatchMaxSize=1

启动程序,查看日志输出:

1
2
2020-04-05 22:53:15.141  INFO 46817 --- [           main] c.g.b.b.c.MQProducerConfiguration        : producer is started. groupName:test-rocket, namesrvAddr: sofa.cloud.alipay.net:9876
2020-04-05 22:53:15.577 INFO 46817 --- [ main] c.g.b.b.c.MQConsumerConfiguration : consumer is started. groupName:test-rocket, topics:DemoTopic, namesrvAddr:sofa.cloud.alipay.net:9876

这里看到,生产者和消费者自动配置已经生效并启动完成。通过 curl localhost:8008/send 来触发消息发送:

1
2
2020-04-05 22:54:21.654  INFO 46817 --- [nio-8008-exec-1] c.g.b.boot.controller.TestController     : sendResult: SendResult [sendStatus=SEND_OK, msgId=1E0FC3A2B6E118B4AAC21983B3C50000, offsetMsgId=64583D7C00002A9F0000000000011788, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=6], queueOffset=50].
2020-04-05 22:54:21.658 INFO 46817 --- [MessageThread_1] c.g.b.b.p.MessageListenerHandler : receive msg: Say Hello RocketMQ to Glmapper.

看到发送消息的日志和接受消息的日志。

使用 hook 拦截消息

RocKetMQ 中提供了两个 hook 接口:SendMessageHook 和 ConsumeMessageHook 接口,可以用于在消息发送之前、之后,消息消费之前、之后对消息进行拦截,官方文档中并没有关于这部分的描述,那么这里我们就来看下如何使用这两个 hook 接口来搞点事情。

SendMessageHook

自定义一个 ProducerTestHook ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ProducerTestHook implements SendMessageHook {

public static final Logger LOGGER = LoggerFactory.getLogger(ProducerTestHook.class);

@Override
public String hookName() {
return ProducerTestHook.class.getName();
}

@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageBefore. sendMessageContext:{}", sendMessageContext);
}

@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageAfter. sendMessageContext:{}", sendMessageContext);
}
}

在上面生产者的自动配置类中,将 ProducerTestHook 注册给 producer。

1
2
// 注册 SendMessageHook
producer.getDefaultMQProducerImpl().registerSendMessageHook(new ProducerTestHook());

ConsumeMessageHook

自定义一个 ConsumerTestHook ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ConsumerTestHook implements ConsumeMessageHook {

public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTestHook.class);

@Override
public String hookName() {
return ConsumerTestHook.class.getName();
}

@Override
public void consumeMessageBefore(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageBefore. consumeMessageContext: {}",consumeMessageContext);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageAfter. consumeMessageContext: {}",consumeMessageContext);
}
}

在上面消费者的自动配置类中,将 ConsumerTestHook 注册给 consumer

1
2
// 注册 ConsumeMessageHook
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumerTestHook());

执行结果如下:

1
2
3
4
5
6
execute sendMessageBefore. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
execute sendMessageAfter. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
sendResult: SendResult [sendStatus=SEND_OK, msgId=0A0FE8F8C02F18B4AAC21C1275FB0000, offsetMsgId=64583D7C00002A9F0000000000011850, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=5], queueOffset=50].
execute consumeMessageBefore. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
receive msg: Say Hello RocketMQ to Glmapper.
execute consumeMessageAfter. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a

遇到的一些问题

集成过程中遇到几个问题记录如下:

1、Broker 启动失败。

我在测试时遇到的情况是,在 Name Server 启动之后,再启动 Boker 时,ssh 连接会直接提示 connect conversation fail. 通过 dmesg | egrep -i -B100 'killed process' 查看进程被 kill 的记录,得到如下日志:

1
2
3
4
5
6
[2257026.030741] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.031888] Killed process 100735 (sh) total-vm:15708kB, anon-rss:176kB, file-rss:1800kB, shmem-rss:0kB
[2257026.133506] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.133539] Killed process 100745 (vsar) total-vm:172560kB, anon-rss:22936kB, file-rss:1360kB, shmem-rss:0kB
[2257026.206872] Memory cgroup out of memory: Kill process 104617 (java) score 3 or sacrifice child
[2257026.207742] Killed process 104617 (java) total-vm:9092924kB, anon-rss:4188528kB, file-rss:496kB, shmem-rss:0kB

那这里看到的结论是发生了 OOM,这里是启动时没哟分配到足够的空间导致的(默认配置文件初始内存设置的太大了)。解决办法是:进入到编译之后的 distribution/target/apache-rocketmq/bin 目录,找到 runbroker.sh 和 runserver.sh 两个脚本文件,这两个脚本理解启动时默认指定的参数是非常大的(4g/8g/2g),我线下测试机器总共才 1c2g,所以适当的调整了下参数:

  • runserver.sh
1
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  • runbroker.sh
1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改后重新启动 namesrv 和 broker ,正常了

1
2
3
4
$ jps
98633 Jps
55689 BrokerStartup
54906 NamesrvStartup

2、No Topic Route Info,xxx

这个在官方的 FAQ 里面有提到,说明遇到的频次一定是很高的。官方给出的方案可以详解这里 http://rocketmq.apache.org/docs/faq/ 第4条。我是通过 If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console. 这个解决的:

1
2
3
4
5
sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t DemoTopic # 执行此指令,创建 DemoTopic
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create topic to localhost:10911 success.
TopicConfig [topicName=DemoTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

总结

之前在做 SOFATracer 集成消息组件时有看过 RocketMQ 的部分代码,但是在实际操作时还是饶了不少弯路。总体来看,SpringBoot 集成 RocketMQ 还是比较简单的,在此记录一下。如果文中有描述有误的地方,还请各位大佬留言指正。

参考文档

作者

卫恒

发布于

2020-04-05

更新于

2022-04-23

许可协议

评论