Jay's Blog

知而不行为不知


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 留言

  • 搜索

Spring 中的设计模式详解

发表于 2024-04-30 | 分类于 Java , 框架 , spring | 阅读次数:
字数统计: 4.4k 字 | 阅读时长 ≈ 16 分钟

“JDK 中用到了哪些设计模式? Spring 中用到了哪些设计模式? ”这两个问题,在面试中比较常见。

我在网上搜索了一下关于 Spring 中设计模式的讲解几乎都是千篇一律,而且大部分都年代久远。所以,花了几天时间自己总结了一下。

由于我的个人能力有限,文中如有任何错误各位都可以指出。另外,文章篇幅有限,对于设计模式以及一些源码的解读我只是一笔带过,这篇文章的主要目的是回顾一下 Spring 中的设计模式。

控制反转(IoC)和依赖注入(DI)

IoC(Inversion of Control,控制反转) 是 Spring 中一个非常非常重要的概念,它不是什么技术,而是一种解耦的设计思想。IoC 的主要目的是借助于“第三方”(Spring 中的 IoC 容器) 实现具有依赖关系的对象之间的解耦(IOC 容器管理对象,你只管使用即可),从而降低代码之间的耦合度。

IoC 是一个原则,而不是一个模式,以下模式(但不限于)实现了 IoC 原则。

ioc-patterns

Spring IoC 容器就像是一个工厂一样,当我们需要创建一个对象的时候,只需要配置好配置文件/注解即可,完全不用考虑对象是如何被创建出来的。 IoC 容器负责创建对象,将对象连接在一起,配置这些对象,并从创建中处理这些对象的整个生命周期,直到它们被完全销毁。

在实际项目中一个 Service 类如果有几百甚至上千个类作为它的底层,我们需要实例化这个 Service,你可能要每次都要搞清这个 Service 所有底层类的构造函数,这可能会把人逼疯。如果利用 IOC 的话,你只需要配置好,然后在需要的地方引用就行了,这大大增加了项目的可维护性且降低了开发难度。

关于 Spring IOC 的理解,推荐看这一下知乎的一个回答:https://www.zhihu.com/question/23277575/answer/169698662 ,非常不错。

控制反转怎么理解呢? 举个例子:”对象 a 依赖了对象 b,当对象 a 需要使用 对象 b 的时候必须自己去创建。但是当系统引入了 IOC 容器后, 对象 a 和对象 b 之间就失去了直接的联系。这个时候,当对象 a 需要使用 对象 b 的时候, 我们可以指定 IOC 容器去创建一个对象 b 注入到对象 a 中”。 对象 a 获得依赖对象 b 的过程,由主动行为变为了被动行为,控制权反转,这就是控制反转名字的由来。

DI(Dependency Inject,依赖注入)是实现控制反转的一种设计模式,依赖注入就是将实例变量传入到一个对象中去。

工厂设计模式

Spring 使用工厂模式可以通过 BeanFactory 或 ApplicationContext 创建 bean 对象。

两者对比:

  • BeanFactory:延迟注入(使用到某个 bean 的时候才会注入),相比于ApplicationContext 来说会占用更少的内存,程序启动速度更快。
  • ApplicationContext:容器启动的时候,不管你用没用到,一次性创建所有 bean 。BeanFactory 仅提供了最基本的依赖注入支持,ApplicationContext 扩展了 BeanFactory ,除了有BeanFactory的功能还有额外更多功能,所以一般开发人员使用ApplicationContext会更多。

ApplicationContext 的三个实现类:

  1. ClassPathXmlApplication:把上下文文件当成类路径资源。
  2. FileSystemXmlApplication:从文件系统中的 XML 文件载入上下文定义信息。
  3. XmlWebApplicationContext:从 Web 系统中的 XML 文件载入上下文定义信息。

Example:

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class App {
public static void main(String[] args) {
ApplicationContext context = new FileSystemXmlApplicationContext(
"C:/work/IOC Containers/springframework.applicationcontext/src/main/resources/bean-factory-config.xml");

HelloApplicationContext obj = (HelloApplicationContext) context.getBean("helloApplicationContext");
obj.getMsg();
}
}

单例设计模式

在我们的系统中,有一些对象其实我们只需要一个,比如说:线程池、缓存、对话框、注册表、日志对象、充当打印机、显卡等设备驱动程序的对象。事实上,这一类对象只能有一个实例,如果制造出多个实例就可能会导致一些问题的产生,比如:程序的行为异常、资源使用过量、或者不一致性的结果。

使用单例模式的好处 :

  • 对于频繁使用的对象,可以省略创建对象所花费的时间,这对于那些重量级对象而言,是非常可观的一笔系统开销;
  • 由于 new 操作的次数减少,因而对系统内存的使用频率也会降低,这将减轻 GC 压力,缩短 GC 停顿时间。

Spring 中 bean 的默认作用域就是 singleton(单例)的。 除了 singleton 作用域,Spring 中 bean 还有下面几种作用域:

  • prototype : 每次获取都会创建一个新的 bean 实例。也就是说,连续 getBean() 两次,得到的是不同的 Bean 实例。
  • request (仅 Web 应用可用): 每一次 HTTP 请求都会产生一个新的 bean(请求 bean),该 bean 仅在当前 HTTP request 内有效。
  • session (仅 Web 应用可用) : 每一次来自新 session 的 HTTP 请求都会产生一个新的 bean(会话 bean),该 bean 仅在当前 HTTP session 内有效。
  • application/global-session (仅 Web 应用可用):每个 Web 应用在启动时创建一个 Bean(应用 Bean),,该 bean 仅在当前应用启动时间内有效。
  • websocket (仅 Web 应用可用):每一次 WebSocket 会话产生一个新的 bean。

Spring 通过 ConcurrentHashMap 实现单例注册表的特殊方式实现单例模式。

Spring 实现单例的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 通过 ConcurrentHashMap(线程安全) 实现单例注册表
private final Map<String, Object> singletonObjects = new ConcurrentHashMap<String, Object>(64);

public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) {
Assert.notNull(beanName, "'beanName' must not be null");
synchronized (this.singletonObjects) {
// 检查缓存中是否存在实例
Object singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null) {
//...省略了很多代码
try {
singletonObject = singletonFactory.getObject();
}
//...省略了很多代码
// 如果实例对象在不存在,我们注册到单例注册表中。
addSingleton(beanName, singletonObject);
}
return (singletonObject != NULL_OBJECT ? singletonObject : null);
}
}
//将对象添加到单例注册表
protected void addSingleton(String beanName, Object singletonObject) {
synchronized (this.singletonObjects) {
this.singletonObjects.put(beanName, (singletonObject != null ? singletonObject : NULL_OBJECT));

}
}
}

单例 Bean 存在线程安全问题吗?

大部分时候我们并没有在项目中使用多线程,所以很少有人会关注这个问题。单例 Bean 存在线程问题,主要是因为当多个线程操作同一个对象的时候是存在资源竞争的。

常见的有两种解决办法:

  1. 在 Bean 中尽量避免定义可变的成员变量。
  2. 在类中定义一个 ThreadLocal 成员变量,将需要的可变成员变量保存在 ThreadLocal 中(推荐的一种方式)。

不过,大部分 Bean 实际都是无状态(没有实例变量)的(比如 Dao、Service),这种情况下, Bean 是线程安全的。

代理设计模式

代理模式在 AOP 中的应用

AOP(Aspect-Oriented Programming,面向切面编程) 能够将那些与业务无关,却为业务模块所共同调用的逻辑或责任(例如事务处理、日志管理、权限控制等)封装起来,便于减少系统的重复代码,降低模块间的耦合度,并有利于未来的可拓展性和可维护性。

Spring AOP 就是基于动态代理的,如果要代理的对象,实现了某个接口,那么 Spring AOP 会使用 JDK Proxy 去创建代理对象,而对于没有实现接口的对象,就无法使用 JDK Proxy 去进行代理了,这时候 Spring AOP 会使用 Cglib 生成一个被代理对象的子类来作为代理,如下图所示:

SpringAOPProcess

当然,你也可以使用 AspectJ ,Spring AOP 已经集成了 AspectJ ,AspectJ 应该算的上是 Java 生态系统中最完整的 AOP 框架了。

使用 AOP 之后我们可以把一些通用功能抽象出来,在需要用到的地方直接使用即可,这样大大简化了代码量。我们需要增加新功能时也方便,这样也提高了系统扩展性。日志功能、事务管理等等场景都用到了 AOP 。

Spring AOP 和 AspectJ AOP 有什么区别?

Spring AOP 属于运行时增强,而 AspectJ 是编译时增强。 Spring AOP 基于代理(Proxying),而 AspectJ 基于字节码操作(Bytecode Manipulation)。

Spring AOP 已经集成了 AspectJ ,AspectJ 应该算的上是 Java 生态系统中最完整的 AOP 框架了。AspectJ 相比于 Spring AOP 功能更加强大,但是 Spring AOP 相对来说更简单,

如果我们的切面比较少,那么两者性能差异不大。但是,当切面太多的话,最好选择 AspectJ ,它比 Spring AOP 快很多。

模板方法

模板方法模式是一种行为设计模式,它定义一个操作中的算法的骨架,而将一些步骤延迟到子类中。 模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤的实现方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class Template {
//这是我们的模板方法
public final void TemplateMethod(){
PrimitiveOperation1();
PrimitiveOperation2();
PrimitiveOperation3();
}

protected void PrimitiveOperation1(){
//当前类实现
}

//被子类实现的方法
protected abstract void PrimitiveOperation2();
protected abstract void PrimitiveOperation3();

}
public class TemplateImpl extends Template {

@Override
public void PrimitiveOperation2() {
//当前类实现
}

@Override
public void PrimitiveOperation3() {
//当前类实现
}
}

Spring 中 JdbcTemplate、HibernateTemplate 等以 Template 结尾的对数据库操作的类,它们就使用到了模板模式。一般情况下,我们都是使用继承的方式来实现模板模式,但是 Spring 并没有使用这种方式,而是使用 Callback 模式与模板方法模式配合,既达到了代码复用的效果,同时增加了灵活性。

观察者模式

观察者模式是一种对象行为型模式。它表示的是一种对象与对象之间具有依赖关系,当一个对象发生改变的时候,依赖这个对象的所有对象也会做出反应。Spring 事件驱动模型就是观察者模式很经典的一个应用。Spring 事件驱动模型非常有用,在很多场景都可以解耦我们的代码。比如我们每次添加商品的时候都需要重新更新商品索引,这个时候就可以利用观察者模式来解决这个问题。

Spring 事件驱动模型中的三种角色

事件角色

ApplicationEvent (org.springframework.context包下)充当事件的角色,这是一个抽象类,它继承了java.util.EventObject并实现了 java.io.Serializable接口。

Spring 中默认存在以下事件,他们都是对 ApplicationContextEvent 的实现(继承自ApplicationContextEvent):

  • ContextStartedEvent:ApplicationContext 启动后触发的事件;
  • ContextStoppedEvent:ApplicationContext 停止后触发的事件;
  • ContextRefreshedEvent:ApplicationContext 初始化或刷新完成后触发的事件;
  • ContextClosedEvent:ApplicationContext 关闭后触发的事件。

ApplicationEvent-Subclass

事件监听者角色

ApplicationListener 充当了事件监听者角色,它是一个接口,里面只定义了一个 onApplicationEvent()方法来处理ApplicationEvent。ApplicationListener接口类源码如下,可以看出接口定义看出接口中的事件只要实现了 ApplicationEvent就可以了。所以,在 Spring 中我们只要实现 ApplicationListener 接口的 onApplicationEvent() 方法即可完成监听事件

1
2
3
4
5
6
package org.springframework.context;
import java.util.EventListener;
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}

事件发布者角色

ApplicationEventPublisher 充当了事件的发布者,它也是一个接口。

1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
this.publishEvent((Object)event);
}

void publishEvent(Object var1);
}

ApplicationEventPublisher 接口的publishEvent()这个方法在AbstractApplicationContext类中被实现,阅读这个方法的实现,你会发现实际上事件真正是通过ApplicationEventMulticaster来广播出去的。具体内容过多,就不在这里分析了,后面可能会单独写一篇文章提到。

