kafka实践

Kafka实践及代码示例

简介

Apache Kafka是一款实时数据流处理中心,可以快速高效地处理大规模、高吞吐量的数据流,由LinkedIn于2011年开发,后于2012年成为了Apache基金会的一个顶级开源项目。

Kafka以分布式消息队列的形式提供了高可靠(容错性高,支持数据冗余备份)、高伸缩性(支持多节点、多分区)、高吞吐率和低延迟的特性,适用于构建实时数据流的处理方案,被广泛应用于互联网、金融、电商等领域。

本文将介绍如何使用Java代码进行Kafka实践,包括Kafka配置、消息的生产者和消费者的实现。

环境搭建

首先,需要搭建Kafka运行环境,具体步骤请参考Kafka官方文档

Kafka配置

Kafka的配置文件位于config/server.properties,可以根据需要进行修改。

以下是常用的配置项及其含义:

  • broker.id:Broker的唯一标识符,具有唯一性;
  • listeners:Broker监听的网络地址;
  • advertised.listeners:供客户端访问的Broker地址;
  • num.network.threads:处理网络请求的线程数;
  • num.io.threads:处理磁盘IO的线程数;
  • socket.send.buffer.bytes:发送数据缓冲区大小;
  • socket.receive.buffer.bytes:接收数据缓冲区大小;
  • socket.request.max.bytes:最大请求大小;
  • num.partitions:每个Topic的分区数;
  • log.dirs:Topic数据存储路径;
  • auto.create.topics.enable:是否允许自动创建Topic;
  • delete.topic.enable:是否允许删除Topic;
  • group.initial.rebalance.delay.ms:Consumer组初始负载均衡延迟时间;
  • offsets.topic.replication.factor:偏移量Topic的副本因子。

Kafka生产者

Kafka生产者用于向指定的Topic中发送消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
System.out.println("Sent:" + message);
}
producer.close();
}
}

以上代码实现了一个简单的Kafka生产者,可以向指定的TOPIC_NAME中发送10条消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • acks:消息确认模式;
    • 0:不等待Broker的任何确认;
    • 1:等待Broker确认消息已写入本地磁盘;
    • all:等待Broker确认消息已写入所有ISR副本;
  • retries:消息发送失败后的重试次数;
  • batch.size:消息批处理大小,单位为字节;
  • linger.ms:延迟发送消息的时间,单位为毫秒;
  • buffer.memory:生产者可用的内存缓存大小,单位为字节;
  • key.serializer:序列化消息键的方式;
  • value.serializer:序列化消息值的方式。

Kafka消费者

Kafka消费者用于从指定的Topic中消费消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test_group";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: value = %s, topic = %s, partition = %d, offset = %d%n",
record.value(), record.topic(), record.partition(), record.offset());
}
consumer.commitAsync();
}
}
}

以上代码实现了一个简单的Kafka消费者,可以从指定的TOPIC_NAME中消费消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • group.id:消费者组的唯一标识符;
  • enable.auto.commit:是否开启自动提交偏移量;
  • auto.commit.interval.ms:自动提交偏移量的时间间隔,单位为毫秒;
  • key.deserializer:消息键的反序列化方式;
  • value.deserializer:消息值的反序列化方式。

总结

Kafka是一款高可靠、高伸缩性、高吞吐率和低延迟的分布式消息队列,广泛应用于互联网、金融、电商等领域。本文介绍了Kafka的基本概念及其Java代码实践,希望对读者有所帮助。

rocketmq架构

架构图

RocketMQ架构图

组件说明

NameServer

NameServer是RocketMQ Broker的路由信息注册中心。它将各个Broker的Cluster信息、Topic信息、路由信息等存储在内存中,提供查询服务给Producer和Consumer。

Broker

Broker是RocketMQ系统的核心组件,它是负责消息的存储、传输和处理的服务。每个Broker都包含了Message Store、消息队列、消费队列和消息处理线程。

Producer

Producer是消息的发送端,向某个Topic发送消息,消息将由NameServer查询到对应的Broker,然后通过网络传输到Broker。Producer将数据源封装成消息后发送给Broker。

Consumer

Consumer是消息的接收端,向Broker订阅某个Topic,消费消费队列中的消息。消息处理完成后,Consumer同步向Broker发送消息确认信息。

