2020年

2020

  • 思绪万千做总结

  • 春日同心战疫魔

  • 盛夏相拥闻香气

  • 秋雨窗前争闲事

  • 寒风绵绵月朦胧

  • 今年难忘处处情

  • 欲说还休此话长

  • 万语千言不可传

  • 道路且长长相忆

(PS: 2020年终, 再一次希望疫情早日结束!!!)

nio的实践

Java NIO 实践

Java NIO(New IO)是Java SE1.4后新的IO API。NIO提供了与传统的IO API(Java IO)相比,更快速,更灵活的IO操作。

NIO与IO的差异

IO的缺陷

IO的读写是阻塞操作,即在读写数据的过程中,线程会被一直阻塞,无法进行其他操作。因此,IO的吞吐量受限,无法支持高并发的情况。

NIO的优点

NIO使用了非阻塞IO来提高系统的IO性能。非阻塞IO的特点是在读写数据时,线程不会一直被阻塞,而是可以进行其他操作。这样,你可以使用一个线程来处理多个IO连接,提高了系统的吞吐量。

NIO的另一个优点是它的IO操作是面向缓冲区的。这就意味着数据是从缓冲区读取和写入的,而不是一个字节一个字节的读写。这减少了内存的拷贝次数,从而提高了系统的IO性能。

NIO的核心组件

缓冲区 Buffer

在NIO中,所有的数据都是从缓冲区读取或写入的。Buffer是一个字节数组或其他基本类型数组的容器。

Buffer的主要属性有:

  • capacity: 缓冲区的容量
  • position: 下一个要读或写的位置
  • limit: 缓冲区的限制
  • mark: 缓冲区的标记

Buffer的常用类型有:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。

通道 Channel

在NIO中,所有的IO操作都是通过通道(Channel)来完成的。通道是一个双向的数据传输通道,可以完成读和写的操作。

通道的主要实现类有:FileChannel、DatagramChannel、SocketChannel和ServerSocketChannel。

选择器 Selector

Selector是NIO的多路复用器,它可以监控多个通道的IO事件,例如连接、读取和写入事件,使得一个线程可以同时处理多个IO连接。

NIO的实践

在本次实践中,将使用NIO实现一个简单的服务端和客户端通信的功能。

服务端

服务端的功能是监听客户端的请求,并返回当前系统时间给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class NIOServer {

public static void main(String[] args) throws IOException {
// 打开ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定到指定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 配置为非阻塞模式
serverSocketChannel.configureBlocking(false);

// 创建一个Selector
Selector selector = Selector.open();
// 将ServerSocketChannel注册到Selector,并监听连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
// 阻塞等待IO事件
selector.select();

// 获取所有发生的IO事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 将已处理的事件移除
iterator.remove();

if (key.isAcceptable()) {
// 有新连接事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
// 获取SocketChannel通道
SocketChannel socketChannel = channel.accept();
// 配置为非阻塞模式
socketChannel.configureBlocking(false);
// 将SocketChannel注册到Selector,并监听读取事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有可读事件
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据到缓冲区
socketChannel.read(buffer);

buffer.flip();// 切换为读取模式
byte[] bytes = buffer.array();
String request = new String(bytes).trim();
System.out.printf("request: %s\n", request);

// 处理请求
byte[] responseBytes = new Date().toString().getBytes();
ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
// 返回响应数据
socketChannel.write(responseBuffer);
}
}
}
}
}

客户端

客户端的功能是连接服务端,并向服务端发送请求,获取服务器返回的时间信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NIOClient {

public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
socketChannel.configureBlocking(false);

ByteBuffer buffer = ByteBuffer.wrap("Hello, server!".getBytes());
// 向服务端发送请求
socketChannel.write(buffer);
System.out.println("Send request to server success.");

ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
// 读取服务端的响应数据
socketChannel.read(responseBuffer);

responseBuffer.flip();// 切换为读取模式
byte[] bytes = responseBuffer.array();
String response = new String(bytes).trim();

System.out.println("Server response: " + response);

socketChannel.close();
}
}

总结

通过这次实践,我们学会了如何使用Java NIO实现一个简单的服务端和客户端通信的功能。NIO的非阻塞IO模式和多路复用机制使得它能够支持更高效的IO操作,适用于高并发的系统场景。

nio的入门

Java NIO 入门指南

Java NIO (Non-blocking I/O) 是 Java 平台中的一种 I/O 模型,允许使用非阻塞 I/O 方式进行高效的 I/O 操作。与传统的 I/O 模型相比,Java NIO 提供了更加底层的控制,更灵活的选择器和更高效的缓冲区管理,从而实现更高效的 I/O 操作。

Java NIO 的核心组件

Java NIO 的核心组件包括以下几个部分:

  • 缓冲区 (Buffer): NIO 中的所有 I/O 操作都通过缓冲区进行,缓冲区是对数据对象的封装。

  • 通道 (Channel): NIO 中数据的读写操作是通过通道进行的,和传统的流不同,通道是可以双向的。

  • 选择器 (Selector): 选择器是用于检测通道是否准备好读或写的 I/O 操作。多个通道可以注册到同一个选择器中,因此只需要一个线程就可以处理多个通道。