Spring 的事件流程总结

  1. 定义一个事件: 实现一个继承自 ApplicationEvent,并且写相应的构造函数;
  2. 定义一个事件监听者:实现 ApplicationListener 接口,重写 onApplicationEvent() 方法;
  3. 使用事件发布者发布消息: 可以通过 ApplicationEventPublisher 的 publishEvent() 方法发布消息。

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 定义一个事件,继承自ApplicationEvent并且写相应的构造函数
public class DemoEvent extends ApplicationEvent{
private static final long serialVersionUID = 1L;

private String message;

public DemoEvent(Object source,String message){
super(source);
this.message = message;
}

public String getMessage() {
return message;
}


// 定义一个事件监听者,实现ApplicationListener接口,重写 onApplicationEvent() 方法;
@Component
public class DemoListener implements ApplicationListener<DemoEvent>{

//使用onApplicationEvent接收消息
@Override
public void onApplicationEvent(DemoEvent event) {
String msg = event.getMessage();
System.out.println("接收到的信息是:"+msg);
}

}
// 发布事件,可以通过ApplicationEventPublisher 的 publishEvent() 方法发布消息。
@Component
public class DemoPublisher {

@Autowired
ApplicationContext applicationContext;

public void publish(String message){
//发布事件
applicationContext.publishEvent(new DemoEvent(this, message));
}
}

当调用 DemoPublisher 的 publish() 方法的时候,比如 demoPublisher.publish("你好") ,控制台就会打印出:接收到的信息是:你好 。

适配器模式

适配器模式(Adapter Pattern) 将一个接口转换成客户希望的另一个接口,适配器模式使接口不兼容的那些类可以一起工作。

Spring AOP 中的适配器模式

我们知道 Spring AOP 的实现是基于代理模式,但是 Spring AOP 的增强或通知(Advice)使用到了适配器模式,与之相关的接口是AdvisorAdapter 。

Advice 常用的类型有:BeforeAdvice(目标方法调用前,前置通知)、AfterAdvice(目标方法调用后,后置通知)、AfterReturningAdvice(目标方法执行结束后,return 之前)等等。每个类型 Advice(通知)都有对应的拦截器:MethodBeforeAdviceInterceptor、AfterReturningAdviceInterceptor、ThrowsAdviceInterceptor 等等。

Spring 预定义的通知要通过对应的适配器,适配成 MethodInterceptor 接口(方法拦截器)类型的对象(如:MethodBeforeAdviceAdapter 通过调用 getInterceptor 方法,将 MethodBeforeAdvice 适配成 MethodBeforeAdviceInterceptor )。

Spring MVC 中的适配器模式

在 Spring MVC 中,DispatcherServlet 根据请求信息调用 HandlerMapping,解析请求对应的 Handler。解析到对应的 Handler(也就是我们平常说的 Controller 控制器)后,开始由HandlerAdapter 适配器处理。HandlerAdapter 作为期望接口,具体的适配器实现类用于对目标类进行适配,Controller 作为需要适配的类。

为什么要在 Spring MVC 中使用适配器模式?

Spring MVC 中的 Controller 种类众多,不同类型的 Controller 通过不同的方法来对请求进行处理。如果不利用适配器模式的话,DispatcherServlet 直接获取对应类型的 Controller,需要的自行来判断,像下面这段代码一样:

1
2
3
4
5
6
7
if(mappedHandler.getHandler() instanceof MultiActionController){
((MultiActionController)mappedHandler.getHandler()).xxx
}else if(mappedHandler.getHandler() instanceof XXX){
...
}else if(...){
...
}

假如我们再增加一个 Controller类型就要在上面代码中再加入一行 判断语句,这种形式就使得程序难以维护,也违反了设计模式中的开闭原则 – 对扩展开放,对修改关闭。

装饰者模式

装饰者模式可以动态地给对象添加一些额外的属性或行为。相比于使用继承,装饰者模式更加灵活。简单点儿说就是当我们需要修改原有的功能,但我们又不愿直接去修改原有的代码时,设计一个 Decorator 套在原有代码外面。其实在 JDK 中就有很多地方用到了装饰者模式,比如 InputStream家族,InputStream 类下有 FileInputStream (读取文件)、BufferedInputStream (增加缓存,使读取文件速度大大提升)等子类都在不修改InputStream 代码的情况下扩展了它的功能。

装饰者模式示意图

Spring 中配置 DataSource 的时候,DataSource 可能是不同的数据库和数据源。我们能否根据客户的需求在少修改原有类的代码下动态切换不同的数据源?这个时候就要用到装饰者模式(这一点我自己还没太理解具体原理)。Spring 中用到的包装器模式在类名上含有 Wrapper或者 Decorator。这些类基本上都是动态地给一个对象添加一些额外的职责

总结

Spring 框架中用到了哪些设计模式?

  • 工厂设计模式 : Spring 使用工厂模式通过 BeanFactory、ApplicationContext 创建 bean 对象。
  • 代理设计模式 : Spring AOP 功能的实现。
  • 单例设计模式 : Spring 中的 Bean 默认都是单例的。
  • 模板方法模式 : Spring 中 jdbcTemplate、hibernateTemplate 等以 Template 结尾的对数据库操作的类,它们就使用到了模板模式。
  • 包装器设计模式 : 我们的项目需要连接多个数据库,而且不同的客户在每次访问中根据需要会去访问不同的数据库。这种模式让我们可以根据客户的需求能够动态切换不同的数据源。
  • 观察者模式: Spring 事件驱动模型就是观察者模式很经典的一个应用。
  • 适配器模式 :Spring AOP 的增强或通知(Advice)使用到了适配器模式、spring MVC 中也是用到了适配器模式适配Controller。
  • ……

参考

  • 《Spring 技术内幕》
  • https://blog.eduonix.com/java-programming-2/learn-design-patterns-used-spring-framework/
  • https://www.tutorialsteacher.com/ioc/inversion-of-control
  • https://design-patterns.readthedocs.io/zh_CN/latest/behavioral_patterns/observer.html
  • https://juejin.im/post/5a8eb261f265da4e9e307230
  • https://juejin.im/post/5ba28986f265da0abc2b6084

JVM线上问题排查和性能调优案例

发表于 2024-04-23 | 分类于 Java , JVM | 阅读次数:
字数统计: 973 字 | 阅读时长 ≈ 3 分钟

JVM 线上问题排查和性能调优也是面试常问的一个问题,尤其是社招中大厂的面试。

这篇文章,我会分享一些我看到的相关的案例。

下面是正文。

一次线上 OOM 问题分析 - 艾小仙 - 2023

  • 现象:线上某个服务有接口非常慢,通过监控链路查看发现,中间的 GAP 时间非常大,实际接口并没有消耗很多时间,并且在那段时间里有很多这样的请求。
  • 分析:使用 JDK 自带的jvisualvm分析 dump 文件(MAT 也能分析)。
  • 建议:对于 SQL 语句,如果监测到没有where条件的全表查询应该默认增加一个合适的limit作为限制,防止这种问题拖垮整个系统
  • 资料:实战案例:记一次 dump 文件分析历程转载 - HeapDump - 2022。

生产事故-记一次特殊的 OOM 排查 - 程语有云 - 2023

  • 现象:网络没有问题的情况下,系统某开放接口从 2023 年 3 月 10 日 14 时许开始无法访问和使用。
  • 临时解决办法:紧急回滚至上一稳定版本。
  • 分析:使用 MAT (Memory Analyzer Tool)工具分析 dump 文件。
  • 建议:正常情况下,-Xmn参数(控制 Young 区的大小)总是应当小于-Xmx参数(控制堆内存的最大大小),否则就会触发 OOM 错误。
  • 资料:最重要的 JVM 参数总结 - JavaGuide - 2023

一次大量 JVM Native 内存泄露的排查分析(64M 问题) - 掘金 - 2022

  • 现象:线上项目刚启动完使用 top 命令查看 RES 占用了超过 1.5G。
  • 分析:整个分析流程用到了较多工作,可以跟着作者思路一步一步来,值得学习借鉴。
  • 建议:远离 Hibernate。
  • 资料:Linux top 命令里的内存相关字段(VIRT, RES, SHR, CODE, DATA)

YGC 问题排查,又让我涨姿势了! - IT 人的职场进阶 - 2021

  • 现象:广告服务在新版本上线后,收到了大量的服务超时告警。
  • 分析:使用 MAT (Memory Analyzer Tool) 工具分析 dump 文件。
  • 建议:学会 YGC(Young GC) 问题的排查思路,掌握 YGC 的相关知识点。

听说 JVM 性能优化很难?今天我小试了一把! - 陈树义 - 2021

通过观察 GC 频率和停顿时间,来进行 JVM 内存空间调整,使其达到最合理的状态。调整过程记得小步快跑,避免内存剧烈波动影响线上服务。 这其实是最为简单的一种 JVM 性能调优方式了,可以算是粗调吧。

你们要的线上 GC 问题案例来啦 - 编了个程 - 2021

  • 案例 1:使用 guava cache 的时候,没有设置最大缓存数量和弱引用,导致频繁触发 Young GC
  • 案例 2: 对于一个查询和排序分页的 SQL,同时这个 SQL 需要 join 多张表,在分库分表下,直接调用 SQL 性能很差。于是,查单表,再在内存排序分页,用了一个 List 来保存数据,而有些数据量大,造成了这个现象。

Java 中 9 种常见的 CMS GC 问题分析与解决 - 美团技术团 - 2020

这篇文章共 2w+ 字,详细介绍了 GC 基础,总结了 CMS GC 的一些常见问题分析与解决办法。

给祖传系统做了点 GC 调优,暂停时间降低了 90% - 京东云技术团队 - 2023

这篇文章提到了一个在规则引擎系统中遇到的 GC(垃圾回收)问题,主要表现为系统在启动后发生了一次较长的 Young GC(年轻代垃圾回收)导致性能下降。经过分析,问题的核心在于动态对象年龄判定机制,它导致了过早的对象晋升,引起了长时间的垃圾回收。

布隆过滤器

发表于 2024-04-11 | 分类于 数据结构 | 阅读次数:
字数统计: 3.1k 字 | 阅读时长 ≈ 12 分钟

布隆过滤器相信大家没用过的话,也已经听过了。

布隆过滤器主要是为了解决海量数据的存在性问题。对于海量数据中判定某个数据是否存在且容忍轻微误差这一场景(比如缓存穿透、海量数据去重)来说,非常适合。

文章内容概览:

  1. 什么是布隆过滤器?
  2. 布隆过滤器的原理介绍。
  3. 布隆过滤器使用场景。
  4. 通过 Java 编程手动实现布隆过滤器。
  5. 利用 Google 开源的 Guava 中自带的布隆过滤器。
  6. Redis 中的布隆过滤器。

什么是布隆过滤器?

首先,我们需要了解布隆过滤器的概念。

布隆过滤器(Bloom Filter,BF)是一个叫做 Bloom 的老哥于 1970 年提出的。我们可以把它看作由二进制向量(或者说位数组)和一系列随机映射函数(哈希函数)两部分组成的数据结构。相比于我们平时常用的 List、Map、Set 等数据结构,它占用空间更少并且效率更高,但是缺点是其返回的结果是概率性的,而不是非常准确的。理论情况下添加到集合中的元素越多,误报的可能性就越大。并且,存放在布隆过滤器的数据不容易删除。

Bloom Filter 会使用一个较大的 bit 数组来保存所有的数据,数组中的每个元素都只占用 1 bit ,并且每个元素只能是 0 或者 1(代表 false 或者 true),这也是 Bloom Filter 节省内存的核心所在。这样来算的话,申请一个 100w 个元素的位数组只占用 1000000Bit / 8 = 125000 Byte = 125000/1024 KB ≈ 122KB 的空间。

位数组

总结:一个名叫 Bloom 的人提出了一种来检索元素是否在给定大集合中的数据结构,这种数据结构是高效且性能很好的,但缺点是具有一定的错误识别率和删除难度。并且,理论情况下,添加到集合中的元素越多,误报的可能性就越大。

布隆过滤器的原理介绍

当一个元素加入布隆过滤器中的时候,会进行如下操作:

  1. 使用布隆过滤器中的哈希函数对元素值进行计算,得到哈希值(有几个哈希函数得到几个哈希值)。
  2. 根据得到的哈希值,在位数组中把对应下标的值置为 1。

当我们需要判断一个元素是否存在于布隆过滤器的时候,会进行如下操作:

  1. 对给定元素再次进行相同的哈希计算;
  2. 得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

Bloom Filter 的简单原理图如下:

Bloom Filter 的简单原理示意图

如图所示,当字符串存储要加入到布隆过滤器中时,该字符串首先由多个哈希函数生成不同的哈希值,然后将对应的位数组的下标设置为 1(当位数组初始化时,所有位置均为 0)。当第二次存储相同字符串时,因为先前的对应位置已设置为 1,所以很容易知道此值已经存在(去重非常方便)。

如果我们需要判断某个字符串是否在布隆过滤器中时,只需要对给定字符串再次进行相同的哈希计算,得到值之后判断位数组中的每个元素是否都为 1,如果值都为 1,那么说明这个值在布隆过滤器中,如果存在一个值不为 1,说明该元素不在布隆过滤器中。

不同的字符串可能哈希出来的位置相同,这种情况我们可以适当增加位数组大小或者调整我们的哈希函数。

综上,我们可以得出:布隆过滤器说某个元素存在,小概率会误判。布隆过滤器说某个元素不在,那么这个元素一定不在。

布隆过滤器使用场景

  1. 判断给定数据是否存在:比如判断一个数字是否存在于包含大量数字的数字集中(数字集很大,上亿)、 防止缓存穿透(判断请求的数据是否有效避免直接绕过缓存请求数据库)等等、邮箱的垃圾邮件过滤(判断一个邮件地址是否在垃圾邮件列表中)、黑名单功能(判断一个 IP 地址或手机号码是否在黑名单中)等等。
  2. 去重:比如爬给定网址的时候对已经爬取过的 URL 去重、对巨量的 QQ 号/订单号去重。

去重场景也需要用到判断给定数据是否存在,因此布隆过滤器主要是为了解决海量数据的存在性问题。

编码实战

通过 Java 编程手动实现布隆过滤器

我们上面已经说了布隆过滤器的原理,知道了布隆过滤器的原理之后就可以自己手动实现一个了。

如果你想要手动实现一个的话,你需要:

  1. 一个合适大小的位数组保存数据
  2. 几个不同的哈希函数
  3. 添加元素到位数组(布隆过滤器)的方法实现
  4. 判断给定元素是否存在于位数组(布隆过滤器)的方法实现。

下面给出一个我觉得写的还算不错的代码(参考网上已有代码改进得到,对于所有类型对象皆适用):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.BitSet;

public class MyBloomFilter {

/**
* 位数组的大小
*/
private static final int DEFAULT_SIZE = 2 << 24;
/**
* 通过这个数组可以创建 6 个不同的哈希函数
*/
private static final int[] SEEDS = new int[]{3, 13, 46, 71, 91, 134};

/**
* 位数组。数组中的元素只能是 0 或者 1
*/
private BitSet bits = new BitSet(DEFAULT_SIZE);

/**
* 存放包含 hash 函数的类的数组
*/
private SimpleHash[] func = new SimpleHash[SEEDS.length];

/**
* 初始化多个包含 hash 函数的类的数组,每个类中的 hash 函数都不一样
*/
public MyBloomFilter() {
// 初始化多个不同的 Hash 函数
for (int i = 0; i < SEEDS.length; i++) {
func[i] = new SimpleHash(DEFAULT_SIZE, SEEDS[i]);
}
}

/**
* 添加元素到位数组
*/
public void add(Object value) {
for (SimpleHash f : func) {
bits.set(f.hash(value), true);
}
}

/**
* 判断指定元素是否存在于位数组
*/
public boolean contains(Object value) {
boolean ret = true;
for (SimpleHash f : func) {
ret = ret && bits.get(f.hash(value));
}
return ret;
}

/**
* 静态内部类。用于 hash 操作!
*/
public static class SimpleHash {

private int cap;
private int seed;

public SimpleHash(int cap, int seed) {
this.cap = cap;
this.seed = seed;
}

/**
* 计算 hash 值
*/
public int hash(Object value) {
int h;
return (value == null) ? 0 : Math.abs((cap - 1) & seed * ((h = value.hashCode()) ^ (h >>> 16)));
}

}
}

测试:

1
2
3
4
5
6
7
8
9
String value1 = "https://javaguide.cn/";
String value2 = "https://github.com/Snailclimb";
MyBloomFilter filter = new MyBloomFilter();
System.out.println(filter.contains(value1));
System.out.println(filter.contains(value2));
filter.add(value1);
filter.add(value2);
System.out.println(filter.contains(value1));
System.out.println(filter.contains(value2));

Output:

1
2
3
4
false
false
true
true

测试:

1
2
3
4
5
6
7
8
9
Integer value1 = 13423;
Integer value2 = 22131;
MyBloomFilter filter = new MyBloomFilter();
System.out.println(filter.contains(value1));
System.out.println(filter.contains(value2));
filter.add(value1);
filter.add(value2);
System.out.println(filter.contains(value1));
System.out.println(filter.contains(value2));

Output:

1
2
3
4
false
false
true
true

利用 Google 开源的 Guava 中自带的布隆过滤器

自己实现的目的主要是为了让自己搞懂布隆过滤器的原理,Guava 中布隆过滤器的实现算是比较权威的,所以实际项目中我们不需要手动实现一个布隆过滤器。

首先我们需要在项目中引入 Guava 的依赖:

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>

实际使用如下:

我们创建了一个最多存放 最多 1500 个整数的布隆过滤器,并且我们可以容忍误判的概率为百分之(0.01)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建布隆过滤器对象
BloomFilter<Integer> filter = BloomFilter.create(
Funnels.integerFunnel(),
1500,
0.01);
// 判断指定元素是否存在
System.out.println(filter.mightContain(1));
System.out.println(filter.mightContain(2));
// 将元素添加进布隆过滤器
filter.put(1);
filter.put(2);
System.out.println(filter.mightContain(1));
System.out.println(filter.mightContain(2));

在我们的示例中,当 mightContain() 方法返回 true 时,我们可以 99%确定该元素在过滤器中,当过滤器返回 false 时,我们可以 100%确定该元素不存在于过滤器中。

Guava 提供的布隆过滤器的实现还是很不错的(想要详细了解的可以看一下它的源码实现),但是它有一个重大的缺陷就是只能单机使用(另外,容量扩展也不容易),而现在互联网一般都是分布式的场景。为了解决这个问题,我们就需要用到 Redis 中的布隆过滤器了。

Redis 中的布隆过滤器

介绍

Redis v4.0 之后有了 Module(模块/插件) 功能,Redis Modules 让 Redis 可以使用外部模块扩展其功能 。布隆过滤器就是其中的 Module。详情可以查看 Redis 官方对 Redis Modules 的介绍:https://redis.io/modules

另外,官网推荐了一个 RedisBloom 作为 Redis 布隆过滤器的 Module,地址:https://github.com/RedisBloom/RedisBloom
其他还有:

  • redis-lua-scaling-bloom-filter(lua 脚本实现):https://github.com/erikdubbelboer/redis-lua-scaling-bloom-filter
  • pyreBloom(Python 中的快速 Redis 布隆过滤器):https://github.com/seomoz/pyreBloom
  • ……

RedisBloom 提供了多种语言的客户端支持,包括:Python、Java、JavaScript 和 PHP。

使用 Docker 安装

如果我们需要体验 Redis 中的布隆过滤器非常简单,通过 Docker 就可以了!我们直接在 Google 搜索 docker redis bloomfilter 然后在排除广告的第一条搜素结果就找到了我们想要的答案(这是我平常解决问题的一种方式,分享一下),具体地址:https://hub.docker.com/r/redislabs/rebloom/ (介绍的很详细 )。

具体操作如下:

1
2
3
4
➜  ~ docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
➜ ~ docker exec -it redis-redisbloom bash
root@21396d02c252:/data# redis-cli
127.0.0.1:6379>

注意:当前 rebloom 镜像已经被废弃,官方推荐使用redis-stack

常用命令一览

注意:key : 布隆过滤器的名称,item : 添加的元素。

  1. BF.ADD:将元素添加到布隆过滤器中,如果该过滤器尚不存在,则创建该过滤器。格式:BF.ADD {key} {item}。
  2. BF.MADD : 将一个或多个元素添加到“布隆过滤器”中,并创建一个尚不存在的过滤器。该命令的操作方式BF.ADD与之相同,只不过它允许多个输入并返回多个值。格式:BF.MADD {key} {item} [item ...] 。
  3. BF.EXISTS : 确定元素是否在布隆过滤器中存在。格式:BF.EXISTS {key} {item}。
  4. BF.MEXISTS:确定一个或者多个元素是否在布隆过滤器中存在格式:BF.MEXISTS {key} {item} [item ...]。

另外, BF.RESERVE 命令需要单独介绍一下:

这个命令的格式如下:

BF.RESERVE {key} {error_rate} {capacity} [EXPANSION expansion] 。

下面简单介绍一下每个参数的具体含义:

  1. key:布隆过滤器的名称
  2. error_rate : 期望的误报率。该值必须介于 0 到 1 之间。例如,对于期望的误报率 0.1%(1000 中为 1),error_rate 应该设置为 0.001。该数字越接近零,则每个项目的内存消耗越大,并且每个操作的 CPU 使用率越高。
  3. capacity: 过滤器的容量。当实际存储的元素个数超过这个值之后,性能将开始下降。实际的降级将取决于超出限制的程度。随着过滤器元素数量呈指数增长,性能将线性下降。

可选参数:

  • expansion:如果创建了一个新的子过滤器,则其大小将是当前过滤器的大小乘以expansion。默认扩展值为 2。这意味着每个后续子过滤器将是前一个子过滤器的两倍。

实际使用

1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> BF.ADD myFilter java
(integer) 1
127.0.0.1:6379> BF.ADD myFilter javaguide
(integer) 1
127.0.0.1:6379> BF.EXISTS myFilter java
(integer) 1
127.0.0.1:6379> BF.EXISTS myFilter javaguide
(integer) 1
127.0.0.1:6379> BF.EXISTS myFilter github
(integer) 0

Java内存区域详解(重点)

发表于 2024-04-08 | 分类于 Java , JVM | 阅读次数:
字数统计: 7.3k 字 | 阅读时长 ≈ 25 分钟

如果没有特殊说明,都是针对的是 HotSpot 虚拟机。

本文基于《深入理解 Java 虚拟机:JVM 高级特性与最佳实践》进行总结补充。

常见面试题:

  • 介绍下 Java 内存区域(运行时数据区)
  • Java 对象的创建过程(五步,建议能默写出来并且要知道每一步虚拟机做了什么)
  • 对象的访问定位的两种方式(句柄和直接指针两种方式)

前言

对于 Java 程序员来说,在虚拟机自动内存管理机制下,不再需要像 C/C++程序开发程序员这样为每一个 new 操作去写对应的 delete/free 操作,不容易出现内存泄漏和内存溢出问题。正是因为 Java 程序员把内存控制权利交给 Java 虚拟机,一旦出现内存泄漏和溢出方面的问题,如果不了解虚拟机是怎样使用内存的,那么排查错误将会是一个非常艰巨的任务。

运行时数据区域

Java 虚拟机在执行 Java 程序的过程中会把它管理的内存划分成若干个不同的数据区域。

JDK 1.8 和之前的版本略有不同,我们这里以 JDK 1.7 和 JDK 1.8 这两个版本为例介绍。

JDK 1.7:

Java 运行时数据区域(JDK1.7)

JDK 1.8:

Java 运行时数据区域(JDK1.8 )

线程私有的:

  • 程序计数器
  • 虚拟机栈
  • 本地方法栈

线程共享的:

  • 堆
  • 方法区
  • 直接内存 (非运行时数据区的一部分)

Java 虚拟机规范对于运行时数据区域的规定是相当宽松的。以堆为例:堆可以是连续空间,也可以不连续。堆的大小可以固定,也可以在运行时按需扩展 。虚拟机实现者可以使用任何垃圾回收算法管理堆,甚至完全不进行垃圾收集也是可以的。

程序计数器

程序计数器是一块较小的内存空间,可以看作是当前线程所执行的字节码的行号指示器。字节码解释器工作时通过改变这个计数器的值来选取下一条需要执行的字节码指令,分支、循环、跳转、异常处理、线程恢复等功能都需要依赖这个计数器来完成。

另外,为了线程切换后能恢复到正确的执行位置,每条线程都需要有一个独立的程序计数器,各线程之间计数器互不影响,独立存储,我们称这类内存区域为“线程私有”的内存。

从上面的介绍中我们知道了程序计数器主要有两个作用:

  • 字节码解释器通过改变程序计数器来依次读取指令,从而实现代码的流程控制,如:顺序执行、选择、循环、异常处理。
  • 在多线程的情况下,程序计数器用于记录当前线程执行的位置,从而当线程被切换回来的时候能够知道该线程上次运行到哪儿了。

⚠️ 注意:程序计数器是唯一一个不会出现 OutOfMemoryError 的内存区域,它的生命周期随着线程的创建而创建,随着线程的结束而死亡。

Java 虚拟机栈

与程序计数器一样,Java 虚拟机栈(后文简称栈)也是线程私有的,它的生命周期和线程相同,随着线程的创建而创建,随着线程的死亡而死亡。

栈绝对算的上是 JVM 运行时数据区域的一个核心,除了一些 Native 方法调用是通过本地方法栈实现的(后面会提到),其他所有的 Java 方法调用都是通过栈来实现的(也需要和其他运行时数据区域比如程序计数器配合)。

方法调用的数据需要通过栈进行传递,每一次方法调用都会有一个对应的栈帧被压入栈中,每一个方法调用结束后,都会有一个栈帧被弹出。

栈由一个个栈帧组成,而每个栈帧中都拥有:局部变量表、操作数栈、动态链接、方法返回地址。和数据结构上的栈类似,两者都是先进后出的数据结构,只支持出栈和入栈两种操作。

Java 虚拟机栈

局部变量表 主要存放了编译期可知的各种数据类型(boolean、byte、char、short、int、float、long、double)、对象引用(reference 类型,它不同于对象本身,可能是一个指向对象起始地址的引用指针,也可能是指向一个代表对象的句柄或其他与此对象相关的位置)。

局部变量表

操作数栈 主要作为方法调用的中转站使用,用于存放方法执行过程中产生的中间计算结果。另外,计算过程中产生的临时变量也会放在操作数栈中。

动态链接 主要服务一个方法需要调用其他方法的场景。Class 文件的常量池里保存有大量的符号引用比如方法引用的符号引用。当一个方法要调用其他方法,需要将常量池中指向方法的符号引用转化为其在内存地址中的直接引用。动态链接的作用就是为了将符号引用转换为调用方法的直接引用,这个过程也被称为 动态连接 。

栈空间虽然不是无限的,但一般正常调用的情况下是不会出现问题的。不过,如果函数调用陷入无限循环的话,就会导致栈中被压入太多栈帧而占用太多空间,导致栈空间过深。那么当线程请求栈的深度超过当前 Java 虚拟机栈的最大深度的时候,就抛出 StackOverFlowError 错误。

Java 方法有两种返回方式,一种是 return 语句正常返回,一种是抛出异常。不管哪种返回方式,都会导致栈帧被弹出。也就是说, 栈帧随着方法调用而创建,随着方法结束而销毁。无论方法正常完成还是异常完成都算作方法结束。

除了 StackOverFlowError 错误之外,栈还可能会出现OutOfMemoryError错误,这是因为如果栈的内存大小可以动态扩展, 如果虚拟机在动态扩展栈时无法申请到足够的内存空间,则抛出OutOfMemoryError异常。

简单总结一下程序运行中栈可能会出现两种错误:

  • StackOverFlowError: 若栈的内存大小不允许动态扩展,那么当线程请求栈的深度超过当前 Java 虚拟机栈的最大深度的时候,就抛出 StackOverFlowError 错误。
  • OutOfMemoryError: 如果栈的内存大小可以动态扩展, 如果虚拟机在动态扩展栈时无法申请到足够的内存空间,则抛出OutOfMemoryError异常。

本地方法栈

和虚拟机栈所发挥的作用非常相似,区别是:虚拟机栈为虚拟机执行 Java 方法 (也就是字节码)服务,而本地方法栈则为虚拟机使用到的 Native 方法服务。 在 HotSpot 虚拟机中和 Java 虚拟机栈合二为一。

本地方法被执行的时候,在本地方法栈也会创建一个栈帧,用于存放该本地方法的局部变量表、操作数栈、动态链接、出口信息。

方法执行完毕后相应的栈帧也会出栈并释放内存空间,也会出现 StackOverFlowError 和 OutOfMemoryError 两种错误。

堆

Java 虚拟机所管理的内存中最大的一块,Java 堆是所有线程共享的一块内存区域,在虚拟机启动时创建。此内存区域的唯一目的就是存放对象实例,几乎所有的对象实例以及数组都在这里分配内存。

Java 世界中“几乎”所有的对象都在堆中分配,但是,随着 JIT 编译器的发展与逃逸分析技术逐渐成熟,栈上分配、标量替换优化技术将会导致一些微妙的变化,所有的对象都分配到堆上也渐渐变得不那么“绝对”了。从 JDK 1.7 开始已经默认开启逃逸分析,如果某些方法中的对象引用没有被返回或者未被外面使用(也就是未逃逸出去),那么对象可以直接在栈上分配内存。

Java 堆是垃圾收集器管理的主要区域,因此也被称作 GC 堆(Garbage Collected Heap)。从垃圾回收的角度,由于现在收集器基本都采用分代垃圾收集算法,所以 Java 堆还可以细分为:新生代和老年代;再细致一点有:Eden、Survivor、Old 等空间。进一步划分的目的是更好地回收内存,或者更快地分配内存。

在 JDK 7 版本及 JDK 7 版本之前,堆内存被通常分为下面三部分:

  1. 新生代内存(Young Generation)
  2. 老生代(Old Generation)
  3. 永久代(Permanent Generation)

下图所示的 Eden 区、两个 Survivor 区 S0 和 S1 都属于新生代,中间一层属于老年代,最下面一层属于永久代。

堆内存结构

JDK 8 版本之后 PermGen(永久代) 已被 Metaspace(元空间) 取代,元空间使用的是本地内存。 (我会在方法区这部分内容详细介绍到)。

大部分情况,对象都会首先在 Eden 区域分配,在一次新生代垃圾回收后,如果对象还存活,则会进入 S0 或者 S1,并且对象的年龄还会加 1(Eden 区->Survivor 区后对象的初始年龄变为 1),当它的年龄增加到一定程度(默认为 15 岁),就会被晋升到老年代中。对象晋升到老年代的年龄阈值,可以通过参数 -XX:MaxTenuringThreshold 来设置。不过,设置的值应该在 0-15,否则会爆出以下错误:

1
MaxTenuringThreshold of 20 is invalid; must be between 0 and 15

为什么年龄只能是 0-15?

因为记录年龄的区域在对象头中,这个区域的大小通常是 4 位。这 4 位可以表示的最大二进制数字是 1111,即十进制的 15。因此,对象的年龄被限制为 0 到 15。

这里我们简单结合对象布局来详细介绍一下。

在 HotSpot 虚拟机中,对象在内存中存储的布局可以分为 3 块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。其中,对象头包括两部分:标记字段(Mark Word)和类型指针(Klass Word)。关于对象内存布局的详细介绍,后文会介绍到,这里就不重复提了。

这个年龄信息就是在标记字段中存放的(标记字段还存放了对象自身的其他信息比如哈希码、锁状态信息等等)。markOop.hpp定义了标记字(mark word)的结构:

标记字段结构

可以看到对象年龄占用的大小确实是 4 位。

🐛 修正(参见:issue552):“Hotspot 遍历所有对象时,按照年龄从小到大对其所占用的大小进行累加,当累加到某个年龄时,所累加的大小超过了 Survivor 区的一半,则取这个年龄和 MaxTenuringThreshold 中更小的一个值,作为新的晋升年龄阈值”。

动态年龄计算的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
uint ageTable::compute_tenuring_threshold(size_t survivor_capacity) {
//survivor_capacity是survivor空间的大小
size_t desired_survivor_size = (size_t)((((double) survivor_capacity)*TargetSurvivorRatio)/100);//TargetSurvivorRatio 为50
size_t total = 0;
uint age = 1;
while (age < table_size) {
total += sizes[age];//sizes数组是每个年龄段对象大小
if (total > desired_survivor_size) break;
age++;
}
uint result = age < MaxTenuringThreshold ? age : MaxTenuringThreshold;
...
}

堆这里最容易出现的就是 OutOfMemoryError 错误,并且出现这种错误之后的表现形式还会有几种,比如:

  1. **java.lang.OutOfMemoryError: GC Overhead Limit Exceeded**:当 JVM 花太多时间执行垃圾回收并且只能回收很少的堆空间时,就会发生此错误。
  2. java.lang.OutOfMemoryError: Java heap space :假如在创建新的对象时, 堆内存中的空间不足以存放新创建的对象, 就会引发此错误。(和配置的最大堆内存有关,且受制于物理内存大小。最大堆内存可通过-Xmx参数配置,若没有特别配置,将会使用默认值,详见:Default Java 8 max heap size)
  3. ……

方法区

方法区属于是 JVM 运行时数据区域的一块逻辑区域,是各个线程共享的内存区域。

《Java 虚拟机规范》只是规定了有方法区这么个概念和它的作用,方法区到底要如何实现那就是虚拟机自己要考虑的事情了。也就是说,在不同的虚拟机实现上,方法区的实现是不同的。

当虚拟机要使用一个类时,它需要读取并解析 Class 文件获取相关信息,再将信息存入到方法区。方法区会存储已被虚拟机加载的 类信息、字段信息、方法信息、常量、静态变量、即时编译器编译后的代码缓存等数据。

方法区和永久代以及元空间是什么关系呢? 方法区和永久代以及元空间的关系很像 Java 中接口和类的关系,类实现了接口,这里的类就可以看作是永久代和元空间,接口可以看作是方法区,也就是说永久代以及元空间是 HotSpot 虚拟机对虚拟机规范中方法区的两种实现方式。并且,永久代是 JDK 1.8 之前的方法区实现,JDK 1.8 及以后方法区的实现变成了元空间。

HotSpot 虚拟机方法区的两种实现

为什么要将永久代 (PermGen) 替换为元空间 (MetaSpace) 呢?

下图来自《深入理解 Java 虚拟机》第 3 版 2.2.5

1、整个永久代有一个 JVM 本身设置的固定大小上限,无法进行调整(也就是受到 JVM 内存的限制),而元空间使用的是本地内存,受本机可用内存的限制,虽然元空间仍旧可能溢出,但是比原来出现的几率会更小。

当元空间溢出时会得到如下错误:java.lang.OutOfMemoryError: MetaSpace

你可以使用 -XX:MaxMetaspaceSize 标志设置最大元空间大小,默认值为 unlimited,这意味着它只受系统内存的限制。-XX:MetaspaceSize 调整标志定义元空间的初始大小如果未指定此标志,则 Metaspace 将根据运行时的应用程序需求动态地重新调整大小。

2、元空间里面存放的是类的元数据,这样加载多少类的元数据就不由 MaxPermSize 控制了, 而由系统的实际可用空间来控制,这样能加载的类就更多了。

3、在 JDK8,合并 HotSpot 和 JRockit 的代码时, JRockit 从来没有一个叫永久代的东西, 合并之后就没有必要额外的设置这么一个永久代的地方了。

4、永久代会为 GC 带来不必要的复杂度,并且回收效率偏低。

方法区常用参数有哪些?

JDK 1.8 之前永久代还没被彻底移除的时候通常通过下面这些参数来调节方法区大小。

1
2
-XX:PermSize=N //方法区 (永久代) 初始大小
-XX:MaxPermSize=N //方法区 (永久代) 最大大小,超过这个值将会抛出 OutOfMemoryError 异常:java.lang.OutOfMemoryError: PermGen

相对而言,垃圾收集行为在这个区域是比较少出现的,但并非数据进入方法区后就“永久存在”了。

JDK 1.8 的时候,方法区(HotSpot 的永久代)被彻底移除了(JDK1.7 就已经开始了),取而代之是元空间,元空间使用的是本地内存。下面是一些常用参数:

1
2
-XX:MetaspaceSize=N //设置 Metaspace 的初始(和最小大小)
-XX:MaxMetaspaceSize=N //设置 Metaspace 的最大大小

与永久代很大的不同就是,如果不指定大小的话,随着更多类的创建,虚拟机会耗尽所有可用的系统内存。

运行时常量池

Class 文件中除了有类的版本、字段、方法、接口等描述信息外,还有用于存放编译期生成的各种字面量(Literal)和符号引用(Symbolic Reference)的 常量池表(Constant Pool Table) 。

字面量是源代码中的固定值的表示法,即通过字面我们就能知道其值的含义。字面量包括整数、浮点数和字符串字面量。常见的符号引用包括类符号引用、字段符号引用、方法符号引用、接口方法符号。

《深入理解 Java 虚拟机》7.34 节第三版对符号引用和直接引用的解释如下:

符号引用和直接引用

常量池表会在类加载后存放到方法区的运行时常量池中。

运行时常量池的功能类似于传统编程语言的符号表,尽管它包含了比典型符号表更广泛的数据。

既然运行时常量池是方法区的一部分,自然受到方法区内存的限制,当常量池无法再申请到内存时会抛出 OutOfMemoryError 错误。

字符串常量池

字符串常量池 是 JVM 为了提升性能和减少内存消耗针对字符串(String 类)专门开辟的一块区域,主要目的是为了避免字符串的重复创建。

1
2
3
4
5
6
// 在字符串常量池中创建字符串对象 ”ab“
// 将字符串对象 ”ab“ 的引用赋值给给 aa
String aa = "ab";
// 直接返回字符串常量池中字符串对象 ”ab“,赋值给引用 bb
String bb = "ab";
System.out.println(aa==bb); // true

HotSpot 虚拟机中字符串常量池的实现是 src/hotspot/share/classfile/stringTable.cpp ,StringTable 可以简单理解为一个固定大小的HashTable ,容量为 StringTableSize(可以通过 -XX:StringTableSize 参数来设置),保存的是字符串(key)和 字符串对象的引用(value)的映射关系,字符串对象的引用指向堆中的字符串对象。

JDK1.7 之前,字符串常量池存放在永久代。JDK1.7 字符串常量池和静态变量从永久代移动到了 Java 堆中。

method-area-jdk1.6

method-area-jdk1.7

JDK 1.7 为什么要将字符串常量池移动到堆中?

主要是因为永久代(方法区实现)的 GC 回收效率太低,只有在整堆收集 (Full GC)的时候才会被执行 GC。Java 程序中通常会有大量的被创建的字符串等待回收,将字符串常量池放到堆中,能够更高效及时地回收字符串内存。

相关问题:JVM 常量池中存储的是对象还是引用呢? - RednaxelaFX - 知乎

最后再来分享一段周志明老师在《深入理解 Java 虚拟机(第 3 版)》样例代码&勘误 GitHub 仓库的 issue#112 中说过的话:

运行时常量池、方法区、字符串常量池这些都是不随虚拟机实现而改变的逻辑概念,是公共且抽象的,Metaspace、Heap 是与具体某种虚拟机实现相关的物理概念,是私有且具体的。

直接内存

直接内存是一种特殊的内存缓冲区,并不在 Java 堆或方法区中分配的,而是通过 JNI 的方式在本地内存上分配的。

直接内存并不是虚拟机运行时数据区的一部分,也不是虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用。而且也可能导致 OutOfMemoryError 错误出现。

JDK1.4 中新加入的 NIO(Non-Blocking I/O,也被称为 New I/O),引入了一种基于通道(Channel)与缓存区(Buffer)的 I/O 方式,它可以直接使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆中的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样就能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆之间来回复制数据。

直接内存的分配不会受到 Java 堆的限制,但是,既然是内存就会受到本机总内存大小以及处理器寻址空间的限制。

类似的概念还有 堆外内存 。在一些文章中将直接内存等价于堆外内存,个人觉得不是特别准确。

堆外内存就是把内存对象分配在堆外的内存,这些内存直接受操作系统管理(而不是虚拟机),这样做的结果就是能够在一定程度上减少垃圾回收对应用程序造成的影响。

HotSpot 虚拟机对象探秘

通过上面的介绍我们大概知道了虚拟机的内存情况,下面我们来详细的了解一下 HotSpot 虚拟机在 Java 堆中对象分配、布局和访问的全过程。

对象的创建

Java 对象的创建过程我建议最好是能默写出来,并且要掌握每一步在做什么。

Step1:类加载检查

虚拟机遇到一条 new 指令时,首先将去检查这个指令的参数是否能在常量池中定位到这个类的符号引用,并且检查这个符号引用代表的类是否已被加载过、解析和初始化过。如果没有,那必须先执行相应的类加载过程。

Step2:分配内存

在类加载检查通过后,接下来虚拟机将为新生对象分配内存。对象所需的内存大小在类加载完成后便可确定,为对象分配空间的任务等同于把一块确定大小的内存从 Java 堆中划分出来。分配方式有 “指针碰撞” 和 “空闲列表” 两种,选择哪种分配方式由 Java 堆是否规整决定,而 Java 堆是否规整又由所采用的垃圾收集器是否带有压缩整理功能决定。

内存分配的两种方式 (补充内容,需要掌握):

  • 指针碰撞:
    • 适用场合:堆内存规整(即没有内存碎片)的情况下。
    • 原理:用过的内存全部整合到一边,没有用过的内存放在另一边,中间有一个分界指针,只需要向着没用过的内存方向将该指针移动对象内存大小位置即可。
    • 使用该分配方式的 GC 收集器:Serial, ParNew
  • 空闲列表:
    • 适用场合:堆内存不规整的情况下。
    • 原理:虚拟机会维护一个列表,该列表中会记录哪些内存块是可用的,在分配的时候,找一块儿足够大的内存块儿来划分给对象实例,最后更新列表记录。
    • 使用该分配方式的 GC 收集器:CMS

选择以上两种方式中的哪一种,取决于 Java 堆内存是否规整。而 Java 堆内存是否规整,取决于 GC 收集器的算法是”标记-清除”,还是”标记-整理”(也称作”标记-压缩”),值得注意的是,复制算法内存也是规整的。

内存分配并发问题(补充内容,需要掌握)

在创建对象的时候有一个很重要的问题,就是线程安全,因为在实际开发过程中,创建对象是很频繁的事情,作为虚拟机来说,必须要保证线程是安全的,通常来讲,虚拟机采用两种方式来保证线程安全:

  • CAS+失败重试: CAS 是乐观锁的一种实现方式。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。虚拟机采用 CAS 配上失败重试的方式保证更新操作的原子性。
  • TLAB: 为每一个线程预先在 Eden 区分配一块儿内存,JVM 在给线程中的对象分配内存时,首先在 TLAB 分配,当对象大于 TLAB 中的剩余内存或 TLAB 的内存已用尽时,再采用上述的 CAS 进行内存分配

Step3:初始化零值

内存分配完成后,虚拟机需要将分配到的内存空间都初始化为零值(不包括对象头),这一步操作保证了对象的实例字段在 Java 代码中可以不赋初始值就直接使用,程序能访问到这些字段的数据类型所对应的零值。

Step4:设置对象头

初始化零值完成之后,虚拟机要对对象进行必要的设置,例如这个对象是哪个类的实例、如何才能找到类的元数据信息、对象的哈希码、对象的 GC 分代年龄等信息。 这些信息存放在对象头中。 另外,根据虚拟机当前运行状态的不同,如是否启用偏向锁等,对象头会有不同的设置方式。

Step5:执行 init 方法

在上面工作都完成之后,从虚拟机的视角来看,一个新的对象已经产生了,但从 Java 程序的视角来看,对象创建才刚开始,<init> 方法还没有执行,所有的字段都还为零。所以一般来说,执行 new 指令之后会接着执行 <init> 方法,把对象按照程序员的意愿进行初始化,这样一个真正可用的对象才算完全产生出来。

对象的内存布局

在 Hotspot 虚拟机中,对象在内存中的布局可以分为 3 块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。

对象头包括两部分信息:

