glmapper

SpringBoot 实践系列-集成 RocketMQ

字数统计: 2.6k阅读时长: 12 min
2020/04/05 Share

RocketMQ 简介:Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可伸缩性。它提供了多种功能,具体参考: https://github.com/apache/rocketmq

RocketMQ 快速开始

官方指导手册快速开始中提到,RocketMQ 安装需要具体以下条件:

  • 64bit OS, 推荐使用 Linux/Unix/Mac
  • 64bit JDK 1.8+
  • Maven 3.2.x
  • 4g+ free disk for Broker server (这个需要特别关注下)

下载安装和编译

1
2
3
4
5
wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
unzip rocketmq-all-4.7.0-source-release.zip
cd rocketmq-all-4.7.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0

1、启动 Name Server

1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2、启动 Broker

1
2
3
4
> nohup sh bin/mqbroker -n localhost:9876 &
# nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

autoCreateTopicEnable:使用 RocketMQ 进行发消息时,必须要指定 topic,对于 topic 的设置有一个开关 autoCreateTopicEnable,一般在开发测试环境中会使用默认设置 autoCreateTopicEnable = true,但是这样就会导致 topic 的设置不容易规范管理,没有统一的审核等等,所以在正式环境中会在 Broker 启动时设置参数 autoCreateTopicEnable = false。这样当需要增加 topic 时就需要在 web 管理界面上或者通过 admin tools 添加即可

SpringBoot 集成

RocketMQ 目前没有提供集成 SpringBoot 的 starter,因此现在接入都是通过引入客户端进行编程。下面来看下 SpringBoot 集成 RocketMQ 的过程。

引入 RocketMQ 客户端依赖

github 上目前更新的最新版本是 4.7.0 版本,这里就使用最新版本:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>

提供生产者的自动配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:17 PM
* @since:
**/
@Configuration
public class MQProducerConfiguration {

public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);

@Value("${rocketmq.producer.groupName}")
private String groupName;

@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;

@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;

@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;

@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;

@Bean
@ConditionalOnMissingBean
public DefaultMQProducer defaultMQProducer() throws RuntimeException {
DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//如果需要同一个 jvm 中不同的 producer 往不同的 mq 集群发送消息,需要设置不同的 instanceName
//producer.setInstanceName(instanceName);
//如果发送消息的最大限制
producer.setMaxMessageSize(this.maxMessageSize);
//如果发送消息超时时间
producer.setSendMsgTimeout(this.sendMsgTimeout);
//如果发送消息失败,设置重试次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
try {
producer.start();
LOGGER.info("producer is started. groupName:{}, namesrvAddr: {}", groupName, namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("failed to start producer.", e);
throw new RuntimeException(e);
}
return producer;
}
}
  • groupName: 发送同一类消息的设置为同一个 group,保证唯一, 默认不需要设置,rocketmq 会使用 ip@pid(pid代表jvm名字) 作为唯一标示。
  • namesrvAddr:Name Server 地址
  • maxMessageSize:消息最大限制,默认 4M
  • sendMsgTimeout:消息发送超时时间,默认 3 秒
  • retryTimesWhenSendFailed:消息发送失败重试次数,默认 2 次

提供消费者的自动配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class MQConsumerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
// 订阅指定的 topic
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;

@Autowired
private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;

@Bean
@ConditionalOnMissingBean
public DefaultMQPushConsumer defaultMQPushConsumer() throws RuntimeException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);

// 设置 consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置消费模型,集群还是广播,默认为集群
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置一次消费消息的条数,默认为 1 条
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

try {
// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,使用*;
consumer.subscribe(topics, "*");
// 启动消费
consumer.start();
LOGGER.info("consumer is started. groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr);

} catch (Exception e) {
LOGGER.error("failed to start consumer . groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr,e);
throw new RuntimeException(e);
}
return consumer;
}
}