NIO 缓冲区

在 NIO 中所有数据的读写操作都是通过缓冲区来完成的。缓冲区有以下几种类型:

  • ByteBuffer: 字节缓冲区

  • CharBuffer: 字符缓冲区

  • ShortBuffer: 短整型缓冲区

  • IntBuffer: 整型缓冲区

  • LongBuffer: 长整型缓冲区

  • FloatBuffer: 浮点型缓冲区

  • DoubleBuffer: 双精度型缓冲区

创建缓冲区

在 NIO 中缓冲区的创建一般有两种方式:

1
2
3
4
5
// 创建一个 ByteBuffer 缓冲区,默认容量为 1024 字节
ByteBuffer buffer = ByteBuffer.allocate(1024);

// 创建一个 ByteBuffer 缓冲区,容量为 1024 字节,限制为 512 字节
ByteBuffer buffer2 = ByteBuffer.allocate(1024).limit(512);

缓冲区的属性

在 NIO 中缓冲区的属性有以下几种:

  • Capacity: 缓冲区的容量

  • Position: 缓冲区当前的位置,下一个要读或写的位置

  • Limit: 缓冲区的限制,不能读写超过这个位置

  • Mark: 缓冲区的标记,用于记录当前位置

缓冲区的读写操作

在 NIO 中缓冲区的读写操作都是通过方法来实现的,常见的操作有:

1
2
3
4
5
6
7
// 写入数据到缓冲区
buffer.put((byte) 1);
buffer.putInt(10);

// 从缓冲区读取数据
byte b = buffer.get();
int i = buffer.getInt();

缓冲区的切换和复制

缓冲区之间的切换和复制是非常常见的操作,可以使用以下方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 切换到读模式
buffer.flip();

// 切换到写模式
buffer.clear();

// 切换到读模式并标记当前位置
buffer.mark();

// 重置到标记位置
buffer.reset();

// 复制原缓冲区
buffer.copy();

// 复制子缓冲区
buffer.slice();

NIO 通道

在 NIO 中数据的读写操作都是通过通道来完成的。通道是一个双向的数据通路,可以以单独的 I/O 操作来读取和写入数据。

创建通道

在 NIO 中可以通过以下几种方式来创建通道:

1
2
3
4
5
6
7
8
9
10
11
// 创建一个文件输入通道
FileChannel inChannel = new FileInputStream("path/to/file").getChannel();

// 创建一个文件输出通道
FileChannel outChannel = new FileOutputStream("path/to/file").getChannel();

// 创建一个网络输入通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));

// 创建一个网络输出通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));

通道的读写操作

在 NIO 中数据的读写操作都是通过通道来完成的,通道的读写操作一般使用缓冲区来进行:

1
2
3
4
5
6
// 写入数据到通道
channel.write(ByteBuffer.wrap("Hello, World!".getBytes()));

// 从通道读取数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = channel.read(buffer);

通道的类型

在 NIO 中通道的类型分为以下几种:

  • FileChannel: 文件通道,用于读写文件数据

  • DatagramChannel: 数据报通道,用于 UDP 连接

  • SocketChannel: 套接字通道,用于 TCP 连接

  • ServerSocketChannel: 服务套接字通道,用于监听客户端连接

NIO 选择器

在 NIO 中选择器是用于检测通道是否准备好读或写的 I/O 操作。选择器允许将多个通道注册到同一个选择器中,同一个线程就可以同时处理多个通道的 I/O 操作,从而提高 I/O 操作的效率。

创建选择器

在 NIO 中可以通过以下方法来创建选择器:

1
Selector selector = Selector.open();

注册通道

在 NIO 中可以通过以下方法将通道注册到选择器中:

1
channel.register(selector, SelectionKey.OP_READ);

选择器的工作方式

选择器一般使用循环来进行工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 阻塞等待一个或多个通道准备好读或写
int select = selector.select();

// 获取选择器中已经准备好的通道
Set<SelectionKey> selectedKeys = selector.selectedKeys();

for (SelectionKey key : selectedKeys) {
if (key.isReadable()) {
// 处理通道的读操作
} else if (key.isWritable()) {
// 处理通道的写操作
}
}

总结

Java NIO 提供了底层的 I/O 操作,允许使用非阻塞的方式进行高效的数据读写。熟练掌握 Java NIO 可以大大提高代码的性能和效率,建议对 Java NIO 进行深入学习并应用到实际项目中。

kafka入门

Kafka 相关概念

Kafka 是一款高吞吐量、分布式、可持久化的消息系统,具有以下特点:

  • 高吞吐量:Kafka 可以处理大量的消息流,每秒能够处理上百万条消息。
  • 分布式系统:Kafka 可以在多台服务器上运行,实现消息的分布式处理和存储。
  • 可持久化的消息存储:Kafka 通过消息存储机制,保证消息的可靠传递和持久化存储。