  1. 标记字段(Mark Word):用于存储对象自身的运行时数据, 如哈希码(HashCode)、GC 分代年龄、锁状态标志、线程持有的锁、偏向线程 ID、偏向时间戳等等。
  2. 类型指针(Klass pointer):对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

实例数据部分是对象真正存储的有效信息,也是在程序中所定义的各种类型的字段内容。

对齐填充部分不是必然存在的,也没有什么特别的含义,仅仅起占位作用。 因为 Hotspot 虚拟机的自动内存管理系统要求对象起始地址必须是 8 字节的整数倍,换句话说就是对象的大小必须是 8 字节的整数倍。而对象头部分正好是 8 字节的倍数(1 倍或 2 倍),因此,当对象实例数据部分没有对齐时,就需要通过对齐填充来补全。

对象的访问定位

建立对象就是为了使用对象,我们的 Java 程序通过栈上的 reference 数据来操作堆上的具体对象。对象的访问方式由虚拟机实现而定,目前主流的访问方式有:使用句柄、直接指针。

句柄

如果使用句柄的话,那么 Java 堆中将会划分出一块内存来作为句柄池,reference 中存储的就是对象的句柄地址,而句柄中包含了对象实例数据与对象类型数据各自的具体地址信息。

对象的访问定位-使用句柄

直接指针

如果使用直接指针访问,reference 中存储的直接就是对象的地址。

对象的访问定位-直接指针

这两种对象访问方式各有优势。使用句柄来访问的最大好处是 reference 中存储的是稳定的句柄地址,在对象被移动时只会改变句柄中的实例数据指针,而 reference 本身不需要修改。使用直接指针访问方式最大的好处就是速度快,它节省了一次指针定位的时间开销。

HotSpot 虚拟机主要使用的就是这种方式来进行对象访问。

参考

  • 《深入理解 Java 虚拟机:JVM 高级特性与最佳实践(第二版》
  • 《自己动手写 Java 虚拟机》
  • Chapter 2. The Structure of the Java Virtual Machine:https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html
  • JVM 栈帧内部结构-动态链接:https://chenxitag.com/archives/368
  • Java 中 new String(“字面量”) 中 “字面量” 是何时进入字符串常量池的? - 木女孩的回答 - 知乎:https://www.zhihu.com/question/55994121/answer/147296098
  • JVM 常量池中存储的是对象还是引用呢? - RednaxelaFX 的回答 - 知乎:https://www.zhihu.com/question/57109429/answer/151717241
  • http://www.pointsoftware.ch/en/under-the-hood-runtime-data-areas-javas-memory-model/
  • https://dzone.com/articles/jvm-permgen-%E2%80%93-where-art-thou
  • https://stackoverflow.com/questions/9095748/method-area-and-permgen

图

发表于 2024-03-22 | 分类于 数据结构 | 阅读次数:
字数统计: 1.4k 字 | 阅读时长 ≈ 5 分钟

图是一种较为复杂的非线性结构。 为啥说其较为复杂呢?

根据前面的内容,我们知道:

  • 线性数据结构的元素满足唯一的线性关系,每个元素(除第一个和最后一个外)只有一个直接前趋和一个直接后继。
  • 树形数据结构的元素之间有着明显的层次关系。

但是,图形结构的元素之间的关系是任意的。

何为图呢? 简单来说,图就是由顶点的有穷非空集合和顶点之间的边组成的集合。通常表示为:**G(V,E)**,其中,G 表示一个图,V 表示顶点的集合,E 表示边的集合。

下图所展示的就是图这种数据结构,并且还是一张有向图。

有向图

图在我们日常生活中的例子很多!比如我们在社交软件上好友关系就可以用图来表示。

图的基本概念

顶点

图中的数据元素,我们称之为顶点,图至少有一个顶点(非空有穷集合)

对应到好友关系图,每一个用户就代表一个顶点。

边

顶点之间的关系用边表示。

对应到好友关系图,两个用户是好友的话,那两者之间就存在一条边。

度

度表示一个顶点包含多少条边,在有向图中,还分为出度和入度,出度表示从该顶点出去的边的条数,入度表示进入该顶点的边的条数。

对应到好友关系图,度就代表了某个人的好友数量。

无向图和有向图

边表示的是顶点之间的关系,有的关系是双向的,比如同学关系,A 是 B 的同学,那么 B 也肯定是 A 的同学,那么在表示 A 和 B 的关系时,就不用关注方向,用不带箭头的边表示,这样的图就是无向图。

有的关系是有方向的,比如父子关系,师生关系,微博的关注关系,A 是 B 的爸爸,但 B 肯定不是 A 的爸爸,A 关注 B,B 不一定关注 A。在这种情况下,我们就用带箭头的边表示二者的关系,这样的图就是有向图。

无权图和带权图

对于一个关系,如果我们只关心关系的有无,而不关心关系有多强,那么就可以用无权图表示二者的关系。

对于一个关系,如果我们既关心关系的有无,也关心关系的强度,比如描述地图上两个城市的关系,需要用到距离,那么就用带权图来表示,带权图中的每一条边一个数值表示权值,代表关系的强度。

下图就是一个带权有向图。

带权有向图

图的存储

邻接矩阵存储

邻接矩阵将图用二维矩阵存储,是一种较为直观的表示方式。

如果第 i 个顶点和第 j 个顶点之间有关系,且关系权值为 n,则 A[i][j]=n 。

在无向图中,我们只关心关系的有无,所以当顶点 i 和顶点 j 有关系时,A[i][j]=1,当顶点 i 和顶点 j 没有关系时,A[i][j]=0。如下图所示:

无向图的邻接矩阵存储

值得注意的是:无向图的邻接矩阵是一个对称矩阵,因为在无向图中,顶点 i 和顶点 j 有关系,则顶点 j 和顶点 i 必有关系。

有向图的邻接矩阵存储

邻接矩阵存储的方式优点是简单直接(直接使用一个二维数组即可),并且,在获取两个定点之间的关系的时候也非常高效(直接获取指定位置的数组元素的值即可)。但是,这种存储方式的缺点也比较明显,那就是比较浪费空间,

邻接表存储

针对上面邻接矩阵比较浪费内存空间的问题,诞生了图的另外一种存储方法—邻接表 。

邻接链表使用一个链表来存储某个顶点的所有后继相邻顶点。对于图中每个顶点 Vi,把所有邻接于 Vi 的顶点 Vj 链成一个单链表,这个单链表称为顶点 Vi 的 邻接表。如下图所示:

无向图的邻接表存储

有向图的邻接表存储

大家可以数一数邻接表中所存储的元素的个数以及图中边的条数,你会发现:

  • 在无向图中,邻接表元素个数等于边的条数的两倍,如左图所示的无向图中,边的条数为 7,邻接表存储的元素个数为 14。
  • 在有向图中,邻接表元素个数等于边的条数,如右图所示的有向图中,边的条数为 8,邻接表存储的元素个数为 8。

图的搜索

广度优先搜索

广度优先搜索就像水面上的波纹一样一层一层向外扩展,如下图所示:

广度优先搜索图示

广度优先搜索的具体实现方式用到了之前所学过的线性数据结构——队列 。具体过程如下图所示:

第 1 步:

广度优先搜索1

第 2 步:

广度优先搜索2

第 3 步:

广度优先搜索3

第 4 步:

广度优先搜索4

第 5 步:

广度优先搜索5

第 6 步:

广度优先搜索6

深度优先搜索

深度优先搜索就是“一条路走到黑”,从源顶点开始,一直走到没有后继节点,才回溯到上一顶点,然后继续“一条路走到黑”,如下图所示:

深度优先搜索图示

和广度优先搜索类似,深度优先搜索的具体实现用到了另一种线性数据结构——栈 。具体过程如下图所示:

第 1 步:

深度优先搜索1

第 2 步:

深度优先搜索2

第 3 步:

深度优先搜索3

第 4 步:

深度优先搜索4

第 5 步:

深度优先搜索5

第 6 步:

深度优先搜索6

从ReentrantLock的实现看AQS的原理及应用

发表于 2024-03-20 | 分类于 Java , 并发 | 阅读次数:
字数统计: 9.5k 字 | 阅读时长 ≈ 37 分钟

本文转载自:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

作者:美团技术团队

Java 中的大部分同步类(Semaphore、ReentrantLock 等)都是基于 AbstractQueuedSynchronizer(简称为 AQS)实现的。AQS 是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。

本文会从应用层逐渐深入到原理层,并通过 ReentrantLock 的基本特性和 ReentrantLock 与 AQS 的关联,来深入解读 AQS 相关独占锁的知识点,同时采取问答的模式来帮助大家理解 AQS。由于篇幅原因,本篇文章主要阐述 AQS 中独占锁的逻辑和 Sync Queue,不讲述包含共享锁和 Condition Queue 的部分(本篇文章核心为 AQS 原理剖析,只是简单介绍了 ReentrantLock,感兴趣同学可以阅读一下 ReentrantLock 的源码)。

1 ReentrantLock

1.1 ReentrantLock 特性概览

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解 ReentrantLock 的特性,我们先将 ReentrantLock 跟常用的 Synchronized 进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):

下面通过伪代码,进行更加直观的比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
// 1.初始化选择公平锁、非公平锁
ReentrantLock lock = new ReentrantLock(true);
// 2.可用于代码块
lock.lock();
try {
try {
// 3.支持多种加锁方式,比较灵活; 具有可重入特性
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
} finally {
// 4.手动释放锁
lock.unlock()
}
} finally {
lock.unlock();
}
}

1.2 ReentrantLock 与 AQS 的关联

通过上文我们已经了解,ReentrantLock 支持公平锁和非公平锁(关于公平锁和非公平锁的原理分析,可参考《不可不说的 Java“锁”事》),并且 ReentrantLock 的底层就是由 AQS 来实现的。那么 ReentrantLock 是如何通过公平锁和非公平锁与 AQS 关联起来呢? 我们着重从这两者的加锁过程来理解一下它们与 AQS 之间的关系(加锁过程中与 AQS 的关联比较明显,解锁流程后续会介绍)。

非公平锁源码中的加锁流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// java.util.concurrent.locks.ReentrantLock#NonfairSync

// 非公平锁
static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}

这块代码的含义为:

  • 若通过 CAS 设置变量 State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。
  • 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 方法进行后续处理。

第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:

  • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:

(1) 将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就需要下面这种流程,也就是 AQS 框架的处理流程。

(2) 存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

  • 对于问题 1 的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
  • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?
  • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?

带着非公平锁的这些问题,再看下公平锁源码中获锁的方式:

1
2
3
4
5
6
7
8
9
// java.util.concurrent.locks.ReentrantLock#FairSync

static final class FairSync extends Sync {
...
final void lock() {
acquire(1);
}
...
}

看到这块代码,我们可能会存在这种疑问:Lock 函数通过 Acquire 方法进行加锁,但是具体是如何加锁的呢?

结合公平锁和非公平锁的加锁流程,虽然流程上有一定的不同,但是都调用了 Acquire 方法,而 Acquire 方法是 FairSync 和 UnfairSync 的父类 AQS 中的核心方法。

对于上边提到的问题,其实在 ReentrantLock 类源码中都无法解答,而这些问题的答案,都是位于 Acquire 方法所在的类 AbstractQueuedSynchronizer 中,也就是本文的核心——AQS。下面我们会对 AQS 以及 ReentrantLock 和 AQS 的关联做详细介绍(相关问题答案会在 2.3.5 小节中解答)。

2 AQS

首先,我们通过下面的架构图来整体了解一下 AQS 框架:

  • 上图中有颜色的为 Method,无颜色的为 Attribution。
  • 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的 API 进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析 AQS 框架,主要分析过程如下:

2.1 原理概览

AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten 队列,是单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列(FIFO),AQS 是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

主要原理图如下:

AQS 使用一个 Volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作,通过 CAS 完成对 State 值的修改。

2.1.1 AQS 数据结构

先来看下 AQS 中最基本的数据结构——Node,Node 即为上面 CLH 变体队列中的节点。

解释一下几个方法和属性值的含义:

方法和属性值 含义
waitStatus 当前节点在队列中的状态
thread 表示处于该节点的线程
prev 前驱指针
predecessor 返回前驱节点,没有的话抛出 npe
nextWaiter 指向下一个处于 CONDITION 状态的节点(由于本篇文章不讲述 Condition Queue 队列,这个指针不多介绍)
next 后继指针

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus 有下面几个枚举值:

枚举 含义
0 当一个 Node 被初始化的时候的默认值
CANCELLED 为 1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在 SHARED 情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

2.1.2 同步状态 State

在了解数据结构后,接下来了解一下 AQS 的同步状态——State。AQS 中维护了一个名为 state 的字段,意为同步状态,是由 Volatile 修饰的,用于展示当前临界资源的获锁情况。

1
2
3
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

下面提供了几个访问这个字段的方法:

方法名 描述
protected final int getState() 获取 State 的值
protected final void setState(int newState) 设置 State 的值
protected final boolean compareAndSetState(int expect, int update) 使用 CAS 方式更新 State

这几个方法都是 Final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API 层。

2.2 AQS 重要方法与 ReentrantLock 的关联

从架构图中可以得知,AQS 提供了大量用于自定义同步器实现的 Protected 方法。自定义同步器实现的相关方法也只是为了通过修改 State 字段来实现多线程的独占模式或者共享模式。自定义同步器需要实现以下方法(ReentrantLock 需要实现的方法如下,并不是全部):

方法名 描述
protected boolean isHeldExclusively() 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。
protected boolean tryAcquire(int arg) 独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 True,失败则返回 False。
protected boolean tryRelease(int arg) 独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 True,失败则返回 False。
protected int tryAcquireShared(int arg) 共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int arg) 共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 True,否则返回 False。