参数参考上述生产者部分。这里配置只是启动的消费端的监听,具体的消费需要再实现一个 MessageListenerConcurrently 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:21 PM
* @since:
**/
@Component
public class MessageListenerHandler implements MessageListenerConcurrently {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageListenerHandler.class);
private static String TOPIC = "DemoTopic";

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
LOGGER.info("receive blank msgs...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgs.get(0);
String msg = new String(messageExt.getBody());
if (messageExt.getTopic().equals(TOPIC)) {
// mock 消费逻辑
mockConsume(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

private void mockConsume(String msg){
LOGGER.info("receive msg: {}.", msg);
}
}

使用客户端发送消息

使用客户端发送消息的逻辑比较简单,就是拿到 DefaultMQProducer 对象,调用 send 方法,支持同步、异步、oneway 等多种调用方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
public class TestController {

private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);

private static String TOPIC = "DemoTopic";
private static String TAGS = "glmapperTags";

@Autowired
private DefaultMQProducer defaultMQProducer;

@RequestMapping("send")
public String test() throws Throwable {
Message msg = new Message(TOPIC, TAGS, ("Say Hello RocketMQ to Glmapper").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用客户端发送消息
SendResult sendResult = defaultMQProducer.send(msg);
LOGGER.info("sendResult: {}.",sendResult);
return "SUCCESS";
}
}

测试

这里的测试应用是将生产端和消费端放在一起的,所以配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring.application.name=test-rocket
server.port=8008
#producer
rocketmq.producer.isOnOff=on #该应用是否启用生产者
rocketmq.producer.groupName=${spring.application.name}
rocketmq.producer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.producer.maxMessageSize=4096
rocketmq.producer.sendMsgTimeout=3000
rocketmq.producer.retryTimesWhenSendFailed=2

#consumer
rocketmq.consumer.isOnOff=on #该应用是否启用消费者
rocketmq.consumer.groupName=${spring.application.name}
rocketmq.consumer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.consumer.topics=DemoTopic
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
rocketmq.consumer.consumeMessageBatchMaxSize=1

启动程序,查看日志输出:

1
2
2020-04-05 22:53:15.141  INFO 46817 --- [           main] c.g.b.b.c.MQProducerConfiguration        : producer is started. groupName:test-rocket, namesrvAddr: sofa.cloud.alipay.net:9876
2020-04-05 22:53:15.577 INFO 46817 --- [ main] c.g.b.b.c.MQConsumerConfiguration : consumer is started. groupName:test-rocket, topics:DemoTopic, namesrvAddr:sofa.cloud.alipay.net:9876

这里看到,生产者和消费者自动配置已经生效并启动完成。通过 curl localhost:8008/send 来触发消息发送:

1
2
2020-04-05 22:54:21.654  INFO 46817 --- [nio-8008-exec-1] c.g.b.boot.controller.TestController     : sendResult: SendResult [sendStatus=SEND_OK, msgId=1E0FC3A2B6E118B4AAC21983B3C50000, offsetMsgId=64583D7C00002A9F0000000000011788, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=6], queueOffset=50].
2020-04-05 22:54:21.658 INFO 46817 --- [MessageThread_1] c.g.b.b.p.MessageListenerHandler : receive msg: Say Hello RocketMQ to Glmapper.

看到发送消息的日志和接受消息的日志。

使用 hook 拦截消息

RocKetMQ 中提供了两个 hook 接口:SendMessageHook 和 ConsumeMessageHook 接口,可以用于在消息发送之前、之后,消息消费之前、之后对消息进行拦截,官方文档中并没有关于这部分的描述,那么这里我们就来看下如何使用这两个 hook 接口来搞点事情。

SendMessageHook

自定义一个 ProducerTestHook ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ProducerTestHook implements SendMessageHook {

public static final Logger LOGGER = LoggerFactory.getLogger(ProducerTestHook.class);

@Override
public String hookName() {
return ProducerTestHook.class.getName();
}

@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageBefore. sendMessageContext:{}", sendMessageContext);
}

@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageAfter. sendMessageContext:{}", sendMessageContext);
}
}

在上面生产者的自动配置类中,将 ProducerTestHook 注册给 producer。

1
2
// 注册 SendMessageHook
producer.getDefaultMQProducerImpl().registerSendMessageHook(new ProducerTestHook());

ConsumeMessageHook

自定义一个 ConsumerTestHook ,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ConsumerTestHook implements ConsumeMessageHook {

public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTestHook.class);

@Override
public String hookName() {
return ConsumerTestHook.class.getName();
}