本文将介绍 Kafka 的相关概念。

Topic

Topic 是 Kafka 中消息发送和接收的单位,每个 Topic 包含多个消息,每个消息由一个 Key 和一个 Value 组成,通常 Key 用于分割消息流,Value 用于存储具体的消息内容。

  • 创建 Topic:可以通过 Kafka 的命令行工具或者 API 来创建 Topic。
  • 分区:每个 Topic 可以分为多个分区,每个分区可以在不同节点上存储和处理,实现消息的并行处理。
  • 副本:为了保证消息的可靠传递,每个分区可以有多个副本,分布在不同的节点上,当主副本故障时,备用副本可以接替主副本的功能。

Producer

Producer 是生产者,负责向 Kafka 发送消息并将消息写入 Topic 的分区中。

  • 发送消息:Producer 可以通过 Kafka 的 API 来发送消息,向指定 Topic 发送消息。
  • 确认机制:Producer 发送消息后,可以通过确认机制获得发送消息的状态。

Consumer

Consumer 是消费者,负责从 Kafka 中读取消息并处理消息。

  • 订阅消息:Consumer 可以通过 Kafka 的 API 来订阅 Topic 上的消息。
  • 分配分区:当 Consumer 订阅某个 Topic 后,需要为其分配分区,以实现消息的并行消费。
  • 确认机制:消费者也可以通过确认机制来确认收到的消息,从而保证消息的可靠接收和处理。

Broker

Broker 是 Kafka 的消息中间件,负责接收和处理 Producer 发送的消息、为 Consumer 提供消息服务。

  • 存储和处理:Broker 负责存储和处理 Kafka 中的消息,实现消息的持久化存储和并行处理。
  • 管理 Topic、Partition 和 Replica:Broker 具有管理 Topic、Partition 和 Replica 的功能,可以实现动态扩展和缩小 Kafka 集群的节点数量,以适应不同的业务需求。
  • 队列存储:Broker 使用队列存储的方式,实现高性能的消息处理和传递。

ZooKeeper

ZooKeeper 是分布式系统协同服务,Kafka 集群中需要使用 ZooKeeper 来进行分布式协调和管理。

  • 为 Kafka 派发 Broker ID:ZooKeeper 可以为 Kafka Server 派发全局唯一的 Broker ID,以实现集群内部的节点通信。
  • 管理元数据:ZooKeeper 管理了 Kafka 集群中的元数据,包括 Topic、Partition、Broker 等信息。
  • 监控:ZooKeeper 可以对 Kafka 集群的状态进行监控和统计,发现和解决问题。

Conclusion

Kafka 是一款高性能、分布式的消息中间件,支持可靠,高容错性的消息处理,并可轻松与各种数据系统集成。通过学习 Kafka 的相关概念,可以更好地了解和掌握 Kafka 的使用和实践。

kafka架构

Kafka 架构

Kafka 架构

Kafka 架构主要分为以下几个部分:

  1. Topic:消息的录入点,每个 Topic 可以被分成多个 Partition,每个 Partition 可以存储多个消息。
  2. Producer:消息的生产者,负责往 Kafka 集群中的 Topic 发送消息,将消息放到指定的 Partition 中。
  3. Broker:消息经过 Producer 生产后,会发送到多个 Broker 上,Broker 维护了消息的持久化存储和 Snapshot 文件,使得 Kafka 集群可以在 Broker 崩溃或网络故障的情况下依然可以保证消息的持久性。
  4. Consumer:消息的消费者,从指定的 Topic 的 Partition 中消费消息,消费后的消息将不能被再次消费,一旦消息被消费,该消费者就读取该消息的 Offset 位置,以便下一次从该位置开始消费。
  5. ZooKeeper:用于管理 Broker 集群,负责维护集群中每个 Broker 的元数据,并且在 Broker 发生变化时通知 Consumer。

Kafka 的工作流程

  1. Producer 将消息发送到 Broker。
  2. Broker 将消息持久化到磁盘,并将消息存储到对应的 Partition 中。
  3. Consumer 从指定的 Partition 中消费消息,并将 Offset 位置保存到 ZooKeeper 中。
  4. 当有新消息到达时,Consumer 将自动从上一次消费的 Offset 位置继续消费。

Java 代码示例

以下是一个简单的 Java 代码示例,展示了如何使用 Kafka 的 Producer 模块发送消息到指定的 Topic 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 配置 Kafka 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建 Kafka 生产者
Producer<String, String> producer = new KafkaProducer<>(props);

// 生产并发送消息
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
}

// 关闭 Kafka 生产者
producer.close();

结语

Kafka 是一个强大的消息引擎,它可以使得大规模数据处理变得更加高效、灵活,适合于各种大数据处理场景。希望本文能为大家对 Kafka 的理解和应用提供帮助。