一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。

以非公平锁为例,这里主要阐述一下非公平锁与 AQS 之间方法的关联之处,具体每一处核心方法的作用会在文章后面详细进行阐述。

🐛 修正(参见:issue#1761): 图中的一处小错误,(AQS)CAS 修改共享资源 State 成功之后应该是获取锁成功(非公平锁)。

对应的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//获取当前线程
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {//CAS抢锁
setExclusiveOwnerThread(current);//设置当前线程为独占线程
return true;//抢锁成功
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

为了帮助大家理解 ReentrantLock 和 AQS 之间方法的交互过程,以非公平锁为例,我们将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

加锁:

  • 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。
  • 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。
  • AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。
  • tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。

解锁:

  • 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。
  • Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。
  • Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
  • 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出 ReentrantLock 加锁解锁时 API 层核心方法的映射关系。

3 通过 ReentrantLock 理解 AQS

ReentrantLock 中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。

在非公平锁中,有一段这样的代码:

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.concurrent.locks.ReentrantLock

static final class NonfairSync extends Sync {
...
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
...
}

看一下这个 Acquire 是怎么写的:

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

再看一下 tryAcquire 方法:

1
2
3
4
5
// java.util.concurrent.locks.AbstractQueuedSynchronizer

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

可以看出,这里只是 AQS 的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以 ReentrantLock 为例)。如果该方法返回了 True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。下面会详细解释线程是何时以及怎样被加入进等待队列中的。

3.1 线程加入等待队列

3.1.1 加入队列的时机

当执行 Acquire(1)时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter 加入到等待队列中去。

3.1.2 如何加入队列

获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

主要的流程如下:

  • 通过当前的线程和锁模式新建一个节点。
  • Pred 指针指向尾节点 Tail。
  • 将 New 中 Node 的 Prev 指针指向 Pred。
  • 通过 compareAndSetTail 方法,完成尾节点的设置。这个方法主要是对 tailOffset 和 Expect 进行比较,如果 tailOffset 的 Node 和 Expect 的 Node 地址是相同的,那么设置 Tail 的值为 Update 的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
// java.util.concurrent.locks.AbstractQueuedSynchronizer

static {
try {
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) {
throw new Error(ex);
}
}

从 AQS 的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量,这样我们就可以根据这个偏移量在对象内存当中找到这个属性。tailOffset 指的是 tail 对应的偏移量,所以这个时候会将 new 出来的 Node 置为当前队列的尾节点。同时,由于是双向链表,也需要将前一个节点指向尾节点。

  • 如果 Pred 指针是 Null(说明等待队列中没有元素),或者当前 Pred 指针和 Tail 指向的位置不同(说明被别的线程已经修改),就需要看一下 Enq 的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter 就是一个在双端链表添加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。

总结一下,线程获取锁的时候,过程大体如下:

1、当没有线程获取到锁时,线程 1 获取锁成功。

2、线程 2 申请锁,但是锁被线程 1 占有。

img

3、如果再有线程要获取锁,依次在队列中往后排队即可。

回到上边的代码,hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

1
2
3
4
5
6
7
8
9
10
11
// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

看到这里,我们理解一下 h != t && ((s = h.next) == null || s.thread != Thread.currentThread());为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?

双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当 h != t 时:如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了 Tail 指向 Head,没有将 Head 指向 Tail,此时队列中有元素,需要返回 True(这块具体见下边代码分析)。 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。如果此时 s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;如果 s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

1
2
3
4
5
6
7
8
9
10
11
12
// java.util.concurrent.locks.AbstractQueuedSynchronizer#enq

if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}

节点入队不是原子操作,所以会出现短暂的 head != tail,此时 Tail 指向最后一个节点,而且 Tail 指向 Head。如果 Head 没有指向 Tail(可见 5、6、7 行),这种情况下也需要将相关线程加入队列中。所以这块代码是为了解决极端情况下的并发问题。

3.1.3 等待队列中线程出队列时机

回到最初的源码:

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上文解释了 addWaiter 方法,这个方法其实就是把对应的线程以 Node 的数据结构形式加入到双端队列里,返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 方法中。acquireQueued 方法可以对排队中的线程进行“获锁”操作。

总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下 acquireQueued 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

注:setHead 方法是把当前节点置为虚节点,但并没有修改 waitStatus,因为它是一直需要用的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
// 通过枚举值我们知道waitStatus>0是取消状态
if (ws > 0) {
do {
// 循环向前查找取消节点,把取消节点从队列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

parkAndCheckInterrupt 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

上述方法的流程图如下:

从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致 CPU 资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire 流程):

从队列中释放节点的疑虑打消了,那么又有新问题了:

  • shouldParkAfterFailedAcquire 中取消节点是怎么生成的呢?什么时候会把一个节点的 waitStatus 设置为-1?
  • 是在什么时间释放节点通知到被挂起的线程呢?

3.2 CANCELLED 状态节点生成

acquireQueued 方法中的 Finally 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
...
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
...
failed = false;
...
}
...
} finally {
if (failed)
cancelAcquire(node);
}
}

通过 cancelAcquire 方法,将 Node 的状态标记为 CANCELLED。接下来,我们逐行来分析这个方法的原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SIGNAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和当前 Node 关联,将当前 Node 设置为 CANCELLED。
  • 根据当前节点的位置,考虑以下三种情况:

(1) 当前节点是尾节点。

(2) 当前节点是 Head 的后继节点。

(3) 当前节点不是 Head 的后继节点,也不是尾节点。

根据上述第二条,我们来分析每一种情况的流程。

当前节点是尾节点。

当前节点是 Head 的后继节点。

当前节点不是 Head 的后继节点,也不是尾节点。

通过上面的流程,我们对于 CANCELLED 节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?

执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev 指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。 shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。

1
2
3
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

3.3 如何解锁

我们已经剖析了加锁过程中的基本流程,接下来再对解锁的基本流程进行分析。由于 ReentrantLock 在解锁的时候,并不区分公平锁和非公平锁,所以我们直接看解锁的源码:

1
2
3
4
5
// java.util.concurrent.locks.ReentrantLock

public void unlock() {
sync.release(1);
}

可以看到,本质释放锁的地方,是通过框架来完成的。

1
2
3
4
5
6
7
8
9
10
11
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

在 ReentrantLock 里面的公平锁和非公平锁的父类 Sync 定义了可重入锁的释放锁机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// java.util.concurrent.locks.ReentrantLock.Sync

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

我们来解释下述源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final boolean release(int arg) {
// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

这里的判断条件为什么是 h != null && h.waitStatus != 0?

h == null Head 还没初始化。初始情况下,head == null,第一个节点入队,Head 会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现 head == null 的情况。

h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。

h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

再看一下 unparkSuccessor 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void unparkSuccessor(Node node) {
// 获取头结点waitStatus
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的下一个节点
Node s = node.next;
// 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
// 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
if (s != null)
LockSupport.unpark(s.thread);
}

为什么要从后往前找第一个非 Cancelled 的节点呢?原因如下。

之前的 addWaiter 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

我们从这里可以看到,节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node) 这两个地方可以看作 Tail 入队的原子操作,但是此时 pred.next = node;还没执行,如果这个时候执行了 unparkSuccessor 方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生 CANCELLED 状态节点的时候,先断开的是 Next 指针,Prev 指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的 Node。

综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和 CANCELLED 节点产生过程中断开 Next 指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行 acquireQueued 方法以后,中断如何处理?

3.4 中断恢复后的执行流程

唤醒后,会执行 return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。

1
2
3
4
5
6
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

再回到 acquireQueued 代码,当 parkAndCheckInterrupt 返回 True 或者 False 的时候,interrupted 的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前 interrupted 返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

如果 acquireQueued 为 True,就会执行 selfInterrupt 方法。

1
2
3
4
5
// java.util.concurrent.locks.AbstractQueuedSynchronizer

static void selfInterrupt() {
Thread.currentThread().interrupt();
}

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于 Java 提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

  1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过 Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为 False),并记录下来,如果发现该线程被中断过,就再中断一次。
  2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元 Worder 中的 runWorker,通过 Thread.interrupted()进行额外的判断处理,感兴趣的同学可以看下 ThreadPoolExecutor 源码。

3.5 小结

我们在 1.3 小节中提出了一些问题,现在来回答一下。

Q:某个线程获取锁失败的后续流程是什么呢?

A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

A:是 CLH 变体的 FIFO 双端队列。

Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?

A:可以详细看下 2.3.1.3 小节。

Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?

A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放,具体可见 2.3.2 小节。

Q:Lock 函数通过 Acquire 方法进行加锁,但是具体是如何加锁的呢?

A:AQS 的 Acquire 会调用 tryAcquire 方法,tryAcquire 由各个自定义同步器实现,通过 tryAcquire 完成加锁过程。

4 AQS 应用

4.1 ReentrantLock 的可重入应用

ReentrantLock 的可重入性是 AQS 很好的应用之一,在了解完上述知识点以后,我们很容易得知 ReentrantLock 实现可重入的方法。在 ReentrantLock 里面,不管是公平锁还是非公平锁,都有一段逻辑。

公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

非公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire

if (c == 0) {
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

从上面这两段都可以看到,有一个同步状态 State 来控制整体可重入的情况。State 是 Volatile 修饰的,用于保证一定的可见性和有序性。

1
2
3
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private volatile int state;

接下来看 State 这个字段主要的过程:

  1. State 初始化的时候为 0,表示没有任何线程持有锁。
  2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
  3. 解锁也是对这个字段-1,一直到 0,此线程对锁释放。

4.2 JUC 中的应用场景

除了上边 ReentrantLock 的可重入性的应用,AQS 作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了 JUC 中的几种同步工具,大体介绍一下 AQS 的应用场景:

同步工具 同步工具与 AQS 的关联
ReentrantLock 使用 AQS 保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock 记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。
Semaphore 使用 AQS 同步状态来保存信号量的当前计数。tryRelease 会增加计数,acquireShared 会减少计数。
CountDownLatch 使用 AQS 同步状态来表示计数。计数为 0 时,所有的 Acquire 操作(CountDownLatch 的 await 方法)才可以通过。
ReentrantReadWriteLock 使用 AQS 同步状态中的 16 位保存写锁持有的次数,剩下的 16 位用于保存读锁的持有次数。
ThreadPoolExecutor Worker 利用 AQS 同步状态实现对独占线程变量的设置(tryAcquire 和 tryRelease)。

4.3 自定义同步工具

了解 AQS 基本原理以后,按照上面所说的 AQS 知识点,自己实现一个同步工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LeeLock  {

private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}

@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}

@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}

private Sync sync = new Sync();

public void lock () {
sync.acquire(1);
}

public void unlock () {
sync.release(1);
}
}

通过我们自己定义的 Lock 完成一定的同步功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LeeMain {

static int count = 0;
static LeeLock leeLock = new LeeLock();

public static void main (String[] args) throws InterruptedException {

Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}

}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}

上述代码每次运行结果都会是 20000。通过简单的几行代码就能实现同步功能,这就是 AQS 的强大之处。

5 总结

我们日常开发中使用并发的场景太多,但是对并发内部的基本框架原理了解的人却不多。由于篇幅原因,本文仅介绍了可重入锁 ReentrantLock 的原理和 AQS 原理,希望能够成为大家了解 AQS 和 ReentrantLock 等同步器的“敲门砖”。

参考资料

  • Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
  • 《Java 并发编程实战》
  • 不可不说的 Java“锁”事

缓存基础常见面试题总结(付费)

发表于 2024-03-14 | 分类于 分布式 , redis | 阅读次数:
字数统计: 51 字 | 阅读时长 ≈ 1 分钟

缓存基础 相关的面试题为我的 知识星球(点击链接即可查看详细介绍以及加入方法)专属内容,已经整理到了《Java 面试指北》中。

AQS 详解

发表于 2024-03-02 | 分类于 Java , 并发 | 阅读次数:
字数统计: 6.8k 字 | 阅读时长 ≈ 27 分钟

AQS 介绍

AQS 的全称为 AbstractQueuedSynchronizer ,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks 包下面。

AQS 就是一个抽象类,主要用来构建锁和同步器。

1
2
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
}

AQS 为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue等等皆是基于 AQS 的。

AQS 原理

在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。

AQS 核心思想

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 实现的。

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

CLH 队列结构如下图所示:

CLH 队列结构

关于 AQS 核心数据结构-CLH 锁的详细解读,强烈推荐阅读 Java AQS 核心数据结构-CLH 锁 - Qunar 技术沙龙 这篇文章。

AQS(AbstractQueuedSynchronizer)的核心原理图:

CLH 队列

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。

state 变量由 volatile 修饰,用于展示当前临界资源的获锁情况。

1
2
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

另外,状态信息 state 可以通过 protected 类型的getState()、setState()和compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

1
2
3
4
5
6
7
8
9
10
11
12
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

以可重入的互斥锁 ReentrantLock 为例,它的内部维护了一个 state 变量,用来表示锁的占用状态。state 的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 lock() 方法时,会尝试通过 tryAcquire() 方法独占该锁,并让 state 的值加 1。如果成功了,那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 state 的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。

线程 A 尝试获取锁的过程如下图所示(图源从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队):

AQS 独占模式获取锁

再以倒计时器 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程开始执行任务,每执行完一个子线程,就调用一次 countDown() 方法。该方法会尝试使用 CAS(Compare and Swap) 操作,让 state 的值减少 1。当所有的子线程都执行完毕后(即 state 的值变为 0),CountDownLatch 会调用 unpark() 方法,唤醒主线程。这时,主线程就可以从 await() 方法(CountDownLatch 中的await() 方法而非 AQS 中的)返回,继续执行后续的操作。

AQS 资源共享方式

AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

自定义同步器

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:

1
2
3
4
5
6
7
8
9
10
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:用 Java8 改造后的模板方法模式真的是 yyds!。

除了上面提到的钩子方法之外,AQS 类中的其他方法都是 final ,所以无法被其他类重写。

常见同步工具类

下面介绍几个基于 AQS 的常见同步工具类。

Semaphore(信号量)

介绍

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。