@Override
public void consumeMessageBefore(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageBefore. consumeMessageContext: {}",consumeMessageContext);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageAfter. consumeMessageContext: {}",consumeMessageContext);
}
}

在上面消费者的自动配置类中,将 ConsumerTestHook 注册给 consumer

1
2
// 注册 ConsumeMessageHook
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumerTestHook());

执行结果如下:

1
2
3
4
5
6
execute sendMessageBefore. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
execute sendMessageAfter. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
sendResult: SendResult [sendStatus=SEND_OK, msgId=0A0FE8F8C02F18B4AAC21C1275FB0000, offsetMsgId=64583D7C00002A9F0000000000011850, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=5], queueOffset=50].
execute consumeMessageBefore. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
receive msg: Say Hello RocketMQ to Glmapper.
execute consumeMessageAfter. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a

遇到的一些问题

集成过程中遇到几个问题记录如下:

1、Broker 启动失败。

我在测试时遇到的情况是,在 Name Server 启动之后,再启动 Boker 时,ssh 连接会直接提示 connect conversation fail. 通过 dmesg | egrep -i -B100 'killed process' 查看进程被 kill 的记录,得到如下日志:

1
2
3
4
5
6
[2257026.030741] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.031888] Killed process 100735 (sh) total-vm:15708kB, anon-rss:176kB, file-rss:1800kB, shmem-rss:0kB
[2257026.133506] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.133539] Killed process 100745 (vsar) total-vm:172560kB, anon-rss:22936kB, file-rss:1360kB, shmem-rss:0kB
[2257026.206872] Memory cgroup out of memory: Kill process 104617 (java) score 3 or sacrifice child
[2257026.207742] Killed process 104617 (java) total-vm:9092924kB, anon-rss:4188528kB, file-rss:496kB, shmem-rss:0kB

那这里看到的结论是发生了 OOM,这里是启动时没哟分配到足够的空间导致的(默认配置文件初始内存设置的太大了)。解决办法是:进入到编译之后的 distribution/target/apache-rocketmq/bin 目录,找到 runbroker.sh 和 runserver.sh 两个脚本文件,这两个脚本理解启动时默认指定的参数是非常大的(4g/8g/2g),我线下测试机器总共才 1c2g,所以适当的调整了下参数:

  • runserver.sh
1
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  • runbroker.sh
1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

修改后重新启动 namesrv 和 broker ,正常了

1
2
3
4
$ jps
98633 Jps
55689 BrokerStartup
54906 NamesrvStartup

2、No Topic Route Info,xxx

这个在官方的 FAQ 里面有提到,说明遇到的频次一定是很高的。官方给出的方案可以详解这里 http://rocketmq.apache.org/docs/faq/ 第4条。我是通过 If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console. 这个解决的:

1
2
3
4
5
sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t DemoTopic # 执行此指令,创建 DemoTopic
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create topic to localhost:10911 success.
TopicConfig [topicName=DemoTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]

总结

之前在做 SOFATracer 集成消息组件时有看过 RocketMQ 的部分代码,但是在实际操作时还是饶了不少弯路。总体来看,SpringBoot 集成 RocketMQ 还是比较简单的,在此记录一下。如果文中有描述有误的地方,还请各位大佬留言指正。

参考文档

原文作者:GuoLei Song

原文链接:http://www.glmapper.com/2020/04/05/springboot/springboot-series-rocketmq/

发表日期:April 5th 2020, 7:39:17 pm

更新日期:October 28th 2020, 7:02:41 pm

版权声明:转载请注明出处

CATALOG
  1. 1. RocketMQ 快速开始
    1. 1.1. 下载安装和编译
  2. 2. SpringBoot 集成
    1. 2.1. 引入 RocketMQ 客户端依赖
    2. 2.2. 提供生产者的自动配置类
    3. 2.3. 提供消费者的自动配置类
    4. 2.4. 使用客户端发送消息
    5. 2.5. 测试
  3. 3. 使用 hook 拦截消息
    1. 3.1. SendMessageHook
    2. 3.2. ConsumeMessageHook
  4. 4. 遇到的一些问题
  5. 5. 总结
  6. 6. 参考文档