kafka实践

Kafka实践及代码示例

简介

Apache Kafka是一款实时数据流处理中心,可以快速高效地处理大规模、高吞吐量的数据流,由LinkedIn于2011年开发,后于2012年成为了Apache基金会的一个顶级开源项目。

Kafka以分布式消息队列的形式提供了高可靠(容错性高,支持数据冗余备份)、高伸缩性(支持多节点、多分区)、高吞吐率和低延迟的特性,适用于构建实时数据流的处理方案,被广泛应用于互联网、金融、电商等领域。

本文将介绍如何使用Java代码进行Kafka实践,包括Kafka配置、消息的生产者和消费者的实现。

环境搭建

首先,需要搭建Kafka运行环境,具体步骤请参考Kafka官方文档

Kafka配置

Kafka的配置文件位于config/server.properties,可以根据需要进行修改。

以下是常用的配置项及其含义:

  • broker.id:Broker的唯一标识符,具有唯一性;
  • listeners:Broker监听的网络地址;
  • advertised.listeners:供客户端访问的Broker地址;
  • num.network.threads:处理网络请求的线程数;
  • num.io.threads:处理磁盘IO的线程数;
  • socket.send.buffer.bytes:发送数据缓冲区大小;
  • socket.receive.buffer.bytes:接收数据缓冲区大小;
  • socket.request.max.bytes:最大请求大小;
  • num.partitions:每个Topic的分区数;
  • log.dirs:Topic数据存储路径;
  • auto.create.topics.enable:是否允许自动创建Topic;
  • delete.topic.enable:是否允许删除Topic;
  • group.initial.rebalance.delay.ms:Consumer组初始负载均衡延迟时间;
  • offsets.topic.replication.factor:偏移量Topic的副本因子。

Kafka生产者

Kafka生产者用于向指定的Topic中发送消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
producer.send(new ProducerRecord<>(TOPIC_NAME, message));
System.out.println("Sent:" + message);
}
producer.close();
}
}

以上代码实现了一个简单的Kafka生产者,可以向指定的TOPIC_NAME中发送10条消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • acks:消息确认模式;
    • 0:不等待Broker的任何确认;
    • 1:等待Broker确认消息已写入本地磁盘;
    • all:等待Broker确认消息已写入所有ISR副本;
  • retries:消息发送失败后的重试次数;
  • batch.size:消息批处理大小,单位为字节;
  • linger.ms:延迟发送消息的时间,单位为毫秒;
  • buffer.memory:生产者可用的内存缓存大小,单位为字节;
  • key.serializer:序列化消息键的方式;
  • value.serializer:序列化消息值的方式。

Kafka消费者

Kafka消费者用于从指定的Topic中消费消息,Java代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
private static final String TOPIC_NAME = "test_topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test_group";

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: value = %s, topic = %s, partition = %d, offset = %d%n",
record.value(), record.topic(), record.partition(), record.offset());
}
consumer.commitAsync();
}
}
}

以上代码实现了一个简单的Kafka消费者,可以从指定的TOPIC_NAME中消费消息。具体参数配置说明如下:

  • bootstrap.servers:Kafka集群地址;
  • group.id:消费者组的唯一标识符;
  • enable.auto.commit:是否开启自动提交偏移量;
  • auto.commit.interval.ms:自动提交偏移量的时间间隔,单位为毫秒;
  • key.deserializer:消息键的反序列化方式;
  • value.deserializer:消息值的反序列化方式。

总结

Kafka是一款高可靠、高伸缩性、高吞吐率和低延迟的分布式消息队列,广泛应用于互联网、金融、电商等领域。本文介绍了Kafka的基本概念及其Java代码实践,希望对读者有所帮助。

作者

Mozss

发布于

2020-09-01

许可协议

评论