rocketmq架构

架构图

RocketMQ架构图

组件说明

NameServer

NameServer是RocketMQ Broker的路由信息注册中心。它将各个Broker的Cluster信息、Topic信息、路由信息等存储在内存中,提供查询服务给Producer和Consumer。

Broker

Broker是RocketMQ系统的核心组件,它是负责消息的存储、传输和处理的服务。每个Broker都包含了Message Store、消息队列、消费队列和消息处理线程。

Producer

Producer是消息的发送端,向某个Topic发送消息,消息将由NameServer查询到对应的Broker,然后通过网络传输到Broker。Producer将数据源封装成消息后发送给Broker。

Consumer

Consumer是消息的接收端,向Broker订阅某个Topic,消费消费队列中的消息。消息处理完成后,Consumer同步向Broker发送消息确认信息。

Java代码示例

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RocketMQProducer {
public static void main(String[] args) throws MQClientException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group");
// 指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();

try {
// 创建消息实例
Message message = new Message("topic", "tag", "Hello RocketMQ".getBytes());
// 发送消息
SendResult result = producer.send(message);
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
// 指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic和Tag
consumer.subscribe("topic", "tag");
// 注册消息监听器
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
// 处理消息
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
}

rocketmq概念

RocketMQ消息中间件概念

你有没有想过,如果我们需要向成千上万的用户发送消息,该怎么办呢?这个时候,就需要一个高效而可靠的消息中间件来进行通信。而RocketMQ就是其中之一。

什么是RocketMQ?

RocketMQ是一个开源的分布式消息中间件,它最初由阿里巴巴集团开发并开源。它提供了高可靠性、高吞吐量、低延迟的分布式消息发布/订阅系统,广泛用于企业级分布式架构中。

怎么使用?

首先,您需要在代码中引入RocketMQ的Java客户端。接下来,您需要使用Producer对象来发送消息,或者使用Consumer对象来接收消息。

发送消息

1
2
3
4
5
6
7
8
9
10
// 对象实例化
DefaultMQProducer producer = new DefaultMQProducer("Group Name");
producer.setNamesrvAddr("IP Address:Port");
producer.start();
// 创建消息实例
Message message = new Message("Topic Name","Tag Name","Message Body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult result = producer.send(message);
// 关闭生产端实例
producer.shutdown();

接收消息

1
2
3
4
5
6
7
8
9
10
11
// 创建消费者实例
DefaultMQConsumer consumer = new DefaultMQConsumer("Group Name");
consumer.setNamesrvAddr("IP Address:Port");
consumer.subscribe("Topic Name","Tag Name");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for(MessageExt message:msgs){
System.out.println(new String(message.getBody()));
}
});
// 启动消费者实例
consumer.start();

最后

RocketMQ是一个强大的消息中间件,可帮助您实现高效、可靠的信息交流。它易于使用,广泛应用于企业级架构中。如果您需要处理大量数据、高并发请求等情况,RocketMQ值得考虑。