rocketmq概念

RocketMQ消息中间件概念

你有没有想过,如果我们需要向成千上万的用户发送消息,该怎么办呢?这个时候,就需要一个高效而可靠的消息中间件来进行通信。而RocketMQ就是其中之一。

什么是RocketMQ?

RocketMQ是一个开源的分布式消息中间件,它最初由阿里巴巴集团开发并开源。它提供了高可靠性、高吞吐量、低延迟的分布式消息发布/订阅系统,广泛用于企业级分布式架构中。

怎么使用?

首先,您需要在代码中引入RocketMQ的Java客户端。接下来,您需要使用Producer对象来发送消息,或者使用Consumer对象来接收消息。

发送消息

1
2
3
4
5
6
7
8
9
10
// 对象实例化
DefaultMQProducer producer = new DefaultMQProducer("Group Name");
producer.setNamesrvAddr("IP Address:Port");
producer.start();
// 创建消息实例
Message message = new Message("Topic Name","Tag Name","Message Body".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult result = producer.send(message);
// 关闭生产端实例
producer.shutdown();

接收消息

1
2
3
4
5
6
7
8
9
10
11
// 创建消费者实例
DefaultMQConsumer consumer = new DefaultMQConsumer("Group Name");
consumer.setNamesrvAddr("IP Address:Port");
consumer.subscribe("Topic Name","Tag Name");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for(MessageExt message:msgs){
System.out.println(new String(message.getBody()));
}
});
// 启动消费者实例
consumer.start();

最后

RocketMQ是一个强大的消息中间件,可帮助您实现高效、可靠的信息交流。它易于使用,广泛应用于企业级架构中。如果您需要处理大量数据、高并发请求等情况,RocketMQ值得考虑。

juc-12

需要解决的问题

实例不一致 资源问题

提升性能

部分:Immutable 整体:Read-Write Lock模式

提高响应性

尽可能缩小临界区的范围,降低线程冲突的概率,这样就可以抑制性能的下降。

Thread-Per-Message

多线程的评价标准:四个指标

  • 安全性
  • 生存性
  • 可复用性
  • 性能

死锁

juc-11

线程的优先级

Java的优先级只在特定的Java平台运行环境起作用

Java的优先级只在特定的运行环境(JVM的实现和版本,以及操作系统)有效,Java规范中提到”优先级高的线程先执行”, 并没有写”优先”
的具体体系,因此没有太多的意义.

便是优先级的静态字段

Java线程的优先级是整数值(int)

  • Thread.MIN_PRIORITY: 最低
  • Thread.NORM_PRIORITY:默认
  • Thread.MAX_PRIORITY:最高

设置优先级的方法

setPriority方法用于设置优先级,是Thread类的实例方法.

获取优先级的方法

getPriority方法用于获取优先级,是Thread类的实例方法.

juc-10

关于java.lang.ThreadLocal类

java.lang.ThreadLocal就是储物间

java.lang.ThreadLocal与泛型

juc-09

java.util.concurrent包与线程同步

java.util.concurrent.CountDownLatch类

java.util.concurrent.CyclicBarrier类

juc-08

中断状态与InterruptedException异常的相互转换

中断状态->InterruptedException异常的转换

InterruptedException异常->中断状态的转换

InterruptedException异常->InterruptedException异常的转换

juc-07

理解InterruptedException异常

可能会花费时间,但可以取消

当习惯Java多线程设计之后,我们会留意方法后面是否加了throws InterruptedException. 如果方法加了,则表明该方法(
或该方法进一步调用的方法中)可能会抛出InterruptedException异常.

这里包含两层意思:

  • “花费时间”的方法
  • “可以取消”的方法 换言之,加了throws InterruptedException的方法可能会花费时间,当可以取消.

加了throws InterruptedException的方法

  • java.lang.Object类的wait方法
  • java.lang.Thread类的sleep方法
  • java.lang.Thread类的join方法

花费时间的方法

可以取消的方法

sleep方法和interrupt方法

wait方法和interrupt方法

join方法和interrupt方法

interrupt方法只是改变中断状态

isInterrupted方法: 检查中断状态

Thread.interrupted方法: 检查并清除中断状态

不可用使用Thread类的stop方法

juc-06

集合类与多线程

管理多个实例的接口或类统称为集合(collection)。

Java中的大部分集合都是非线程安全的。因此,在多线程操作集合的时候,需要去查看API文档, 确认要用的类或接口是否线程安全的。

例子1:非线程安全的java.util.ArrayList类

该例子中ArrayList(及迭代器)在被多个线程同时读写而失去安全性时,便会抛出ConcurrentModificationException异常。
该运行时(runtime)的异常用于表示“执行并发修改了”。

异常不过是调查Bug根本原因的提示而已,所以编写编程不能依赖于抛出的异常。

集合类与多线程-例子
例子

例子2: 利用Collections.synchronized方法所进行的同步

对例子1的改造,使得其具有安全性。

例子2

  • 通过Collections.synchronizedList方法同步ArrayList实例
  • 通过使用list同步后的读线程
    • “写”线程是显示调用add方法和remove方法,故可以不做调整。
1
2
3
4
5
synchronized(list){
for(int n : list){
System.out.println(n);
}
}

例子3: 使用写时复制(copy-on-write)的java.util.concurrent.CopyOnWriteArrayList类

例子2使用Collections.synchronized进行同步。
这里使用CopyOnWriteArrayList类通过copy-on-write避免读写冲突。

1
2
3
4
5
6
7
8
public class Main {
public static void main(String[] args) {
//这里使用Copy-on-write
final List<Integer> list = new CopyOnWriteArrayList<Integer>();
new WriterThread(list).start();
new ReaderThread(list).start();
}
}

程序如果频繁“写”操作,使用copy-on-write会比较耗时,如果写操作比较少,读操作比较多,是比较适合使用的。