Semaphore 的使用简单,我们这里假设有 N(N>5) 个线程来获取 Semaphore 中的共享资源,下面的代码表示同一时刻 N 个线程中只有 5 个线程能获取到共享资源,其他线程都会阻塞,只有获取到共享资源的线程才能执行。等到有线程释放了共享资源,其他阻塞的线程才能获取到。

1
2
3
4
5
6
// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();

当初始的资源个数为 1 的时候,Semaphore 退化为排他锁。

Semaphore 有两种模式:。

  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

Semaphore 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。

原理

Semaphore 是共享锁的一种实现,它默认构造 AQS 的 state 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

以无参 acquire 方法为例,调用semaphore.acquire() ,线程尝试获取许可证,如果 state > 0 的话,则表示可以获取成功,如果 state <= 0 的话,则表示许可证数量不足,获取失败。

如果可以获取成功的话(state > 0 ),会尝试使用 CAS 操作去修改 state 的值 state=state-1。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。

1
2
3
4
5
6
7
8
9
10
// 获取1个许可证
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

acquireSharedInterruptibly方法是 AbstractQueuedSynchronizer 中的默认实现。

1
2
3
4
5
6
7
8
9
// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

这里再以非公平模式(NonfairSync)的为例,看看 tryAcquireShared 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 共享模式下尝试获取资源(在Semaphore中的资源即许可证):
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

// 非公平的共享模式获取许可证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前可用许可证数量
int available = getState();
/*
* 尝试获取许可证,当前可用许可证数量小于等于0时,返回负值,表示获取失败,
* 当前可用许可证大于0时才可能获取成功,CAS失败了会循环重新获取最新的值尝试获取
*/
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

以无参 release 方法为例,调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state > 0 则获取令牌成功,否则重新进入等待队列,挂起线程。

1
2
3
4
5
6
7
8
9
10
// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared方法是 AbstractQueuedSynchronizer 中的默认实现。

1
2
3
4
5
6
7
8
9
10
11
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared 方法是Semaphore 的内部类 Sync 重写的一个方法, AbstractQueuedSynchronizer中的默认实现仅仅抛出 UnsupportedOperationException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 内部类 Sync 中重写的一个方法
// 尝试释放资源
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 可用许可证+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS修改state的值
if (compareAndSetState(current, next))
return true;
}
}

可以看到,上面提到的几个方法底层基本都是通过同步器 sync 实现的。Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer ,重写了其中的某些方法。并且,Sync 对应的还有两个子类 NonfairSync(对应非公平模式) 和 FairSync(对应公平模式)。

1
2
3
4
5
6
7
8
9
private static final class Sync extends AbstractQueuedSynchronizer {
// ...
}
static final class NonfairSync extends Sync {
// ...
}
static final class FairSync extends Sync {
// ...
}

实战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SemaphoreExample {
// 请求的数量
private static final int threadCount = 550;

public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 初始许可证数量
final Semaphore semaphore = new Semaphore(20);

for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

});
}
threadPool.shutdown();
System.out.println("finish");
}

public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}

执行 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。

当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:

1
2
3
semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 释放5个许可

除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。

issue645 补充内容:

Semaphore 基于 AQS 实现,用于控制并发访问的线程数量,但它与共享锁的概念有所不同。Semaphore 的构造函数使用 permits 参数初始化 AQS 的 state 变量,该变量表示可用的许可数量。当线程调用 acquire() 方法尝试获取许可时,state 会原子性地减 1。如果 state 减 1 后大于等于 0,则 acquire() 成功返回,线程可以继续执行。如果 state 减 1 后小于 0,表示当前并发访问的线程数量已达到 permits 的限制,该线程会被放入 AQS 的等待队列并阻塞,而不是自旋等待。当其他线程完成任务并调用 release() 方法时,state 会原子性地加 1。release() 操作会唤醒 AQS 等待队列中的一个或多个阻塞线程。这些被唤醒的线程将再次尝试 acquire() 操作,竞争获取可用的许可。因此,Semaphore 通过控制许可数量来限制并发访问的线程数量,而不是通过自旋和共享锁机制。

CountDownLatch (倒计时器)

介绍

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

原理

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。这个我们通过 CountDownLatch 的构造方法即可看出。

1
2
3
4
5
6
7
8
9
10
11
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
//...
}

当线程调用 countDown() 时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当 state 为 0 时,表示所有的线程都调用了 countDown 方法,那么在 CountDownLatch 上等待的线程就会被唤醒并继续执行。

1
2
3
4
public void countDown() {
// Sync 是 CountDownLatch 的内部类 , 继承了 AbstractQueuedSynchronizer
sync.releaseShared(1);
}

releaseShared方法是 AbstractQueuedSynchronizer 中的默认实现。

1
2
3
4
5
6
7
8
9
10
11
// 释放共享锁
// 如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared 方法是CountDownLatch 的内部类 Sync 重写的一个方法, AbstractQueuedSynchronizer中的默认实现仅仅抛出 UnsupportedOperationException 异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 对 state 进行递减,直到 state 变成 0;
// 只有 count 递减到 0 时,countDown 才会返回 true
protected boolean tryReleaseShared(int releases) {
// 自选检查 state 是否为 0
for (;;) {
int c = getState();
// 如果 state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
// CAS 操作更新 state 的值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

以无参 await方法为例,当调用 await() 的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 就会一直阻塞,也就是说 await() 之后的语句不会被执行(main 线程被加入到等待队列也就是 CLH 队列中了)。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

1
2
3
4
5
6
7
8
9
// 等待(也可以叫做加锁)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的等待
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

acquireSharedInterruptibly方法是 AbstractQueuedSynchronizer 中的默认实现。

1
2
3
4
5
6
7
8
9
10
// 尝试获取锁,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获得锁,获取成功则返回
if (tryAcquireShared(arg) < 0)
// 获取失败加入等待队列,挂起线程
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared 方法是CountDownLatch 的内部类 Sync 重写的一个方法,其作用就是判断 state 的值是否为 0,是的话就返回 1,否则返回 -1。

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

实战

CountDownLatch 的两种典型用法:

  1. 某一线程在开始运行前等待 n 个线程执行完毕 : 将 CountDownLatch 的计数器初始化为 n (new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性:注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 (new CountDownLatch(1)),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

CountDownLatch 代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CountDownLatchExample {
// 请求的数量
private static final int THREAD_COUNT = 550;

public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
// 只是测试使用,实际场景请手动赋值线程池参数
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadNum = i;
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 表示一个请求已经被完成
countDownLatch.countDown();
}

});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}

public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);
System.out.println("threadNum:" + threadnum);
Thread.sleep(1000);
}
}

上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");。

与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。

再插一嘴:CountDownLatch 的 await() 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:

1
2
3
for (int i = 0; i < threadCount-1; i++) {
.......
}

这样就导致 count 的值没办法等于 0,然后就会导致一直等待。

CyclicBarrier(循环栅栏)

介绍

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

原理

CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减 1。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

1
2
3
4
//每次拦截的线程数
private final int parties;
//计数器
private int count;

下面我们结合源码来简单看看。

1、CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用 await() 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。

2、当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

dowait(false, 0L)方法源码分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count 减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

实战

示例 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CyclicBarrierExample1 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}

public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}

}

运行结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await() 方法之后的方法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。

示例 2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});

public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}

public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}

}

运行结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

参考

  • Java 并发之 AQS 详解:https://www.cnblogs.com/waterystone/p/4920797.html
  • 从 ReentrantLock 的实现看 AQS 的原理及应用:https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

Java 线程池详解

发表于 2024-02-21 | 分类于 Java , 并发 | 阅读次数:
字数统计: 8.5k 字 | 阅读时长 ≈ 33 分钟

池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

这篇文章我会详细介绍一下线程池的基本概念以及核心原理。

线程池介绍

顾名思义,线程池就是管理一系列线程的资源池,其提供了一种限制和管理线程资源的方式。每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》书中的部分内容来总结一下使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。

Executor 框架介绍

Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

Executor 框架结构主要由三大部分组成:

1、任务(Runnable /Callable)

执行任务需要实现的 Runnable 接口 或 Callable接口。**Runnable 接口**或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2、任务的执行(Executor)

如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

注意: 通过查看 ScheduledThreadPoolExecutor 源代码我们发现 ScheduledThreadPoolExecutor 实际上是继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService ,而 ScheduledExecutorService 又实现了 ExecutorService,正如我们上面给出的类关系图显示的一样。

ThreadPoolExecutor 类描述:

1
2
//AbstractExecutorService实现了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 类描述:

1
2
3
4
//ScheduledExecutorService继承ExecutorService接口
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService

3、异步计算的结果(Future)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

Executor 框架的使用示意图:

Executor 框架的使用示意图

  1. 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable <T> task))。
  3. 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

ThreadPoolExecutor 类介绍(重要)

线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。

线程池参数分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

下面这些参数非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :拒绝策略(后面会单独详细介绍一下)。

下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):

线程池各个参数的关系

ThreadPoolExecutor 拒绝策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

举个例子:

举个例子:Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor 将抛出 RejectedExecutionException 异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicy。CallerRunsPolicy 和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务

1
2
3
4
5
6
7
8
9
10
11
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 直接主线程执行,而不是线程池中的线程执行
r.run();
}
}
}

线程池创建的两种方式

方式一:通过ThreadPoolExecutor构造函数来创建(推荐)。

通过构造方法实现

方式二:通过 Executor 框架的工具类 Executors 来创建。

Executors工具类提供的创建线程池的方法如下图所示:

可以看出,通过Executors工具类可以创建多种类型的线程池,包括:

  • FixedThreadPool:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:给定的延迟后运行任务或者定期执行任务的线程池。

《阿里巴巴 Java 开发手册》强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下(后文会详细介绍到):

  • FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
  • ScheduledThreadPool 和 SingleThreadScheduledExecutor:使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 无界队列 LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

// 无界队列 LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

}

// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

}

// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

线程池常用的阻塞队列总结

新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

  • 容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue(无界队列):FixedThreadPool 和 SingleThreadExector 。FixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExector只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
  • SynchronousQueue(同步队列):CachedThreadPool 。SynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
  • DelayedWorkQueue(延迟阻塞队列):ScheduledThreadPool 和 SingleThreadScheduledExecutor 。DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

线程池原理分析(重要)

我们上面讲解了 Executor框架以及 ThreadPoolExecutor 类,下面让我们实战一下,来通过写一个 ThreadPoolExecutor 的小 Demo 来回顾上面的内容。

线程池示例代码

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们后面会介绍两者的区别。)

MyRunnable.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.Date;