Java代码示例

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RocketMQProducer {
public static void main(String[] args) throws MQClientException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group");
// 指定NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();

try {
// 创建消息实例
Message message = new Message("topic", "tag", "Hello RocketMQ".getBytes());
// 发送消息
SendResult result = producer.send(message);
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
// 指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅Topic和Tag
consumer.subscribe("topic", "tag");
// 注册消息监听器
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
// 处理消息
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
}
}

rocketmq概念

RocketMQ消息中间件概念

你有没有想过,如果我们需要向成千上万的用户发送消息,该怎么办呢?这个时候,就需要一个高效而可靠的消息中间件来进行通信。而RocketMQ就是其中之一。

什么是RocketMQ?

RocketMQ是一个开源的分布式消息中间件,它最初由阿里巴巴集团开发并开源。它提供了高可靠性、高吞吐量、低延迟的分布式消息发布/订阅系统,广泛用于企业级分布式架构中。

怎么使用?

首先,您需要在代码中引入RocketMQ的Java客户端。接下来,您需要使用Producer对象来发送消息,或者使用Consumer对象来接收消息。

发送消息

1
2
3
4
5
6
7
8
9
10
// 对象实例化
DefaultMQProducer producer = new DefaultMQProducer("Group Name");
producer.setNamesrvAddr("IP Address:Port");
producer.start();
// 创建消息实例
Message message = new Message("Topic Name","Tag Name","Message Body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult result = producer.send(message);
// 关闭生产端实例
producer.shutdown();

接收消息

1
2
3
4
5
6
7
8
9
10
11
// 创建消费者实例
DefaultMQConsumer consumer = new DefaultMQConsumer("Group Name");
consumer.setNamesrvAddr("IP Address:Port");
consumer.subscribe("Topic Name","Tag Name");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for(MessageExt message:msgs){
System.out.println(new String(message.getBody()));
}
});
// 启动消费者实例
consumer.start();

最后

RocketMQ是一个强大的消息中间件,可帮助您实现高效、可靠的信息交流。它易于使用,广泛应用于企业级架构中。如果您需要处理大量数据、高并发请求等情况,RocketMQ值得考虑。

juc-12

需要解决的问题

实例不一致 资源问题

提升性能

部分:Immutable 整体:Read-Write Lock模式

提高响应性

尽可能缩小临界区的范围,降低线程冲突的概率,这样就可以抑制性能的下降。

Thread-Per-Message

多线程的评价标准:四个指标

  • 安全性
  • 生存性
  • 可复用性
  • 性能

死锁

juc-11

线程的优先级

Java的优先级只在特定的Java平台运行环境起作用

Java的优先级只在特定的运行环境(JVM的实现和版本,以及操作系统)有效,Java规范中提到”优先级高的线程先执行”, 并没有写”优先”
的具体体系,因此没有太多的意义.

便是优先级的静态字段

Java线程的优先级是整数值(int)

  • Thread.MIN_PRIORITY: 最低
  • Thread.NORM_PRIORITY:默认
  • Thread.MAX_PRIORITY:最高

设置优先级的方法

setPriority方法用于设置优先级,是Thread类的实例方法.

获取优先级的方法

getPriority方法用于获取优先级,是Thread类的实例方法.

juc-10

关于java.lang.ThreadLocal类

java.lang.ThreadLocal就是储物间

java.lang.ThreadLocal与泛型

juc-09

java.util.concurrent包与线程同步

java.util.concurrent.CountDownLatch类

java.util.concurrent.CyclicBarrier类

juc-08

中断状态与InterruptedException异常的相互转换

中断状态->InterruptedException异常的转换

InterruptedException异常->中断状态的转换

InterruptedException异常->InterruptedException异常的转换

juc-07

理解InterruptedException异常

可能会花费时间,但可以取消

当习惯Java多线程设计之后,我们会留意方法后面是否加了throws InterruptedException. 如果方法加了,则表明该方法(
或该方法进一步调用的方法中)可能会抛出InterruptedException异常.

这里包含两层意思:

  • “花费时间”的方法
  • “可以取消”的方法 换言之,加了throws InterruptedException的方法可能会花费时间,当可以取消.

加了throws InterruptedException的方法

  • java.lang.Object类的wait方法
  • java.lang.Thread类的sleep方法
  • java.lang.Thread类的join方法

花费时间的方法

可以取消的方法

sleep方法和interrupt方法

wait方法和interrupt方法

join方法和interrupt方法

interrupt方法只是改变中断状态

isInterrupted方法: 检查中断状态

Thread.interrupted方法: 检查并清除中断状态

不可用使用Thread类的stop方法