/**
* 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {

private String command;

public MyRunnable(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  • corePoolSize: 核心线程数为 5。
  • maximumPoolSize:最大线程数 10
  • keepAliveTime : 等待时间为 1L。
  • unit: 等待时间的单位为 TimeUnit.SECONDS。
  • workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  • handler:拒绝策略为 CallerRunsPolicy。

输出结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
Finished all threads // 任务全部执行完了才会跳出来,因为executor.isTerminated()判断为true了才会跳出while循环,当且仅当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

线程池原理分析

我们通过前面的代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)

现在,我们就分析上面的输出内容来简单分析一下线程池原理。

为了搞懂线程池的原理,我们需要首先分析一下 execute方法。 在示例代码中,我们使用 executor.execute(worker)来提交一个任务到线程池中去。

这个方法非常重要,下面我们来看看它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前工作线程数量为0,新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
// 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}

这里简单分析一下整个流程(对整个逻辑进行了简化,方便理解):

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。

图解线程池实现原理

在 execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
 // 全局锁,并发操作必备
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}


/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {

w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

更多关于线程池源码分析的内容推荐这篇文章:硬核干货:4W 字从源码上分析 JUC 线程池 ThreadPoolExecutor 的实现原理

现在,让我们在回到示例代码, 现在应该是不是很容易就可以搞懂它的原理了呢?

没搞懂的话,也没关系,可以看看我的分析:

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

几个常见的对比

Runnable vs Callable

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。

Runnable.java

1
2
3
4
5
6
7
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

Callable.java

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

execute() vs submit()

execute() 和 submit()是两种提交任务到线程池的方法,有一些区别:

  • 返回值:execute() 方法用于提交不需要返回值的任务。通常用于执行 Runnable 任务,无法判断任务是否被线程池成功执行。submit() 方法用于提交需要返回值的任务。可以提交 Runnable 或 Callable 任务。submit() 方法返回一个 Future 对象,通过这个 Future 对象可以判断任务是否执行成功,并获取任务的返回值(get()方法会阻塞当前线程直到任务完成, get(long timeout,TimeUnit unit)多了一个超时时间,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException)。
  • 异常处理:在使用 submit() 方法时,可以通过 Future 对象处理任务执行过程中抛出的异常;而在使用 execute() 方法时,异常处理需要通过自定义的 ThreadFactory (在线程工厂创建线程的时候设置UncaughtExceptionHandler对象来 处理异常)或 ThreadPoolExecutor 的 afterExecute() 方法来处理

示例 1:使用 get()方法获取返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 这里只是为了演示使用,推荐使用 `ThreadPoolExecutor` 构造方法来创建线程池。
ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get();
System.out.println(s);
executorService.shutdown();

输出:

1
abc

示例 2:使用 get(long timeout,TimeUnit unit)方法获取返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
executorService.shutdown();

输出:

1
2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)

shutdown()VSshutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :关闭线程池,线程池的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

几种常见的内置线程池

FixedThreadPool

介绍

FixedThreadPool 被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:

1
2
3
4
5
6
7
8
9
/**
* 创建一个可重用固定数量线程的线程池
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

另外还有一个 FixedThreadPool 的实现方法,和上面的类似,所以这里不多做阐述:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue(无界队列),队列永远不会被放满。

执行任务过程介绍

FixedThreadPool 的 execute() 方法运行示意图(该图片来源:《Java 并发编程的艺术》):

FixedThreadPool的execute()方法运行示意图

上图说明:

  1. 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
  2. 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入 LinkedBlockingQueue;
  3. 线程池中的线程执行完 手头的任务后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行;

为什么不推荐使用FixedThreadPool?

FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
  2. 由于使用无界队列时 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建 FixedThreadPool的源码可以看出创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 被设置为同一个值。
  3. 由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数;
  4. 运行中的 FixedThreadPool(未执行 shutdown()或 shutdownNow())不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。

SingleThreadExecutor

介绍

SingleThreadExecutor 是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:

1
2
3
4
5
6
7
8
9
10
/**
*返回只有一个线程的线程池
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

从上面源代码可以看出新创建的 SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 都被设置为 1,其他参数和 FixedThreadPool 相同。

执行任务过程介绍

SingleThreadExecutor 的运行示意图(该图片来源:《Java 并发编程的艺术》):

SingleThreadExecutor的运行示意图

上图说明 :

  1. 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
  2. 当前线程池中有一个运行的线程后,将任务加入 LinkedBlockingQueue
  3. 线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue 中获取任务来执行;

为什么不推荐使用SingleThreadExecutor?

SingleThreadExecutor 和 FixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM。

CachedThreadPool

介绍

CachedThreadPool 是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool 的实现:

1
2
3
4
5
6
7
8
9
10
/**
* 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CachedThreadPool 的corePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

执行任务过程介绍

CachedThreadPool 的 execute() 方法的执行示意图(该图片来源:《Java 并发编程的艺术》):

CachedThreadPool的execute()方法的执行示意图

上图说明:

  1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有闲线程正在执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
  2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

为什么不推荐使用CachedThreadPool?

CachedThreadPool 使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

ScheduledThreadPool

介绍

ScheduledThreadPool 用来在给定的延迟后运行任务或者定期执行任务。这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下即可。

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPool 是通过 ScheduledThreadPoolExecutor 创建的,使用的DelayedWorkQueue(延迟阻塞队列)作为线程池的任务队列。

DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。

ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,所以创建 ScheduledThreadExecutor 本质也是创建一个 ThreadPoolExecutor 线程池,只是传入的参数不相同。

1
2
3
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService

ScheduledThreadPoolExecutor 和 Timer 对比

  • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
  • 在TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

关于定时任务的详细介绍,可以看这篇文章:Java 定时任务详解 。

线程池最佳实践

Java 线程池最佳实践这篇文章总结了一些使用线程池的时候应该注意的东西,实际项目使用线程池之前可以看看。

参考

  • 《Java 并发编程的艺术》
  • Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Example
  • java.util.concurrent.ScheduledThreadPoolExecutor Example
  • ThreadPoolExecutor – Java Thread Pool Example

IoC & AOP详解(快速搞懂)

发表于 2024-02-17 | 分类于 Java , 框架 , spring | 阅读次数:
字数统计: 3k 字 | 阅读时长 ≈ 11 分钟

这篇文章会从下面从以下几个问题展开对 IoC & AOP 的解释

  • 什么是 IoC?
  • IoC 解决了什么问题?
  • IoC 和 DI 的区别?
  • 什么是 AOP?
  • AOP 解决了什么问题?
  • AOP 的应用场景有哪些?
  • AOP 为什么叫做切面编程?
  • AOP 实现方式有哪些?

首先声明:IoC & AOP 不是 Spring 提出来的,它们在 Spring 之前其实已经存在了,只不过当时更加偏向于理论。Spring 在技术层次将这两个思想进行了很好的实现。

IoC (Inversion of control )

什么是 IoC?

IoC (Inversion of Control )即控制反转/反转控制。它是一种思想不是一个技术实现。描述的是:Java 开发领域对象的创建以及管理的问题。

例如:现有类 A 依赖于类 B

  • 传统的开发方式 :往往是在类 A 中手动通过 new 关键字来 new 一个 B 的对象出来
  • 使用 IoC 思想的开发方式 :不通过 new 关键字来创建对象,而是通过 IoC 容器(Spring 框架) 来帮助我们实例化对象。我们需要哪个对象,直接从 IoC 容器里面去取即可。

从以上两种开发方式的对比来看:我们 “丧失了一个权力” (创建、管理对象的权力),从而也得到了一个好处(不用再考虑对象的创建、管理等一系列的事情)

为什么叫控制反转?

  • 控制 :指的是对象创建(实例化、管理)的权力
  • 反转 :控制权交给外部环境(IoC 容器)

IoC 图解

IoC 解决了什么问题?

IoC 的思想就是两方之间不互相依赖,由第三方容器来管理相关资源。这样有什么好处呢?

  1. 对象之间的耦合度或者说依赖程度降低;
  2. 资源变的容易管理;比如你用 Spring 容器提供的话很容易就可以实现一个单例。

例如:现有一个针对 User 的操作,利用 Service 和 Dao 两层结构进行开发

在没有使用 IoC 思想的情况下,Service 层想要使用 Dao 层的具体实现的话,需要通过 new 关键字在UserServiceImpl 中手动 new 出 IUserDao 的具体实现类 UserDaoImpl(不能直接 new 接口类)。

很完美,这种方式也是可以实现的,但是我们想象一下如下场景:

开发过程中突然接到一个新的需求,针对IUserDao 接口开发出另一个具体实现类。因为 Server 层依赖了IUserDao的具体实现,所以我们需要修改UserServiceImpl中 new 的对象。如果只有一个类引用了IUserDao的具体实现,可能觉得还好,修改起来也不是很费力气,但是如果有许许多多的地方都引用了IUserDao的具体实现的话,一旦需要更换IUserDao 的实现方式,那修改起来将会非常的头疼。

IoC&Aop-ioc-illustration-dao-service

使用 IoC 的思想,我们将对象的控制权(创建、管理)交由 IoC 容器去管理,我们在使用的时候直接向 IoC 容器 “要” 就可以了

IoC 和 DI 有区别吗?

IoC(Inverse of Control:控制反转)是一种设计思想或者说是某种模式。这个设计思想就是 将原本在程序中手动创建对象的控制权交给第三方比如 IoC 容器。 对于我们常用的 Spring 框架来说, IoC 容器实际上就是个 Map(key,value),Map 中存放的是各种对象。不过,IoC 在其他语言中也有应用,并非 Spring 特有。

IoC 最常见以及最合理的实现方式叫做依赖注入(Dependency Injection,简称 DI)。

老马(Martin Fowler)在一篇文章中提到将 IoC 改名为 DI,原文如下,原文地址:https://martinfowler.com/articles/injection.html 。

老马的大概意思是 IoC 太普遍并且不表意,很多人会因此而迷惑,所以,使用 DI 来精确指名这个模式比较好。

AOP(Aspect oriented programming)

这里不会涉及太多专业的术语,核心目的是将 AOP 的思想说清楚。

什么是 AOP?

AOP(Aspect Oriented Programming)即面向切面编程,AOP 是 OOP(面向对象编程)的一种延续,二者互补,并不对立。

AOP 的目的是将横切关注点(如日志记录、事务管理、权限控制、接口限流、接口幂等等)从核心业务逻辑中分离出来,通过动态代理、字节码操作等技术,实现代码的复用和解耦,提高代码的可维护性和可扩展性。OOP 的目的是将业务逻辑按照对象的属性和行为进行封装,通过类、对象、继承、多态等概念,实现代码的模块化和层次化(也能实现代码的复用),提高代码的可读性和可维护性。

AOP 为什么叫面向切面编程?

AOP 之所以叫面向切面编程,是因为它的核心思想就是将横切关注点从核心业务逻辑中分离出来,形成一个个的切面(Aspect)。

面向切面编程图解

这里顺带总结一下 AOP 关键术语(不理解也没关系,可以继续往下看):

  • 横切关注点(cross-cutting concerns) :多个类或对象中的公共行为(如日志记录、事务管理、权限控制、接口限流、接口幂等等)。
  • 切面(Aspect):对横切关注点进行封装的类,一个切面是一个类。切面可以定义多个通知,用来实现具体的功能。
  • 连接点(JoinPoint):连接点是方法调用或者方法执行时的某个特定时刻(如方法调用、异常抛出等)。
  • 通知(Advice):通知就是切面在某个连接点要执行的操作。通知有五种类型,分别是前置通知(Before)、后置通知(After)、返回通知(AfterReturning)、异常通知(AfterThrowing)和环绕通知(Around)。前四种通知都是在目标方法的前后执行,而环绕通知可以控制目标方法的执行过程。
  • 切点(Pointcut):一个切点是一个表达式,它用来匹配哪些连接点需要被切面所增强。切点可以通过注解、正则表达式、逻辑运算等方式来定义。比如 execution(* com.xyz.service..*(..))匹配 com.xyz.service 包及其子包下的类或接口。
  • 织入(Weaving):织入是将切面和目标对象连接起来的过程,也就是将通知应用到切点匹配的连接点上。常见的织入时机有两种,分别是编译期织入(Compile-Time Weaving 如:AspectJ)和运行期织入(Runtime Weaving 如:AspectJ、Spring AOP)。

AOP 常见的通知类型有哪些?

  • Before(前置通知):目标对象的方法调用之前触发
  • After (后置通知):目标对象的方法调用之后触发
  • AfterReturning(返回通知):目标对象的方法调用完成,在返回结果值之后触发
  • AfterThrowing(异常通知):目标对象的方法运行中抛出 / 触发异常后触发。AfterReturning 和 AfterThrowing 两者互斥。如果方法调用成功无异常,则会有返回值;如果方法抛出了异常,则不会有返回值。
  • Around (环绕通知):编程式控制目标对象的方法调用。环绕通知是所有通知类型中可操作范围最大的一种,因为它可以直接拿到目标对象,以及要执行的方法,所以环绕通知可以任意的在目标对象的方法调用前后搞事,甚至不调用目标对象的方法

AOP 解决了什么问题?

OOP 不能很好地处理一些分散在多个类或对象中的公共行为(如日志记录、事务管理、权限控制、接口限流、接口幂等等),这些行为通常被称为 横切关注点(cross-cutting concerns) 。如果我们在每个类或对象中都重复实现这些行为,那么会导致代码的冗余、复杂和难以维护。

AOP 可以将横切关注点(如日志记录、事务管理、权限控制、接口限流、接口幂等等)从 核心业务逻辑(core concerns,核心关注点) 中分离出来,实现关注点的分离。

以日志记录为例进行介绍,假如我们需要对某些方法进行统一格式的日志记录,没有使用 AOP 技术之前,我们需要挨个写日志记录的逻辑代码,全是重复的的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public CommonResponse<Object> method1() {
// 业务逻辑
xxService.method1();
// 省略具体的业务处理逻辑
// 日志记录
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 省略记录日志的具体逻辑 如:获取各种信息,写入数据库等操作...
return CommonResponse.success();
}

public CommonResponse<Object> method2() {
// 业务逻辑
xxService.method2();
// 省略具体的业务处理逻辑
// 日志记录
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 省略记录日志的具体逻辑 如:获取各种信息,写入数据库等操作...
return CommonResponse.success();
}

// ...

使用 AOP 技术之后,我们可以将日志记录的逻辑封装成一个切面,然后通过切入点和通知来指定在哪些方法需要执行日志记录的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

// 日志注解
@Target({ElementType.PARAMETER,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {

/**
* 描述
*/
String description() default "";

/**
* 方法类型 INSERT DELETE UPDATE OTHER
*/
MethodType methodType() default MethodType.OTHER;
}

// 日志切面
@Component
@Aspect
public class LogAspect {
// 切入点,所有被 Log 注解标注的方法
@Pointcut("@annotation(cn.javaguide.annotation.Log)")
public void webLog() {
}

/**
* 环绕通知
*/
@Around("webLog()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
// 省略具体的处理逻辑
}

// 省略其他代码
}

这样的话,我们一行注解即可实现日志记录:

1
2
3
4
5
6
7
@Log(description = "method1",methodType = MethodType.INSERT)
public CommonResponse<Object> method1() {
// 业务逻辑
xxService.method1();
// 省略具体的业务处理逻辑
return CommonResponse.success();
}

AOP 的应用场景有哪些?

  • 日志记录:自定义日志记录注解,利用 AOP,一行代码即可实现日志记录。
  • 性能统计:利用 AOP 在目标方法的执行前后统计方法的执行时间,方便优化和分析。
  • 事务管理:@Transactional 注解可以让 Spring 为我们进行事务管理比如回滚异常操作,免去了重复的事务管理逻辑。@Transactional注解就是基于 AOP 实现的。
  • 权限控制:利用 AOP 在目标方法执行前判断用户是否具备所需要的权限,如果具备,就执行目标方法,否则就不执行。例如,SpringSecurity 利用@PreAuthorize 注解一行代码即可自定义权限校验。
  • 接口限流:利用 AOP 在目标方法执行前通过具体的限流算法和实现对请求进行限流处理。
  • 缓存管理:利用 AOP 在目标方法执行前后进行缓存的读取和更新。
  • ……

AOP 实现方式有哪些?

AOP 的常见实现方式有动态代理、字节码操作等方式。

Spring AOP 就是基于动态代理的,如果要代理的对象,实现了某个接口,那么 Spring AOP 会使用 JDK Proxy,去创建代理对象,而对于没有实现接口的对象,就无法使用 JDK Proxy 去进行代理了,这时候 Spring AOP 会使用 Cglib 生成一个被代理对象的子类来作为代理,如下图所示:

SpringAOPProcess

当然你也可以使用 AspectJ !Spring AOP 已经集成了 AspectJ ,AspectJ 应该算的上是 Java 生态系统中最完整的 AOP 框架了。

Spring AOP 属于运行时增强,而 AspectJ 是编译时增强。 Spring AOP 基于代理(Proxying),而 AspectJ 基于字节码操作(Bytecode Manipulation)。

Spring AOP 已经集成了 AspectJ ,AspectJ 应该算的上是 Java 生态系统中最完整的 AOP 框架了。AspectJ 相比于 Spring AOP 功能更加强大,但是 Spring AOP 相对来说更简单,

如果我们的切面比较少,那么两者性能差异不大。但是,当切面太多的话,最好选择 AspectJ ,它比 Spring AOP 快很多。

<i class="fa fa-angle-left"></i>1…567…27<i class="fa fa-angle-right"></i>

264 日志
34 分类
38 标签
GitHub Zhihu Wechat
© 2024 史海杰 | Site words total count: 722k
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4