Jay's Blog

知而不行为不知


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 留言

  • 搜索

负载均衡原理及算法详解

发表于 2021-07-17 | 分类于 分布式 , 高性能 | 阅读次数:
字数统计: 4.5k 字 | 阅读时长 ≈ 16 分钟

什么是负载均衡?

负载均衡 指的是将用户请求分摊到不同的服务器上处理,以提高系统整体的并发处理能力以及可靠性。负载均衡服务可以有由专门的软件或者硬件来完成,一般情况下,硬件的性能更好,软件的价格更便宜(后文会详细介绍到)。

下图是《Java 面试指北》 「高并发篇」中的一篇文章的配图,从图中可以看出,系统的商品服务部署了多份在不同的服务器上,为了实现访问商品服务请求的分流,我们用到了负载均衡。

多服务实例-负载均衡

负载均衡是一种比较常用且实施起来较为简单的提高系统并发能力和可靠性的手段,不论是单体架构的系统还是微服务架构的系统几乎都会用到。

负载均衡分为哪几种?

负载均衡可以简单分为 服务端负载均衡 和 客户端负载均衡 这两种。

服务端负载均衡涉及到的知识点更多,工作中遇到的也比较多,因此,我会花更多时间来介绍。

服务端负载均衡

服务端负载均衡 主要应用在 系统外部请求 和 网关层 之间,可以使用 软件 或者 硬件 实现。

下图是我画的一个简单的基于 Nginx 的服务端负载均衡示意图:

基于 Nginx 的服务端负载均衡

硬件负载均衡 通过专门的硬件设备(比如 F5、A10、Array )实现负载均衡功能。

硬件负载均衡的优势是性能很强且稳定,缺点就是实在是太贵了。像基础款的 F5 最低也要 20 多万,绝大部分公司是根本负担不起的,业务量不大的话,真没必要非要去弄个硬件来做负载均衡,用软件负载均衡就足够了!

在我们日常开发中,一般很难接触到硬件负载均衡,接触的比较多的还是 软件负载均衡 。软件负载均衡通过软件(比如 LVS、Nginx、HAproxy )实现负载均衡功能,性能虽然差一些,但价格便宜啊!像基础款的 Linux 服务器也就几千,性能好一点的 2~3 万的就很不错了。

根据 OSI 模型,服务端负载均衡还可以分为:

  • 二层负载均衡
  • 三层负载均衡
  • 四层负载均衡
  • 七层负载均衡

最常见的是四层和七层负载均衡,因此,本文也是重点介绍这两种负载均衡。

Nginx 官网对四层负载和七层负载均衡均衡做了详细介绍,感兴趣的可以看看。

  • What Is Layer 4 Load Balancing?
  • What Is Layer 7 Load Balancing?

OSI 七层模型

  • 四层负载均衡 工作在 OSI 模型第四层,也就是传输层,这一层的主要协议是 TCP/UDP,负载均衡器在这一层能够看到数据包里的源端口地址以及目的端口地址,会基于这些信息通过一定的负载均衡算法将数据包转发到后端真实服务器。也就是说,四层负载均衡的核心就是 IP+端口层面的负载均衡,不涉及具体的报文内容。
  • 七层负载均衡 工作在 OSI 模型第七层,也就是应用层,这一层的主要协议是 HTTP 。这一层的负载均衡比四层负载均衡路由网络请求的方式更加复杂,它会读取报文的数据部分(比如说我们的 HTTP 部分的报文),然后根据读取到的数据内容(如 URL、Cookie)做出负载均衡决策。也就是说,七层负载均衡器的核心是报文内容(如 URL、Cookie)层面的负载均衡,执行第七层负载均衡的设备通常被称为 反向代理服务器 。

七层负载均衡比四层负载均衡会消耗更多的性能,不过,也相对更加灵活,能够更加智能地路由网络请求,比如说你可以根据请求的内容进行优化如缓存、压缩、加密。

简单来说,四层负载均衡性能很强,七层负载均衡功能更强! 不过,对于绝大部分业务场景来说,四层负载均衡和七层负载均衡的性能差异基本可以忽略不计的。

下面这段话摘自 Nginx 官网的 What Is Layer 4 Load Balancing? 这篇文章。

Layer 4 load balancing was a popular architectural approach to traffic handling when commodity hardware was not as powerful as it is now, and the interaction between clients and application servers was much less complex. It requires less computation than more sophisticated load balancing methods (such as Layer 7), but CPU and memory are now sufficiently fast and cheap that the performance advantage for Layer 4 load balancing has become negligible or irrelevant in most situations.

第 4 层负载平衡是一种流行的流量处理体系结构方法,当时商用硬件没有现在这么强大,客户端和应用程序服务器之间的交互也不那么复杂。它比更复杂的负载平衡方法(如第 7 层)需要更少的计算量,但是 CPU 和内存现在足够快和便宜,在大多数情况下,第 4 层负载平衡的性能优势已经变得微不足道或无关紧要。

在工作中,我们通常会使用 Nginx 来做七层负载均衡,LVS(Linux Virtual Server 虚拟服务器, Linux 内核的 4 层负载均衡)来做四层负载均衡。

关于 Nginx 的常见知识点总结,《Java 面试指北》 中「技术面试题篇」中已经有对应的内容了,感兴趣的小伙伴可以去看看。

不过,LVS 这个绝大部分公司真用不上,像阿里、百度、腾讯、eBay 等大厂才会使用到,用的最多的还是 Nginx。

客户端负载均衡

客户端负载均衡 主要应用于系统内部的不同的服务之间,可以使用现成的负载均衡组件来实现。

在客户端负载均衡中,客户端会自己维护一份服务器的地址列表,发送请求之前,客户端会根据对应的负载均衡算法来选择具体某一台服务器处理请求。

客户端负载均衡器和服务运行在同一个进程或者说 Java 程序里,不存在额外的网络开销。不过,客户端负载均衡的实现会受到编程语言的限制,比如说 Spring Cloud Load Balancer 就只能用于 Java 语言。

Java 领域主流的微服务框架 Dubbo、Spring Cloud 等都内置了开箱即用的客户端负载均衡实现。Dubbo 属于是默认自带了负载均衡功能,Spring Cloud 是通过组件的形式实现的负载均衡,属于可选项,比较常用的是 Spring Cloud Load Balancer(官方,推荐) 和 Ribbon(Netflix,已被弃用)。

下图是我画的一个简单的基于 Spring Cloud Load Balancer(Ribbon 也类似) 的客户端负载均衡示意图:

负载均衡常见的算法有哪些?

随机法

随机法 是最简单粗暴的负载均衡算法。

如果没有配置权重的话,所有的服务器被访问到的概率都是相同的。如果配置权重的话,权重越高的服务器被访问的概率就越大。

未加权重的随机算法适合于服务器性能相近的集群,其中每个服务器承载相同的负载。加权随机算法适合于服务器性能不等的集群,权重的存在可以使请求分配更加合理化。

不过,随机算法有一个比较明显的缺陷:部分机器在一段时间之内无法被随机到,毕竟是概率算法,就算是大家权重一样, 也可能会出现这种情况。

于是,轮询法 来了!

轮询法

轮询法是挨个轮询服务器处理,也可以设置权重。

如果没有配置权重的话,每个请求按时间顺序逐一分配到不同的服务器处理。如果配置权重的话,权重越高的服务器被访问的次数就越多。

未加权重的轮询算法适合于服务器性能相近的集群,其中每个服务器承载相同的负载。加权轮询算法适合于服务器性能不等的集群,权重的存在可以使请求分配更加合理化。

在加权轮询的基础上,还有进一步改进得到的负载均衡算法,比如平滑的加权轮训算法。

平滑的加权轮训算法最早是在 Nginx 中被实现,可以参考这个 commit:https://github.com/phusion/nginx/commit/27e94984486058d73157038f7950a0a36ecc6e35。如果你认真学习过 Dubbo 负载均衡策略的话,就会发现 Dubbo 的加权轮询就借鉴了该算法实现并进一步做了优化。

Dubbo 加权轮询负载均衡算法

两次随机法

两次随机法在随机法的基础上多增加了一次随机,多选出一个服务器。随后再根据两台服务器的负载等情况,从其中选择出一个最合适的服务器。

两次随机法的好处是可以动态地调节后端节点的负载,使其更加均衡。如果只使用一次随机法,可能会导致某些服务器过载,而某些服务器空闲。

哈希法

将请求的参数信息通过哈希函数转换成一个哈希值,然后根据哈希值来决定请求被哪一台服务器处理。

在服务器数量不变的情况下,相同参数的请求总是发到同一台服务器处理,比如同个 IP 的请求、同一个用户的请求。

一致性 Hash 法

和哈希法类似,一致性 Hash 法也可以让相同参数的请求总是发到同一台服务器处理。不过,它解决了哈希法存在的一些问题。

常规哈希法在服务器数量变化时,哈希值会重新落在不同的服务器上,这明显违背了使用哈希法的本意。而一致性哈希法的核心思想是将数据和节点都映射到一个哈希环上,然后根据哈希值的顺序来确定数据属于哪个节点。当服务器增加或删除时,只影响该服务器的哈希,而不会导致整个服务集群的哈希键值重新分布。

最小连接法

当有新的请求出现时,遍历服务器节点列表并选取其中连接数最小的一台服务器来响应当前请求。相同连接的情况下,可以进行加权随机。

最少连接数基于一个服务器连接数越多,负载就越高这一理想假设。然而, 实际情况是连接数并不能代表服务器的实际负载,有些连接耗费系统资源更多,有些连接不怎么耗费系统资源。

最少活跃法

最少活跃法和最小连接法类似,但要更科学一些。最少活跃法以活动连接数为标准,活动连接数可以理解为当前正在处理的请求数。活跃数越低,说明处理能力越强,这样就可以使处理能力强的服务器处理更多请求。相同活跃数的情况下,可以进行加权随机。

最快响应时间法

不同于最小连接法和最少活跃法,最快响应时间法以响应时间为标准来选择具体是哪一台服务器处理。客户端会维持每个服务器的响应时间,每次请求挑选响应时间最短的。相同响应时间的情况下,可以进行加权随机。

这种算法可以使得请求被更快处理,但可能会造成流量过于集中于高性能服务器的问题。

七层负载均衡可以怎么做?

简单介绍两种项目中常用的七层负载均衡解决方案:DNS 解析和反向代理。

除了我介绍的这两种解决方案之外,HTTP 重定向等手段也可以用来实现负载均衡,不过,相对来说,还是 DNS 解析和反向代理用的更多一些,也更推荐一些。

DNS 解析

DNS 解析是比较早期的七层负载均衡实现方式,非常简单。

DNS 解析实现负载均衡的原理是这样的:在 DNS 服务器中为同一个主机记录配置多个 IP 地址,这些 IP 地址对应不同的服务器。当用户请求域名的时候,DNS 服务器采用轮询算法返回 IP 地址,这样就实现了轮询版负载均衡。

现在的 DNS 解析几乎都支持 IP 地址的权重配置,这样的话,在服务器性能不等的集群中请求分配会更加合理化。像我自己目前正在用的阿里云 DNS 就支持权重配置。

反向代理

客户端将请求发送到反向代理服务器,由反向代理服务器去选择目标服务器,获取数据后再返回给客户端。对外暴露的是反向代理服务器地址,隐藏了真实服务器 IP 地址。反向代理“代理”的是目标服务器,这一个过程对于客户端而言是透明的。

Nginx 就是最常用的反向代理服务器,它可以将接收到的客户端请求以一定的规则(负载均衡策略)均匀地分配到这个服务器集群中所有的服务器上。

反向代理负载均衡同样属于七层负载均衡。

客户端负载均衡通常是怎么做的?

我们上面也说了,客户端负载均衡可以使用现成的负载均衡组件来实现。

Netflix Ribbon 和 Spring Cloud Load Balancer 就是目前 Java 生态最流行的两个负载均衡组件。

Ribbon 是老牌负载均衡组件,由 Netflix 开发,功能比较全面,支持的负载均衡策略也比较多。 Spring Cloud Load Balancer 是 Spring 官方为了取代 Ribbon 而推出的,功能相对更简单一些,支持的负载均衡也少一些。

Ribbon 支持的 7 种负载均衡策略:

  • RandomRule:随机策略。
  • RoundRobinRule(默认):轮询策略
  • WeightedResponseTimeRule:权重(根据响应时间决定权重)策略
  • BestAvailableRule:最小连接数策略
  • RetryRule:重试策略(按照轮询策略来获取服务,如果获取的服务实例为 null 或已经失效,则在指定的时间之内不断地进行重试来获取服务,如果超过指定时间依然没获取到服务实例则返回 null)
  • AvailabilityFilteringRule:可用敏感性策略(先过滤掉非健康的服务实例,然后再选择连接数较小的服务实例)
  • ZoneAvoidanceRule:区域敏感性策略(根据服务所在区域的性能和服务的可用性来选择服务实例)

Spring Cloud Load Balancer 支持的 2 种负载均衡策略:

  • RandomLoadBalancer:随机策略
  • RoundRobinLoadBalancer(默认):轮询策略
1
2
3
4
5
6
7
8
9
10
11
public class CustomLoadBalancerConfiguration {

@Bean
ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(loadBalancerClientFactory
.getLazyProvider(name, ServiceInstanceListSupplier.class),
name);
}
}

不过,Spring Cloud Load Balancer 支持的负载均衡策略其实不止这两种,ServiceInstanceListSupplier 的实现类同样可以让其支持类似于 Ribbon 的负载均衡策略。这个应该是后续慢慢完善引入的,不看官方文档还真发现不了,所以说阅读官方文档真的很重要!

这里举两个官方的例子:

  • ZonePreferenceServiceInstanceListSupplier:实现基于区域的负载平衡
  • HintBasedServiceInstanceListSupplier:实现基于 hint 提示的负载均衡
1
2
3
4
5
6
7
8
9
10
11
12
public class CustomLoadBalancerConfiguration {
// 使用基于区域的负载平衡方法
@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withDiscoveryClient()
.withZonePreference()
.withCaching()
.build(context);
}
}

关于 Spring Cloud Load Balancer 更详细更新的介绍,推荐大家看看官方文档:https://docs.spring.io/spring-cloud-commons/docs/current/reference/html/#spring-cloud-loadbalancer ,一切以官方文档为主。

轮询策略基本可以满足绝大部分项目的需求,我们的实际项目中如果没有特殊需求的话,通常使用的就是默认的轮询策略。并且,Ribbon 和 Spring Cloud Load Balancer 都支持自定义负载均衡策略。

个人建议如非必需 Ribbon 某个特有的功能或者负载均衡策略的话,就优先选择 Spring 官方提供的 Spring Cloud Load Balancer。

最后再说说为什么我不太推荐使用 Ribbon 。

Spring Cloud 2020.0.0 版本移除了 Netflix 除 Eureka 外的所有组件。Spring Cloud Hoxton.M2 是第一个支持 Spring Cloud Load Balancer 来替代 Netfix Ribbon 的版本。

我们早期学习微服务,肯定接触过 Netflix 公司开源的 Feign、Ribbon、Zuul、Hystrix、Eureka 等知名的微服务系统构建所必须的组件,直到现在依然有非常非常多的公司在使用这些组件。不夸张地说,Netflix 公司引领了 Java 技术栈下的微服务发展。

那为什么 Spring Cloud 这么急着移除 Netflix 的组件呢? 主要是因为在 2018 年的时候,Netflix 宣布其开源的核心组件 Hystrix、Ribbon、Zuul、Eureka 等进入维护状态,不再进行新特性开发,只修 BUG。于是,Spring 官方不得不考虑移除 Netflix 的组件。

Spring Cloud Alibaba 是一个不错的选择,尤其是对于国内的公司和个人开发者来说。

参考

  • 干货 | eBay 的 4 层软件负载均衡实现:https://mp.weixin.qq.com/s/bZMxLTECOK3mjdgiLbHj-g
  • HTTP Load Balancing(Nginx 官方文档):https://docs.nginx.com/nginx/admin-guide/load-balancer/http-load-balancer/
  • 深入浅出负载均衡 - vivo 互联网技术:https://www.cnblogs.com/vivotech/p/14859041.html

LinkedList 源码分析

发表于 2021-07-16 | 分类于 Java , 集合 | 阅读次数:
字数统计: 3.9k 字 | 阅读时长 ≈ 16 分钟

LinkedList 简介

LinkedList 是一个基于双向链表实现的集合类,经常被拿来和 ArrayList 做比较。关于 LinkedList 和ArrayList的详细对比,我们 Java 集合常见面试题总结(上)有详细介绍到。

双向链表

不过,我们在项目中一般是不会使用到 LinkedList 的,需要用到 LinkedList 的场景几乎都可以使用 ArrayList 来代替,并且,性能通常会更好!就连 LinkedList 的作者约书亚 · 布洛克(Josh Bloch)自己都说从来不会使用 LinkedList 。

另外,不要下意识地认为 LinkedList 作为链表就最适合元素增删的场景。我在上面也说了,LinkedList 仅仅在头尾插入或者删除元素的时候时间复杂度近似 O(1),其他情况增删元素的平均时间复杂度都是 O(n) 。

LinkedList 插入和删除元素的时间复杂度?

  • 头部插入/删除:只需要修改头结点的指针即可完成插入/删除操作,因此时间复杂度为 O(1)。
  • 尾部插入/删除:只需要修改尾结点的指针即可完成插入/删除操作,因此时间复杂度为 O(1)。
  • 指定位置插入/删除:需要先移动到指定位置,再修改指定节点的指针完成插入/删除,不过由于有头尾指针,可以从较近的指针出发,因此需要遍历平均 n/4 个元素,时间复杂度为 O(n)。

LinkedList 为什么不能实现 RandomAccess 接口?

RandomAccess 是一个标记接口,用来表明实现该接口的类支持随机访问(即可以通过索引快速访问元素)。由于 LinkedList 底层数据结构是链表,内存地址不连续,只能通过指针来定位,不支持随机快速访问,所以不能实现 RandomAccess 接口。

LinkedList 源码分析

这里以 JDK1.8 为例,分析一下 LinkedList 的底层核心源码。

LinkedList 的类定义如下:

1
2
3
4
5
6
public class LinkedList<E>
extends AbstractSequentialList<E>
implements List<E>, Deque<E>, Cloneable, java.io.Serializable
{
//...
}

LinkedList 继承了 AbstractSequentialList ,而 AbstractSequentialList 又继承于 AbstractList 。

阅读过 ArrayList 的源码我们就知道,ArrayList 同样继承了 AbstractList , 所以 LinkedList 会有大部分方法和 ArrayList 相似。

LinkedList 实现了以下接口:

  • List : 表明它是一个列表,支持添加、删除、查找等操作,并且可以通过下标进行访问。
  • Deque :继承自 Queue 接口,具有双端队列的特性,支持从两端插入和删除元素,方便实现栈和队列等数据结构。需要注意,Deque 的发音为 “deck” [dɛk],这个大部分人都会读错。
  • Cloneable :表明它具有拷贝能力,可以进行深拷贝或浅拷贝操作。
  • Serializable : 表明它可以进行序列化操作,也就是可以将对象转换为字节流进行持久化存储或网络传输,非常方便。

LinkedList 类图

LinkedList 中的元素是通过 Node 定义的:

1
2
3
4
5
6
7
8
9
10
11
12
private static class Node<E> {
E item;// 节点值
Node<E> next; // 指向的下一个节点(后继节点)
Node<E> prev; // 指向的前一个节点(前驱结点)

// 初始化参数顺序分别是:前驱结点、本身节点值、后继节点
Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}

初始化

LinkedList 中有一个无参构造函数和一个有参构造函数。

1
2
3
4
5
6
7
8
9
// 创建一个空的链表对象
public LinkedList() {
}

// 接收一个集合类型作为参数,会创建一个与传入集合相同元素的链表对象
public LinkedList(Collection<? extends E> c) {
this();
addAll(c);
}

插入元素

LinkedList 除了实现了 List 接口相关方法,还实现了 Deque 接口的很多方法,所以我们有很多种方式插入元素。

我们这里以 List 接口中相关的插入方法为例进行源码讲解,对应的是add() 方法。

add() 方法有两个版本:

  • add(E e):用于在 LinkedList 的尾部插入元素,即将新元素作为链表的最后一个元素,时间复杂度为 O(1)。
  • add(int index, E element):用于在指定位置插入元素。这种插入方式需要先移动到指定位置,再修改指定节点的指针完成插入/删除,因此需要移动平均 n/4 个元素,时间复杂度为 O(n)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 在链表尾部插入元素
public boolean add(E e) {
linkLast(e);
return true;
}

// 在链表指定位置插入元素
public void add(int index, E element) {
// 下标越界检查
checkPositionIndex(index);

// 判断 index 是不是链表尾部位置
if (index == size)
// 如果是就直接调用 linkLast 方法将元素节点插入链表尾部即可
linkLast(element);
else
// 如果不是则调用 linkBefore 方法将其插入指定元素之前
linkBefore(element, node(index));
}

// 将元素节点插入到链表尾部
void linkLast(E e) {
// 将最后一个元素赋值(引用传递)给节点 l
final Node<E> l = last;
// 创建节点,并指定节点前驱为链表尾节点 last,后继引用为空
final Node<E> newNode = new Node<>(l, e, null);
// 将 last 引用指向新节点
last = newNode;
// 判断尾节点是否为空
// 如果 l 是null 意味着这是第一次添加元素
if (l == null)
// 如果是第一次添加,将first赋值为新节点,此时链表只有一个元素
first = newNode;
else
// 如果不是第一次添加,将新节点赋值给l(添加前的最后一个元素)的next
l.next = newNode;
size++;
modCount++;
}

// 在指定元素之前插入元素
void linkBefore(E e, Node<E> succ) {
// assert succ != null;断言 succ不为 null
// 定义一个节点元素保存 succ 的 prev 引用,也就是它的前一节点信息
final Node<E> pred = succ.prev;
// 初始化节点,并指明前驱和后继节点
final Node<E> newNode = new Node<>(pred, e, succ);
// 将 succ 节点前驱引用 prev 指向新节点
succ.prev = newNode;
// 判断前驱节点是否为空,为空表示 succ 是第一个节点
if (pred == null)
// 新节点成为第一个节点
first = newNode;
else
// succ 节点前驱的后继引用指向新节点
pred.next = newNode;
size++;
modCount++;
}

获取元素

LinkedList获取元素相关的方法一共有 3 个:

  1. getFirst():获取链表的第一个元素。
  2. getLast():获取链表的最后一个元素。
  3. get(int index):获取链表指定位置的元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 获取链表的第一个元素
public E getFirst() {
final Node<E> f = first;
if (f == null)
throw new NoSuchElementException();
return f.item;
}

// 获取链表的最后一个元素
public E getLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
return l.item;
}

// 获取链表指定位置的元素
public E get(int index) {
// 下标越界检查,如果越界就抛异常
checkElementIndex(index);
// 返回链表中对应下标的元素
return node(index).item;
}

这里的核心在于 node(int index) 这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 返回指定下标的非空节点
Node<E> node(int index) {
// 断言下标未越界
// assert isElementIndex(index);
// 如果index小于size的二分之一 从前开始查找(向后查找) 反之向前查找
if (index < (size >> 1)) {
Node<E> x = first;
// 遍历,循环向后查找,直至 i == index
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
Node<E> x = last;
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}

get(int index) 或 remove(int index) 等方法内部都调用了该方法来获取对应的节点。

从这个方法的源码可以看出,该方法通过比较索引值与链表 size 的一半大小来确定从链表头还是尾开始遍历。如果索引值小于 size 的一半,就从链表头开始遍历,反之从链表尾开始遍历。这样可以在较短的时间内找到目标节点,充分利用了双向链表的特性来提高效率。

删除元素

LinkedList删除元素相关的方法一共有 5 个:

  1. removeFirst():删除并返回链表的第一个元素。
  2. removeLast():删除并返回链表的最后一个元素。
  3. remove(E e):删除链表中首次出现的指定元素,如果不存在该元素则返回 false。
  4. remove(int index):删除指定索引处的元素,并返回该元素的值。
  5. void clear():移除此链表中的所有元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 删除并返回链表的第一个元素
public E removeFirst() {
final Node<E> f = first;
if (f == null)
throw new NoSuchElementException();
return unlinkFirst(f);
}

// 删除并返回链表的最后一个元素
public E removeLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
return unlinkLast(l);
}

// 删除链表中首次出现的指定元素,如果不存在该元素则返回 false
public boolean remove(Object o) {
// 如果指定元素为 null,遍历链表找到第一个为 null 的元素进行删除
if (o == null) {
for (Node<E> x = first; x != null; x = x.next) {
if (x.item == null) {
unlink(x);
return true;
}
}
} else {
// 如果不为 null ,遍历链表找到要删除的节点
for (Node<E> x = first; x != null; x = x.next) {
if (o.equals(x.item)) {
unlink(x);
return true;
}
}
}
return false;
}

// 删除链表指定位置的元素
public E remove(int index) {
// 下标越界检查,如果越界就抛异常
checkElementIndex(index);
return unlink(node(index));
}

这里的核心在于 unlink(Node<E> x) 这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
E unlink(Node<E> x) {
// 断言 x 不为 null
// assert x != null;
// 获取当前节点(也就是待删除节点)的元素
final E element = x.item;
// 获取当前节点的下一个节点
final Node<E> next = x.next;
// 获取当前节点的前一个节点
final Node<E> prev = x.prev;

// 如果前一个节点为空,则说明当前节点是头节点
if (prev == null) {
// 直接让链表头指向当前节点的下一个节点
first = next;
} else { // 如果前一个节点不为空
// 将前一个节点的 next 指针指向当前节点的下一个节点
prev.next = next;
// 将当前节点的 prev 指针置为 null,,方便 GC 回收
x.prev = null;
}

// 如果下一个节点为空,则说明当前节点是尾节点
if (next == null) {
// 直接让链表尾指向当前节点的前一个节点
last = prev;
} else { // 如果下一个节点不为空
// 将下一个节点的 prev 指针指向当前节点的前一个节点
next.prev = prev;
// 将当前节点的 next 指针置为 null,方便 GC 回收
x.next = null;
}

// 将当前节点元素置为 null,方便 GC 回收
x.item = null;
size--;
modCount++;
return element;
}

unlink() 方法的逻辑如下:

  1. 首先获取待删除节点 x 的前驱和后继节点;
  2. 判断待删除节点是否为头节点或尾节点:
    • 如果 x 是头节点,则将 first 指向 x 的后继节点 next
    • 如果 x 是尾节点,则将 last 指向 x 的前驱节点 prev
    • 如果 x 不是头节点也不是尾节点,执行下一步操作
  3. 将待删除节点 x 的前驱的后继指向待删除节点的后继 next,断开 x 和 x.prev 之间的链接;
  4. 将待删除节点 x 的后继的前驱指向待删除节点的前驱 prev,断开 x 和 x.next 之间的链接;
  5. 将待删除节点 x 的元素置空,修改链表长度。

可以参考下图理解(图源:LinkedList 源码分析(JDK 1.8)):

unlink 方法逻辑

遍历链表

推荐使用for-each 循环来遍历 LinkedList 中的元素, for-each 循环最终会转换成迭代器形式。

1
2
3
4
5
6
7
8
LinkedList<String> list = new LinkedList<>();
list.add("apple");
list.add("banana");
list.add("pear");

for (String fruit : list) {
System.out.println(fruit);
}

LinkedList 的遍历的核心就是它的迭代器的实现。

1
2
3
4
5
6
7
8
9
10
11
12
// 双向迭代器
private class ListItr implements ListIterator<E> {
// 表示上一次调用 next() 或 previous() 方法时经过的节点;
private Node<E> lastReturned;
// 表示下一个要遍历的节点;
private Node<E> next;
// 表示下一个要遍历的节点的下标,也就是当前节点的后继节点的下标;
private int nextIndex;
// 表示当前遍历期望的修改计数值,用于和 LinkedList 的 modCount 比较,判断链表是否被其他线程修改过。
private int expectedModCount = modCount;
…………
}

下面我们对迭代器 ListItr 中的核心方法进行详细介绍。

我们先来看下从头到尾方向的迭代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 判断还有没有下一个节点
public boolean hasNext() {
// 判断下一个节点的下标是否小于链表的大小,如果是则表示还有下一个元素可以遍历
return nextIndex < size;
}
// 获取下一个节点
public E next() {
// 检查在迭代过程中链表是否被修改过
checkForComodification();
// 判断是否还有下一个节点可以遍历,如果没有则抛出 NoSuchElementException 异常
if (!hasNext())
throw new NoSuchElementException();
// 将 lastReturned 指向当前节点
lastReturned = next;
// 将 next 指向下一个节点
next = next.next;
nextIndex++;
return lastReturned.item;
}

再来看一下从尾到头方向的迭代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 判断是否还有前一个节点
public boolean hasPrevious() {
return nextIndex > 0;
}

// 获取前一个节点
public E previous() {
// 检查是否在迭代过程中链表被修改
checkForComodification();
// 如果没有前一个节点,则抛出异常
if (!hasPrevious())
throw new NoSuchElementException();
// 将 lastReturned 和 next 指针指向上一个节点
lastReturned = next = (next == null) ? last : next.prev;
nextIndex--;
return lastReturned.item;
}

如果需要删除或插入元素,也可以使用迭代器进行操作。

1
2
3
4
5
6
7
8
9
10
11
LinkedList<String> list = new LinkedList<>();
list.add("apple");
list.add(null);
list.add("banana");

// Collection 接口的 removeIf 方法底层依然是基于迭代器
list.removeIf(Objects::isNull);

for (String fruit : list) {
System.out.println(fruit);
}

迭代器对应的移除元素的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 从列表中删除上次被返回的元素
public void remove() {
// 检查是否在迭代过程中链表被修改
checkForComodification();
// 如果上次返回的节点为空,则抛出异常
if (lastReturned == null)
throw new IllegalStateException();

// 获取当前节点的下一个节点
Node<E> lastNext = lastReturned.next;
// 从链表中删除上次返回的节点
unlink(lastReturned);
// 修改指针
if (next == lastReturned)
next = lastNext;
else
nextIndex--;
// 将上次返回的节点引用置为 null,方便 GC 回收
lastReturned = null;
expectedModCount++;
}

LinkedList 常用方法测试

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 创建 LinkedList 对象
LinkedList<String> list = new LinkedList<>();

// 添加元素到链表末尾
list.add("apple");
list.add("banana");
list.add("pear");
System.out.println("链表内容:" + list);

// 在指定位置插入元素
list.add(1, "orange");
System.out.println("链表内容:" + list);

// 获取指定位置的元素
String fruit = list.get(2);
System.out.println("索引为 2 的元素:" + fruit);

// 修改指定位置的元素
list.set(3, "grape");
System.out.println("链表内容:" + list);

// 删除指定位置的元素
list.remove(0);
System.out.println("链表内容:" + list);

// 删除第一个出现的指定元素
list.remove("banana");
System.out.println("链表内容:" + list);

// 获取链表的长度
int size = list.size();
System.out.println("链表长度:" + size);

// 清空链表
list.clear();
System.out.println("清空后的链表:" + list);

输出:

1
2
3
4
5
6
索引为 2 的元素:banana
链表内容:[apple, orange, banana, grape]
链表内容:[orange, banana, grape]
链表内容:[orange, grape]
链表长度:2
清空后的链表:[]

DelayQueue 源码分析

发表于 2021-07-02 | 分类于 Java , 集合 | 阅读次数:
字数统计: 3.9k 字 | 阅读时长 ≈ 14 分钟

DelayQueue 简介

DelayQueue 是 JUC 包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列,是线程安全的。关于PriorityQueue可以参考笔者编写的这篇文章:PriorityQueue 源码分析 。

BlockingQueue 的实现类

DelayQueue 中存放的元素必须实现 Delayed 接口,并且需要重写 getDelay()方法(计算是否到期)。

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

默认情况下, DelayQueue 会按照到期时间升序编排任务。只有当元素过期时(getDelay()方法返回值小于等于 0),才能从队列中取出。

DelayQueue 发展史

  • DelayQueue 最早是在 Java 5 中引入的,作为 java.util.concurrent 包中的一部分,用于支持基于时间的任务调度和缓存过期删除等场景,该版本仅仅支持延迟功能的实现,还未解决线程安全问题。
  • 在 Java 6 中,DelayQueue 的实现进行了优化,通过使用 ReentrantLock 和 Condition 解决线程安全及线程间交互的效率,提高了其性能和可靠性。
  • 在 Java 7 中,DelayQueue 的实现进行了进一步的优化,通过使用 CAS 操作实现元素的添加和移除操作,提高了其并发操作性能。
  • 在 Java 8 中,DelayQueue 的实现没有进行重大变化,但是在 java.time 包中引入了新的时间类,如 Duration 和 Instant,使得使用 DelayQueue 进行基于时间的调度更加方便和灵活。
  • 在 Java 9 中,DelayQueue 的实现进行了一些微小的改进,主要是对代码进行了一些优化和精简。

总的来说,DelayQueue 的发展史主要是通过优化其实现方式和提高其性能和可靠性,使其更加适用于基于时间的调度和缓存过期删除等场景。

DelayQueue 常见使用场景示例

我们这里希望任务可以按照我们预期的时间执行,例如提交 3 个任务,分别要求 1s、2s、3s 后执行,即使是乱序添加,1s 后要求 1s 执行的任务会准时执行。

延迟任务

对此我们可以使用 DelayQueue 来实现,所以我们首先需要继承 Delayed 实现 DelayedTask,实现 getDelay 方法以及优先级比较 compareTo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 延迟任务
*/
public class DelayedTask implements Delayed {
/**
* 任务到期时间
*/
private long executeTime;
/**
* 任务
*/
private Runnable task;

public DelayedTask(long delay, Runnable task) {
this.executeTime = System.currentTimeMillis() + delay;
this.task = task;
}

/**
* 查看当前任务还有多久到期
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}

public void execute() {
task.run();
}
}

完成任务的封装之后,使用就很简单了,设置好多久到期然后将任务提交到延迟队列中即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建延迟队列,并添加任务
DelayQueue < DelayedTask > delayQueue = new DelayQueue < > ();

//分别添加1s、2s、3s到期的任务
delayQueue.add(new DelayedTask(2000, () -> System.out.println("Task 2")));
delayQueue.add(new DelayedTask(1000, () -> System.out.println("Task 1")));
delayQueue.add(new DelayedTask(3000, () -> System.out.println("Task 3")));

// 取出任务并执行
while (!delayQueue.isEmpty()) {
//阻塞获取最先到期的任务
DelayedTask task = delayQueue.take();
if (task != null) {
task.execute();
}
}

从输出结果可以看出,即使笔者先提到 2s 到期的任务,1s 到期的任务 Task1 还是优先执行的。

1
2
3
Task 1
Task 2
Task 3

DelayQueue 源码解析

这里以 JDK1.8 为例,分析一下 DelayQueue 的底层核心源码。

DelayQueue 的类定义如下:

1
2
3
4
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
{
//...
}

DelayQueue 继承了 AbstractQueue 类,实现了 BlockingQueue 接口。

DelayQueue类图

核心成员变量

DelayQueue 的 4 个核心成员变量如下:

1
2
3
4
5
6
7
8
9
//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();

//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
  • lock : 我们都知道 DelayQueue 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 DelayQueue 就是基于 ReentrantLock 独占锁确保存取操作的线程安全。
  • q : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 DelayQueue 底层元素的存取都是通过这个优先队列 PriorityQueue 的成员变量 q 来管理的。
  • leader : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 leader 来管理延迟任务,只有 leader 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 leader 线程执行完手头的延迟任务后唤醒它。
  • available : 上文讲述 leader 线程时提到的等待唤醒操作的交互就是通过 available 实现的,假如线程 1 尝试在空的 DelayQueue 获取任务时,available 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 available 的 signal 方法将其唤醒。

构造方法

相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。还有一个传入 Collection 对象的构造方法,它会将调用 addAll()方法将集合元素存到优先队列 q 中。

1
2
3
4
5
public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

添加元素

DelayQueue 添加元素的方法无论是 add、put 还是 offer,本质上就是调用一下 offer ,所以了解延迟队列的添加逻辑我们只需阅读 offer 方法即可。

offer 方法的整体逻辑为:

  1. 尝试获取 lock 。
  2. 如果上锁成功,则调 q 的 offer 方法将元素存放到优先队列中。
  3. 调用 peek 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 leader 设置为空,通知因为队列为空时调用 take 等方法导致阻塞的线程来争抢元素。
  4. 上述步骤执行完成,释放 lock。
  5. 返回 true。

源码如下,笔者已详细注释,读者可自行参阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e) {
//尝试获取lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果上锁成功,则调q的offer方法将元素存放到优先队列中
q.offer(e);
//调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
if (q.peek() == e) {
//将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
leader = null;
available.signal();
}
return true;
} finally {
//上述步骤执行完成,释放lock
lock.unlock();
}
}

获取元素

DelayQueue 中获取元素的方式分为阻塞式和非阻塞式,先来看看逻辑比较复杂的阻塞式获取元素方法 take,为了让读者可以更直观的了解阻塞式获取元素的全流程,笔者将以 3 个线程并发获取元素为例讲述 take 的工作流程。

想要理解下面的内容,需要用到 AQS 相关的知识,推荐阅读下面这两篇文章:

  • 图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)
  • AQS 都看完了,Condition 原理可不能少!

1、首先, 3 个线程会尝试获取可重入锁 lock,假设我们现在有 3 个线程分别是 t1、t2、t3,随后 t1 得到了锁,而 t2、t3 没有抢到锁,故将这两个线程存入等待队列中。

2、紧接着 t1 开始进行元素获取的逻辑。

3、线程 t1 首先会查看 DelayQueue 队列首元素是否为空。

4、如果元素为空,则说明当前队列没有任何元素,故 t1 就会被阻塞存到 conditionWaiter 这个队列中。

注意,调用 await 之后 t1 就会释放 lcok 锁,假如 DelayQueue 持续为空,那么 t2、t3 也会像 t1 一样执行相同的逻辑并进入 conditionWaiter 队列中。

如果元素不为空,则判断当前任务是否到期,如果元素到期,则直接返回出去。如果元素未到期,则判断当前 leader 线程(DelayQueue 中唯一一个可以等待并获取元素的线程引用)是否为空,若不为空,则说明当前 leader 正在等待执行一个优先级比当前元素还高的元素到期,故当前线程 t1 只能调用 await 进入无限期等待,等到 leader 取得元素后唤醒。反之,若 leader 线程为空,则将当前线程设置为 leader 并进入有限期等待,到期后取出元素并返回。

自此我们阻塞式获取元素的逻辑都已完成后,源码如下,读者可自行参阅:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public E take() throws InterruptedException {
// 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//查看队列第一个元素
E first = q.peek();
//若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
if (first == null)
available.await();
else {
//若元素不为空,则查看当前元素多久到期
long delay = first.getDelay(NANOSECONDS);
//如果小于0则说明已到期直接返回出去
if (delay <= 0)
return q.poll();
//如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
first = null; // don't retain ref while waiting
//判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
if (leader != null)
available.await();
else {
//反之将我们的线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//并进入有限期等待
available.awaitNanos(delay);
} finally {
//等待任务到期时,释放leader引用,进入下一次循环将任务return出去
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 收尾逻辑:当leader为null,并且队列中有任务时,唤醒等待的获取元素的线程。
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
}
}

我们再来看看非阻塞的获取元素方法 poll ,逻辑比较简单,整体步骤如下:

  1. 尝试获取可重入锁。
  2. 查看队列第一个元素,判断元素是否为空。
  3. 若元素为空,或者元素未到期,则直接返回空。
  4. 若元素不为空且到期了,直接调用 poll 返回出去。
  5. 释放可重入锁 lock 。

源码如下,读者可自行参阅源码及注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E poll() {
//尝试获取可重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//查看队列第一个元素,判断元素是否为空
E first = q.peek();

//若元素为空,或者元素未到期,则直接返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//若元素不为空且到期了,直接调用poll返回出去
return q.poll();
} finally {
//释放可重入锁lock
lock.unlock();
}
}

查看元素

上文获取元素时都会调用到 peek 方法,peek 顾名思义仅仅窥探一下队列中的元素,它的步骤就 4 步:

  1. 上锁。
  2. 调用优先队列 q 的 peek 方法查看索引 0 位置的元素。
  3. 释放锁。
  4. 将元素返回出去。
1
2
3
4
5
6
7
8
9
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}

DelayQueue 常见面试题

DelayQueue 的实现原理是什么?

DelayQueue 底层是使用优先队列 PriorityQueue 来存储元素,而 PriorityQueue 采用二叉小顶堆的思想确保值小的元素排在最前面,这就使得 DelayQueue 对于延迟任务优先级的管理就变得十分方便了。同时 DelayQueue 为了保证线程安全还用到了可重入锁 ReentrantLock,确保单位时间内只有一个线程可以操作延迟队列。最后,为了实现多线程之间等待和唤醒的交互效率,DelayQueue 还用到了 Condition,通过 Condition 的 await 和 signal 方法完成多线程之间的等待唤醒。

DelayQueue 的实现是否线程安全?

DelayQueue 的实现是线程安全的,它通过 ReentrantLock 实现了互斥访问和 Condition 实现了线程间的等待和唤醒操作,可以保证多线程环境下的安全性和可靠性。

DelayQueue 的使用场景有哪些?

DelayQueue 通常用于实现定时任务调度和缓存过期删除等场景。在定时任务调度中,需要将需要执行的任务封装成延迟任务对象,并将其添加到 DelayQueue 中,DelayQueue 会自动按照剩余延迟时间进行升序排序(默认情况),以保证任务能够按照时间先后顺序执行。对于缓存过期这个场景而言,在数据被缓存到内存之后,我们可以将缓存的 key 封装成一个延迟的删除任务,并将其添加到 DelayQueue 中,当数据过期时,拿到这个任务的 key,将这个 key 从内存中移除。

DelayQueue 中 Delayed 接口的作用是什么?

Delayed 接口定义了元素的剩余延迟时间(getDelay)和元素之间的比较规则(该接口继承了 Comparable 接口)。若希望元素能够存放到 DelayQueue 中,就必须实现 Delayed 接口的 getDelay() 方法和 compareTo() 方法,否则 DelayQueue 无法得知当前任务剩余时长和任务优先级的比较。

DelayQueue 和 Timer/TimerTask 的区别是什么?

DelayQueue 和 Timer/TimerTask 都可以用于实现定时任务调度,但是它们的实现方式不同。DelayQueue 是基于优先级队列和堆排序算法实现的,可以实现多个任务按照时间先后顺序执行;而 Timer/TimerTask 是基于单线程实现的,只能按照任务的执行顺序依次执行,如果某个任务执行时间过长,会影响其他任务的执行。另外,DelayQueue 还支持动态添加和移除任务,而 Timer/TimerTask 只能在创建时指定任务。

参考文献

  • 《深入理解高并发编程:JDK 核心技术》:
  • 一口气说出 Java 6 种延时队列的实现方法(面试官也得服):https://www.jb51.net/article/186192.htm
  • 图解 DelayQueue 源码(java 8)——延时队列的小九九: https://blog.csdn.net/every__day/article/details/113810985

ArrayBlockingQueue 源码分析

发表于 2021-06-27 | 分类于 Java , 集合 | 阅读次数:
字数统计: 8k 字 | 阅读时长 ≈ 30 分钟

阻塞队列简介

阻塞队列的历史

Java 阻塞队列的历史可以追溯到 JDK1.5 版本,当时 Java 平台增加了 java.util.concurrent,即我们常说的 JUC 包,其中包含了各种并发流程控制工具、并发容器、原子类等。这其中自然也包含了我们这篇文章所讨论的阻塞队列。

为了解决高并发场景下多线程之间数据共享的问题,JDK1.5 版本中出现了 ArrayBlockingQueue 和 LinkedBlockingQueue,它们是带有生产者-消费者模式实现的并发容器。其中,ArrayBlockingQueue 是有界队列,即添加的元素达到上限之后,再次添加就会被阻塞或者抛出异常。而 LinkedBlockingQueue 则由链表构成的队列,正是因为链表的特性,所以 LinkedBlockingQueue 在添加元素上并不会向 ArrayBlockingQueue 那样有着较多的约束,所以 LinkedBlockingQueue 设置队列是否有界是可选的(注意这里的无界并不是指可以添加任务数量的元素,而是说队列的大小默认为 Integer.MAX_VALUE,近乎于无限大)。

随着 Java 的不断发展,JDK 后续的几个版本又对阻塞队列进行了不少的更新和完善:

  1. JDK1.6 版本:增加 SynchronousQueue,一个不存储元素的阻塞队列。
  2. JDK1.7 版本:增加 TransferQueue,一个支持更多操作的阻塞队列。
  3. JDK1.8 版本:增加 DelayQueue,一个支持延迟获取元素的阻塞队列。

阻塞队列的思想

阻塞队列就是典型的生产者-消费者模型,它可以做到以下几点:

  1. 当阻塞队列数据为空时,所有的消费者线程都会被阻塞,等待队列非空。
  2. 当生产者往队列里填充数据后,队列就会通知消费者队列非空,消费者此时就可以进来消费。
  3. 当阻塞队列因为消费者消费过慢或者生产者存放元素过快导致队列填满时无法容纳新元素时,生产者就会被阻塞,等待队列非满时继续存放元素。
  4. 当消费者从队列中消费一个元素之后,队列就会通知生产者队列非满,生产者可以继续填充数据了。

总结一下:阻塞队列就说基于非空和非满两个条件实现生产者和消费者之间的交互,尽管这些交互流程和等待通知的机制实现非常复杂,好在 Doug Lea 的操刀之下已将阻塞队列的细节屏蔽,我们只需调用 put、take、offer、poll 等 API 即可实现多线程之间的生产和消费。

这也使得阻塞队列在多线程开发中有着广泛的运用,最常见的例子无非是我们的线程池,从源码中我们就能看出当核心线程无法及时处理任务时,这些任务都会扔到 workQueue 中。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {// ...}

ArrayBlockingQueue 常见方法及测试

简单了解了阻塞队列的历史之后,我们就开始重点讨论本篇文章所要介绍的并发容器——ArrayBlockingQueue。为了后续更加深入的了解 ArrayBlockingQueue,我们不妨基于下面几个实例了解以下 ArrayBlockingQueue 的使用。

先看看第一个例子,我们这里会用两个线程分别模拟生产者和消费者,生产者生产完会使用 put 方法生产 10 个元素给消费者进行消费,当队列元素达到我们设置的上限 5 时,put 方法就会阻塞。
同理消费者也会通过 take 方法消费元素,当队列为空时,take 方法就会阻塞消费者线程。这里笔者为了保证消费者能够在消费完 10 个元素后及时退出。便通过倒计时门闩,来控制消费者结束,生产者在这里只会生产 10 个元素。当消费者将 10 个元素消费完成之后,按下倒计时门闩,所有线程都会停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class ProducerConsumerExample {

public static void main(String[] args) throws InterruptedException {

// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// 创建生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
// 向队列中添加元素,如果队列已满则阻塞等待
queue.put(i);
System.out.println("生产者添加元素:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}

});

CountDownLatch countDownLatch = new CountDownLatch(1);

// 创建消费者线程
Thread consumer = new Thread(() -> {
try {
int count = 0;
while (true) {

// 从队列中取出元素,如果队列为空则阻塞等待
int element = queue.take();
System.out.println("消费者取出元素:" + element);
++count;
if (count == 10) {
break;
}
}

countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}

});

// 启动线程
producer.start();
consumer.start();

// 等待线程结束
producer.join();
consumer.join();

countDownLatch.await();

producer.interrupt();
consumer.interrupt();
}

}

代码输出结果如下,可以看到只有生产者往队列中投放元素之后消费者才能消费,这也就意味着当队列中没有数据的时消费者就会阻塞,等待队列非空再继续消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
生产者添加元素:1
生产者添加元素:2
消费者取出元素:1
消费者取出元素:2
生产者添加元素:3
消费者取出元素:3
生产者添加元素:4
生产者添加元素:5
消费者取出元素:4
生产者添加元素:6
消费者取出元素:5
生产者添加元素:7
生产者添加元素:8
生产者添加元素:9
生产者添加元素:10
消费者取出元素:6
消费者取出元素:7
消费者取出元素:8
消费者取出元素:9
消费者取出元素:10

了解了 put、take 这两个会阻塞的存和取方法之后,我我们再来看看阻塞队列中非阻塞的入队和出队方法 offer 和 poll。

如下所示,我们设置了一个大小为 3 的阻塞队列,我们会尝试在队列用 offer 方法存放 4 个元素,然后再从队列中用 poll 尝试取 4 次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class OfferPollExample {

public static void main(String[] args) {
// 创建一个大小为 3 的 ArrayBlockingQueue
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

// 向队列中添加元素
System.out.println(queue.offer("A"));
System.out.println(queue.offer("B"));
System.out.println(queue.offer("C"));

// 尝试向队列中添加元素,但队列已满,返回 false
System.out.println(queue.offer("D"));

// 从队列中取出元素
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());

// 尝试从队列中取出元素,但队列已空,返回 null
System.out.println(queue.poll());
}

}

最终代码的输出结果如下,可以看到因为队列的大小为 3 的缘故,我们前 3 次存放到队列的结果为 true,第 4 次存放时,由于队列已满,所以存放结果返回 false。这也是为什么我们后续的 poll 方法只得到了 3 个元素的值。

1
2
3
4
5
6
7
8
true
true
true
false
A
B
C
null

了解了阻塞存取和非阻塞存取,我们再来看看阻塞队列的一个比较特殊的操作,某些场景下,我们希望能够一次性将阻塞队列的结果存到列表中再进行批量操作,我们就可以使用阻塞队列的 drainTo 方法,这个方法会一次性将队列中所有元素存放到列表,如果队列中有元素,且成功存到 list 中则 drainTo 会返回本次转移到 list 中的元素数,反之若队列为空,drainTo 则直接返回 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DrainToExample {

public static void main(String[] args) {
// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

// 向队列中添加元素
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);

// 创建一个 List,用于存储从队列中取出的元素
List<Integer> list = new ArrayList<>();

// 从队列中取出所有元素,并添加到 List 中
queue.drainTo(list);

// 输出 List 中的元素
System.out.println(list);
}

}

代码输出结果如下

1
[1, 2, 3, 4, 5]

ArrayBlockingQueue 源码分析

自此我们对阻塞队列的使用有了基本的印象,接下来我们就可以进一步了解一下 ArrayBlockingQueue 的工作机制了。

整体设计

在了解 ArrayBlockingQueue 的具体细节之前,我们先来看看 ArrayBlockingQueue 的类图。

ArrayBlockingQueue 类图

从图中我们可以看出,ArrayBlockingQueue 继承了阻塞队列 BlockingQueue 这个接口,不难猜出通过继承 BlockingQueue 这个接口之后,ArrayBlockingQueue 就拥有了阻塞队列那些常见的操作行为。

同时, ArrayBlockingQueue 还继承了 AbstractQueue 这个抽象类,这个继承了 AbstractCollection 和 Queue 的抽象类,从抽象类的特定和语义我们也可以猜出,这个继承关系使得 ArrayBlockingQueue 拥有了队列的常见操作。

所以我们是否可以得出这样一个结论,通过继承 AbstractQueue 获得队列所有的操作模板,其实现的入队和出队操作的整体框架。然后 ArrayBlockingQueue 通过继承 BlockingQueue 获取到阻塞队列的常见操作并将这些操作实现,填充到 AbstractQueue 模板方法的细节中,由此 ArrayBlockingQueue 成为一个完整的阻塞队列。

为了印证这一点,我们到源码中一探究竟。首先我们先来看看 AbstractQueue,从类的继承关系我们可以大致得出,它通过 AbstractCollection 获得了集合的常见操作方法,然后通过 Queue 接口获得了队列的特性。

1
2
3
4
5
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
//...
}

对于集合的操作无非是增删改查,所以我们不妨从添加方法入手,从源码中我们可以看到,它实现了 AbstractCollection 的 add 方法,其内部逻辑如下:

  1. 调用继承 Queue 接口的来的 offer 方法,如果 offer 成功则返回 true。
  2. 如果 offer 失败,即代表当前元素入队失败直接抛异常。
1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

而 AbstractQueue 中并没有对 Queue 的 offer 的实现,很明显这样做的目的是定义好了 add 的核心逻辑,将 offer 的细节交由其子类即我们的 ArrayBlockingQueue 实现。

到此,我们对于抽象类 AbstractQueue 的分析就结束了,我们继续看看 ArrayBlockingQueue 中另一个重要的继承接口 BlockingQueue。

点开 BlockingQueue 之后,我们可以看到这个接口同样继承了 Queue 接口,这就意味着它也具备了队列所拥有的所有行为。同时,它还定义了自己所需要实现的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface BlockingQueue<E> extends Queue<E> {

//元素入队成功返回true,反之则会抛出异常IllegalStateException
boolean add(E e);

//元素入队成功返回true,反之返回false
boolean offer(E e);

//元素入队成功则直接返回,如果队列已满元素不可入队则将线程阻塞,因为阻塞期间可能会被打断,所以这里方法签名抛出了InterruptedException
void put(E e) throws InterruptedException;

//和上一个方法一样,只不过队列满时只会阻塞单位为unit,时间为timeout的时长,如果在等待时长内没有入队成功则直接返回false。
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//从队头取出一个元素,如果队列为空则阻塞等待,因为会阻塞线程的缘故,所以该方法可能会被打断,所以签名定义了InterruptedException
E take() throws InterruptedException;

//取出队头的元素并返回,如果当前队列为空则阻塞等待timeout且单位为unit的时长,如果这个时间段没有元素则直接返回null。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//获取队列剩余元素个数
int remainingCapacity();

//删除我们指定的对象,如果成功返回true,反之返回false。
boolean remove(Object o);

//判断队列中是否包含指定元素
public boolean contains(Object o);

//将队列中的元素全部存到指定的集合中
int drainTo(Collection<? super E> c);

//转移maxElements个元素到集合中
int drainTo(Collection<? super E> c, int maxElements);
}

了解了 BlockingQueue 的常见操作后,我们就知道了 ArrayBlockingQueue 通过继承 BlockingQueue 的方法并实现后,填充到 AbstractQueue 的方法上,由此我们便知道了上文中 AbstractQueue 的 add 方法的 offer 方法是哪里是实现的了。

1
2
3
4
5
6
7
public boolean add(E e) {
//AbstractQueue的offer来自下层的ArrayBlockingQueue从BlockingQueue继承并实现的offer方法
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

初始化

了解 ArrayBlockingQueue 的细节前,我们不妨先看看其构造函数,了解一下其初始化过程。从源码中我们可以看出 ArrayBlockingQueue 有 3 个构造方法,而最核心的构造方法就是下方这一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
// capacity 表示队列初始容量,fair 表示 锁的公平性
public ArrayBlockingQueue(int capacity, boolean fair) {
//如果设置的队列大小小于0,则直接抛出IllegalArgumentException
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个数组用于存放队列的元素
this.items = new Object[capacity];
//创建阻塞队列流程控制的锁
lock = new ReentrantLock(fair);
//用lock锁创建两个条件控制队列生产和消费
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

这个构造方法里面有两个比较核心的成员变量 notEmpty(非空) 和 notFull (非满) ,需要我们格外留意,它们是实现生产者和消费者有序工作的关键所在,这一点笔者会在后续的源码解析中详细说明,这里我们只需初步了解一下阻塞队列的构造即可。

另外两个构造方法都是基于上述的构造方法,默认情况下,我们会使用下面这个构造方法,该构造方法就意味着 ArrayBlockingQueue 用的是非公平锁,即各个生产者或者消费者线程收到通知后,对于锁的争抢是随机的。

1
2
3
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

还有一个不怎么常用的构造方法,在初始化容量和锁的非公平性之后,它还提供了一个 Collection 参数,从源码中不难看出这个构造方法是将外部传入的集合的元素在初始化时直接存放到阻塞队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//初始化容量和锁的公平性
this(capacity, fair);

final ReentrantLock lock = this.lock;
//上锁并将c中的元素存放到ArrayBlockingQueue底层的数组中
lock.lock();
try {
int i = 0;
try {
//遍历并添加元素到数组中
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//记录当前队列容量
count = i;
//更新下一次put或者offer或用add方法添加到队列底层数组的位置
putIndex = (i == capacity) ? 0 : i;
} finally {
//完成遍历后释放锁
lock.unlock();
}
}

阻塞式获取和新增元素

ArrayBlockingQueue 阻塞式获取和新增元素对应的就是生产者-消费者模型,虽然它也支持非阻塞式获取和新增元素(例如 poll() 和 offer(E e) 方法,后文会介绍到),但一般不会使用。

ArrayBlockingQueue 阻塞式获取和新增元素的方法为:

  • put(E e):将元素插入队列中,如果队列已满,则该方法会一直阻塞,直到队列有空间可用或者线程被中断。
  • take() :获取并移除队列头部的元素,如果队列为空,则该方法会一直阻塞,直到队列非空或者线程被中断。

这两个方法实现的关键就是在于两个条件对象 notEmpty(非空) 和 notFull (非满),这个我们在上文的构造方法中有提到。

接下来笔者就通过两张图让大家了解一下这两个条件是如何在阻塞队列中运用的。

ArrayBlockingQueue 非空条件

假设我们的代码消费者先启动,当它发现队列中没有数据,那么非空条件就会将这个线程挂起,即等待条件非空时挂起。然后 CPU 执行权到达生产者,生产者发现队列中可以存放数据,于是将数据存放进去,通知此时条件非空,此时消费者就会被唤醒到队列中使用 take 等方法获取值了。

ArrayBlockingQueue 非满条件

随后的执行中,生产者生产速度远远大于消费者消费速度,于是生产者将队列塞满后再次尝试将数据存入队列,发现队列已满,于是阻塞队列就将当前线程挂起,等待非满。然后消费者拿着 CPU 执行权进行消费,于是队列可以存放新数据了,发出一个非满的通知,此时挂起的生产者就会等待 CPU 执行权到来时再次尝试将数据存到队列中。

简单了解阻塞队列的基于两个条件的交互流程之后,我们不妨看看 put 和 take 方法的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void put(E e) throws InterruptedException {
//确保插入的元素不为null
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
//这里使用lockInterruptibly()方法而不是lock()方法是为了能够响应中断操作,如果在等待获取锁的过程中被打断则该方法会抛出InterruptedException异常。
lock.lockInterruptibly();
try {
//如果count等数组长度则说明队列已满,当前线程将被挂起放到AQS队列中,等待队列非满时插入(非满条件)。
//在等待期间,锁会被释放,其他线程可以继续对队列进行操作。
while (count == items.length)
notFull.await();
//如果队列可以存放元素,则调用enqueue将元素入队
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}

put方法内部调用了 enqueue 方法来实现元素入队,我们继续深入查看一下 enqueue 方法的实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void enqueue(E x) {
//获取队列底层的数组
final Object[] items = this.items;
//将putindex位置的值设置为我们传入的x
items[putIndex] = x;
//更新putindex,如果putindex等于数组长度,则更新为0
if (++putIndex == items.length)
putIndex = 0;
//队列长度+1
count++;
//通知队列非空,那些因为获取元素而阻塞的线程可以继续工作了
notEmpty.signal();
}

从源码中可以看到入队操作的逻辑就是在数组中追加一个新元素,整体执行步骤为:

  1. 获取 ArrayBlockingQueue 底层的数组 items。
  2. 将元素存到 putIndex 位置。
  3. 更新 putIndex 到下一个位置,如果 putIndex 等于队列长度,则说明 putIndex 已经到达数组末尾了,下一次插入则需要 0 开始。(ArrayBlockingQueue 用到了循环队列的思想,即从头到尾循环复用一个数组)
  4. 更新 count 的值,表示当前队列长度+1。
  5. 调用 notEmpty.signal() 通知队列非空,消费者可以从队列中获取值了。

自此我们了解了 put 方法的流程,为了更加完整的了解 ArrayBlockingQueue 关于生产者-消费者模型的设计,我们继续看看阻塞获取队列元素的 take 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中元素个数为0,则将当前线程打断并存入AQS队列中,等待队列非空时获取并移除元素(非空条件)
while (count == 0)
notEmpty.await();
//如果队列不为空则调用dequeue获取元素
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}

理解了 put 方法再看take 方法就很简单了,其核心逻辑和put 方法正好是相反的,比如put 方法在队列满的时候等待队列非满时插入元素(非满条件),而take 方法等待队列非空时获取并移除元素(非空条件)。

take方法内部调用了 dequeue 方法来实现元素出队,其核心逻辑和 enqueue 方法也是相反的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private E dequeue() {
//获取阻塞队列底层的数组
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//从队列中获取takeIndex位置的元素
E x = (E) items[takeIndex];
//将takeIndex置空
items[takeIndex] = null;
//takeIndex向后挪动,如果等于数组长度则更新为0
if (++takeIndex == items.length)
takeIndex = 0;
//队列长度减1
count--;
if (itrs != null)
itrs.elementDequeued();
//通知那些被打断的线程当前队列状态非满,可以继续存放元素
notFull.signal();
return x;
}

由于dequeue 方法(出队)和上面介绍的 enqueue 方法(入队)的步骤大致类似,这里就不重复介绍了。

为了帮助理解,我专门画了一张图来展示 notEmpty(非空) 和 notFull (非满)这两个条件对象是如何控制 ArrayBlockingQueue 的存和取的。

ArrayBlockingQueue 非空非满

  • 消费者:当消费者从队列中 take 或者 poll 等操作取出一个元素之后,就会通知队列非满,此时那些等待非满的生产者就会被唤醒等待获取 CPU 时间片进行入队操作。
  • 生产者:当生产者将元素存到队列中后,就会触发通知队列非空,此时消费者就会被唤醒等待 CPU 时间片尝试获取元素。如此往复,两个条件对象就构成一个环路,控制着多线程之间的存和取。

非阻塞式获取和新增元素

ArrayBlockingQueue 非阻塞式获取和新增元素的方法为:

  • offer(E e):将元素插入队列尾部。如果队列已满,则该方法会直接返回 false,不会等待并阻塞线程。
  • poll():获取并移除队列头部的元素,如果队列为空,则该方法会直接返回 null,不会等待并阻塞线程。
  • add(E e):将元素插入队列尾部。如果队列已满则会抛出 IllegalStateException 异常,底层基于 offer(E e) 方法。
  • remove():移除队列头部的元素,如果队列为空则会抛出 NoSuchElementException 异常,底层基于 poll()。
  • peek():获取但不移除队列头部的元素,如果队列为空,则该方法会直接返回 null,不会等待并阻塞线程。

先来看看 offer 方法,逻辑和 put 差不多,唯一的区别就是入队失败时不会阻塞当前线程,而是直接返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean offer(E e) {
//确保插入的元素不为null
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列已满直接返回false
if (count == items.length)
return false;
else {
//反之将元素入队并直接返回true
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}

poll 方法同理,获取元素失败也是直接返回空,并不会阻塞获取元素的线程。

1
2
3
4
5
6
7
8
9
10
11
public E poll() {
final ReentrantLock lock = this.lock;
//上锁
lock.lock();
try {
//如果队列为空直接返回null,反之出队返回元素值
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

add 方法其实就是对于 offer 做了一层封装,如下代码所示,可以看到 add 会调用没有规定时间的 offer,如果入队失败则直接抛异常。

1
2
3
4
5
6
7
8
9
10
11
12
public boolean add(E e) {
return super.add(e);
}


public boolean add(E e) {
//调用offer方法如果失败直接抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

remove 方法同理,调用 poll,如果返回 null 则说明队列没有元素,直接抛出异常。

1
2
3
4
5
6
7
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

peek() 方法的逻辑也很简单,内部调用了 itemAt 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public E peek() {
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当队列为空时返回 null
return itemAt(takeIndex);
} finally {
//释放锁
lock.unlock();
}
}

//返回队列中指定位置的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}

指定超时时间内阻塞式获取和新增元素

在 offer(E e) 和 poll() 非阻塞获取和新增元素的基础上,设计者提供了带有等待时间的 offer(E e, long timeout, TimeUnit unit) 和 poll(long timeout, TimeUnit unit) ,用于在指定的超时时间内阻塞式地添加和获取元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列已满,进入循环
while (count == items.length) {
//时间到了队列还是满的,则直接返回false
if (nanos <= 0)
return false;
//阻塞nanos时间,等待非满
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

可以看到,带有超时时间的 offer 方法在队列已满的情况下,会等待用户所传的时间段,如果规定时间内还不能存放元素则直接返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空,循环等待,若时间到还是空的,则直接返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

同理,带有超时时间的 poll 也一样,队列为空则在规定时间内等待,若时间到了还是空的,则直接返回 null。

判断元素是否存在

ArrayBlockingQueue 提供了 contains(Object o) 来判断指定元素是否存在于队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean contains(Object o) {
//若目标元素为空,则直接返回 false
if (o == null) return false;
//获取当前队列的元素数组
final Object[] items = this.items;
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列非空
if (count > 0) {
final int putIndex = this.putIndex;
//从队列头部开始遍历
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
//释放锁
lock.unlock();
}
}

ArrayBlockingQueue 获取和新增元素的方法对比

为了帮助理解 ArrayBlockingQueue ,我们再来对比一下上面提到的这些获取和新增元素的方法。

新增元素:

方法 队列满时处理方式 方法返回值
put(E e) 线程阻塞,直到中断或被唤醒 void
offer(E e) 直接返回 false boolean
offer(E e, long timeout, TimeUnit unit) 指定超时时间内阻塞,超过规定时间还未添加成功则返回 false boolean
add(E e) 直接抛出 IllegalStateException 异常 boolean

获取/移除元素:

方法 队列空时处理方式 方法返回值
take() 线程阻塞,直到中断或被唤醒 E
poll() 返回 null E
poll(long timeout, TimeUnit unit) 指定超时时间内阻塞,超过规定时间还是空的则返回 null E
peek() 返回 null E
remove() 直接抛出 NoSuchElementException 异常 boolean

ArrayBlockingQueue 相关面试题

ArrayBlockingQueue 是什么?它的特点是什么?

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,常用于多线程之间的数据共享,底层采用数组实现,从其名字就能看出来了。

ArrayBlockingQueue 的容量有限,一旦创建,容量不能改变。

为了保证线程安全,ArrayBlockingQueue 的并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。并且,它还支持公平和非公平两种方式的锁访问机制,默认是非公平锁。

ArrayBlockingQueue 虽名为阻塞队列,但也支持非阻塞获取和新增元素(例如 poll() 和 offer(E e) 方法),只是队列满时添加元素会抛出异常,队列为空时获取的元素为 null,一般不会使用。

ArrayBlockingQueue 和 LinkedBlockingQueue 有什么区别?

ArrayBlockingQueue 和 LinkedBlockingQueue 是 Java 并发包中常用的两种阻塞队列实现,它们都是线程安全的。不过,不过它们之间也存在下面这些区别:

  • 底层实现:ArrayBlockingQueue 基于数组实现,而 LinkedBlockingQueue 基于链表实现。
  • 是否有界:ArrayBlockingQueue 是有界队列,必须在创建时指定容量大小。LinkedBlockingQueue 创建时可以不指定容量大小,默认是Integer.MAX_VALUE,也就是无界的。但也可以指定队列大小,从而成为有界的。
  • 锁是否分离: ArrayBlockingQueue中的锁是没有分离的,即生产和消费用的是同一个锁;LinkedBlockingQueue中的锁是分离的,即生产用的是putLock,消费是takeLock,这样可以防止生产者和消费者线程之间的锁争夺。
  • 内存占用:ArrayBlockingQueue 需要提前分配数组内存,而 LinkedBlockingQueue 则是动态分配链表节点内存。这意味着,ArrayBlockingQueue 在创建时就会占用一定的内存空间,且往往申请的内存比实际所用的内存更大,而LinkedBlockingQueue 则是根据元素的增加而逐渐占用内存空间。

ArrayBlockingQueue 和 ConcurrentLinkedQueue 有什么区别?

ArrayBlockingQueue 和 ConcurrentLinkedQueue 是 Java 并发包中常用的两种队列实现,它们都是线程安全的。不过,不过它们之间也存在下面这些区别:

  • 底层实现:ArrayBlockingQueue 基于数组实现,而 ConcurrentLinkedQueue 基于链表实现。
  • 是否有界:ArrayBlockingQueue 是有界队列,必须在创建时指定容量大小,而 ConcurrentLinkedQueue 是无界队列,可以动态地增加容量。
  • 是否阻塞:ArrayBlockingQueue 支持阻塞和非阻塞两种获取和新增元素的方式(一般只会使用前者), ConcurrentLinkedQueue 是无界的,仅支持非阻塞式获取和新增元素。

ArrayBlockingQueue 的实现原理是什么?

ArrayBlockingQueue 的实现原理主要分为以下几点(这里以阻塞式获取和新增元素为例介绍):

  • ArrayBlockingQueue 内部维护一个定长的数组用于存储元素。
  • 通过使用 ReentrantLock 锁对象对读写操作进行同步,即通过锁机制来实现线程安全。
  • 通过 Condition 实现线程间的等待和唤醒操作。

这里再详细介绍一下线程间的等待和唤醒具体的实现(不需要记具体的方法,面试中回答要点即可):

  • 当队列已满时,生产者线程会调用 notFull.await() 方法让生产者进行等待,等待队列非满时插入(非满条件)。
  • 当队列为空时,消费者线程会调用 notEmpty.await()方法让消费者进行等待,等待队列非空时消费(非空条件)。
  • 当有新的元素被添加时,生产者线程会调用 notEmpty.signal()方法唤醒正在等待消费的消费者线程。
  • 当队列中有元素被取出时,消费者线程会调用 notFull.signal()方法唤醒正在等待插入元素的生产者线程。

关于 Condition接口的补充:

Condition是 JDK1.5 之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器),线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。 在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知” ,这个功能非常重要,而且是 Condition 接口默认提供的。而synchronized关键字就相当于整个 Lock 对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程,这样会造成很大的效率问题。而Condition实例的signalAll()方法,只会唤醒注册在该Condition实例中的所有等待线程。

参考文献

  • 深入理解 Java 系列 | BlockingQueue 用法详解:https://juejin.cn/post/6999798721269465102
  • 深入浅出阻塞队列 BlockingQueue 及其典型实现 ArrayBlockingQueue:https://zhuanlan.zhihu.com/p/539619957
  • 并发编程大扫盲:ArrayBlockingQueue 底层原理和实战:https://zhuanlan.zhihu.com/p/339662987

Java集合常见面试题总结(下)

发表于 2021-06-21 | 分类于 Java , 集合 | 阅读次数:
字数统计: 8.1k 字 | 阅读时长 ≈ 31 分钟

Map(重要)

HashMap 和 Hashtable 的区别

  • 线程是否安全: HashMap 是非线程安全的,Hashtable 是线程安全的,因为 Hashtable 内部的方法基本都经过synchronized 修饰。(如果你要保证线程安全的话就使用 ConcurrentHashMap 吧!);
  • 效率: 因为线程安全的问题,HashMap 要比 Hashtable 效率高一点。另外,Hashtable 基本被淘汰,不要在代码中使用它;
  • 对 Null key 和 Null value 的支持: HashMap 可以存储 null 的 key 和 value,但 null 作为键只能有一个,null 作为值可以有多个;Hashtable 不允许有 null 键和 null 值,否则会抛出 NullPointerException。
  • 初始容量大小和每次扩充容量大小的不同: ① 创建时如果不指定容量初始值,Hashtable 默认的初始大小为 11,之后每次扩充,容量变为原来的 2n+1。HashMap 默认的初始化大小为 16。之后每次扩充,容量变为原来的 2 倍。② 创建时如果给定了容量初始值,那么 Hashtable 会直接使用你给定的大小,而 HashMap 会将其扩充为 2 的幂次方大小(HashMap 中的tableSizeFor()方法保证,下面给出了源代码)。也就是说 HashMap 总是使用 2 的幂作为哈希表的大小,后面会介绍到为什么是 2 的幂次方。
  • 底层数据结构: JDK1.8 以后的 HashMap 在解决哈希冲突时有了较大的变化,当链表长度大于阈值(默认为 8)时,将链表转化为红黑树(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树),以减少搜索时间(后文中我会结合源码对这一过程进行分析)。Hashtable 没有这样的机制。
  • 哈希函数的实现:HashMap 对哈希值进行了高位和低位的混合扰动处理以减少冲突,而 Hashtable 直接使用键的 hashCode() 值。

HashMap 中带有初始容量的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}

下面这个方法保证了 HashMap 总是使用 2 的幂作为哈希表的大小。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Returns a power of two size for the given target capacity.
*/
static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

HashMap 和 HashSet 区别

如果你看过 HashSet 源码的话就应该知道:HashSet 底层就是基于 HashMap 实现的。(HashSet 的源码非常非常少,因为除了 clone()、writeObject()、readObject()是 HashSet 自己不得不实现之外,其他方法都是直接调用 HashMap 中的方法。

HashMap HashSet
实现了 Map 接口 实现 Set 接口
存储键值对 仅存储对象
调用 put()向 map 中添加元素 调用 add()方法向 Set 中添加元素
HashMap 使用键(Key)计算 hashcode HashSet 使用成员对象来计算 hashcode 值,对于两个对象来说 hashcode 可能相同,所以equals()方法用来判断对象的相等性

HashMap 和 TreeMap 区别

TreeMap 和HashMap 都继承自AbstractMap ,但是需要注意的是TreeMap它还实现了NavigableMap接口和SortedMap 接口。

TreeMap 继承关系图

实现 NavigableMap 接口让 TreeMap 有了对集合内元素的搜索的能力。

NavigableMap 接口提供了丰富的方法来探索和操作键值对:

  1. 定向搜索: ceilingEntry(), floorEntry(), higherEntry()和 lowerEntry() 等方法可以用于定位大于等于、小于等于、严格大于、严格小于给定键的最接近的键值对。
  2. 子集操作: subMap(), headMap()和 tailMap() 方法可以高效地创建原集合的子集视图,而无需复制整个集合。
  3. 逆序视图:descendingMap() 方法返回一个逆序的 NavigableMap 视图,使得可以反向迭代整个 TreeMap。
  4. 边界操作: firstEntry(), lastEntry(), pollFirstEntry()和 pollLastEntry() 等方法可以方便地访问和移除元素。

这些方法都是基于红黑树数据结构的属性实现的,红黑树保持平衡状态,从而保证了搜索操作的时间复杂度为 O(log n),这让 TreeMap 成为了处理有序集合搜索问题的强大工具。

实现SortedMap接口让 TreeMap 有了对集合中的元素根据键排序的能力。默认是按 key 的升序排序,不过我们也可以指定排序的比较器。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @author shuang.kou
* @createTime 2020年06月15日 17:02:00
*/
public class Person {
private Integer age;

public Person(Integer age) {
this.age = age;
}

public Integer getAge() {
return age;
}


public static void main(String[] args) {
TreeMap<Person, String> treeMap = new TreeMap<>(new Comparator<Person>() {
@Override
public int compare(Person person1, Person person2) {
int num = person1.getAge() - person2.getAge();
return Integer.compare(num, 0);
}
});
treeMap.put(new Person(3), "person1");
treeMap.put(new Person(18), "person2");
treeMap.put(new Person(35), "person3");
treeMap.put(new Person(16), "person4");
treeMap.entrySet().stream().forEach(personStringEntry -> {
System.out.println(personStringEntry.getValue());
});
}
}

输出:

1
2
3
4
person1
person4
person2
person3

可以看出,TreeMap 中的元素已经是按照 Person 的 age 字段的升序来排列了。

上面,我们是通过传入匿名内部类的方式实现的,你可以将代码替换成 Lambda 表达式实现的方式:

1
2
3
4
TreeMap<Person, String> treeMap = new TreeMap<>((person1, person2) -> {
int num = person1.getAge() - person2.getAge();
return Integer.compare(num, 0);
});

综上,相比于HashMap来说, TreeMap 主要多了对集合中的元素根据键排序的能力以及对集合内元素的搜索的能力。

HashSet 如何检查重复?

以下内容摘自我的 Java 启蒙书《Head first java》第二版:

当你把对象加入HashSet时,HashSet 会先计算对象的hashcode值来判断对象加入的位置,同时也会与其他加入的对象的 hashcode 值作比较,如果没有相符的 hashcode,HashSet 会假设对象没有重复出现。但是如果发现有相同 hashcode 值的对象,这时会调用equals()方法来检查 hashcode 相等的对象是否真的相同。如果两者相同,HashSet 就不会让加入操作成功。

在 JDK1.8 中,HashSet的add()方法只是简单的调用了HashMap的put()方法,并且判断了一下返回值以确保是否有重复元素。直接看一下HashSet中的源码:

1
2
3
4
5
// Returns: true if this set did not already contain the specified element
// 返回值:当 set 中没有包含 add 的元素时返回真
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}

而在HashMap的putVal()方法中也能看到如下说明:

1
2
3
4
5
6
// Returns : previous value, or null if none
// 返回值:如果插入位置没有元素返回null,否则返回上一个元素
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
...
}

也就是说,在 JDK1.8 中,实际上无论HashSet中是否已经存在了某元素,HashSet都会直接插入,只是会在add()方法的返回值处告诉我们插入前是否存在相同元素。

HashMap 的底层实现

JDK1.8 之前

JDK1.8 之前 HashMap 底层是 数组和链表 结合在一起使用也就是 链表散列。HashMap 通过 key 的 hashcode 经过扰动函数处理过后得到 hash 值,然后通过 (n - 1) & hash 判断当前元素存放的位置(这里的 n 指的是数组的长度),如果当前位置存在元素的话,就判断该元素与要存入的元素的 hash 值以及 key 是否相同,如果相同的话,直接覆盖,不相同就通过拉链法解决冲突。

HashMap 中的扰动函数(hash 方法)是用来优化哈希值的分布。通过对原始的 hashCode() 进行额外处理,扰动函数可以减小由于糟糕的 hashCode() 实现导致的碰撞,从而提高数据的分布均匀性。

JDK 1.8 HashMap 的 hash 方法源码:

JDK 1.8 的 hash 方法 相比于 JDK 1.7 hash 方法更加简化,但是原理不变。

1
2
3
4
5
6
7
  static final int hash(Object key) {
int h;
// key.hashCode():返回散列值也就是hashcode
// ^:按位异或
// >>>:无符号右移,忽略符号位,空位都以0补齐
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

对比一下 JDK1.7 的 HashMap 的 hash 方法源码.

1
2
3
4
5
6
7
8
static int hash(int h) {
// This function ensures that hashCodes that differ only by
// constant multiples at each bit position have a bounded
// number of collisions (approximately 8 at default load factor).

h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

相比于 JDK1.8 的 hash 方法 ,JDK 1.7 的 hash 方法的性能会稍差一点点,因为毕竟扰动了 4 次。

所谓 “拉链法” 就是:将链表和数组相结合。也就是说创建一个链表数组,数组中每一格就是一个链表。若遇到哈希冲突,则将冲突的值加到链表中即可。

jdk1.8 之前的内部结构-HashMap

JDK1.8 之后

相比于之前的版本, JDK1.8 之后在解决哈希冲突时有了较大的变化,当链表长度大于阈值(默认为 8)(将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树)时,将链表转化为红黑树,以减少搜索时间。

jdk1.8之后的内部结构-HashMap

TreeMap、TreeSet 以及 JDK1.8 之后的 HashMap 底层都用到了红黑树。红黑树就是为了解决二叉查找树的缺陷,因为二叉查找树在某些情况下会退化成一个线性结构。

我们来结合源码分析一下 HashMap 链表到红黑树的转换。

1、 putVal 方法中执行链表转红黑树的判断逻辑。

链表的长度大于 8 的时候,就执行 treeifyBin (转换红黑树)的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 遍历链表
for (int binCount = 0; ; ++binCount) {
// 遍历到链表最后一个节点
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
// 如果链表元素个数大于TREEIFY_THRESHOLD(8)
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
// 红黑树转换(并不会直接转换成红黑树)
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}

2、treeifyBin 方法中判断是否真的转换为红黑树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final void treeifyBin(Node<K,V>[] tab, int hash) {
int n, index; Node<K,V> e;
// 判断当前数组的长度是否小于 64
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
// 如果当前数组的长度小于 64,那么会选择先进行数组扩容
resize();
else if ((e = tab[index = (n - 1) & hash]) != null) {
// 否则才将列表转换为红黑树

TreeNode<K,V> hd = null, tl = null;
do {
TreeNode<K,V> p = replacementTreeNode(e, null);
if (tl == null)
hd = p;
else {
p.prev = tl;
tl.next = p;
}
tl = p;
} while ((e = e.next) != null);
if ((tab[index] = hd) != null)
hd.treeify(tab);
}
}

将链表转换成红黑树前会判断,如果当前数组的长度小于 64,那么会选择先进行数组扩容,而不是转换为红黑树。

HashMap 的长度为什么是 2 的幂次方

为了让 HashMap 存取高效并减少碰撞,我们需要确保数据尽量均匀分布。哈希值在 Java 中通常使用 int 表示,其范围是 -2147483648 ~ 2147483647前后加起来大概 40 亿的映射空间,只要哈希函数映射得比较均匀松散,一般应用是很难出现碰撞的。但是,问题是一个 40 亿长度的数组,内存是放不下的。所以,这个散列值是不能直接拿来用的。用之前还要先做对数组的长度取模运算,得到的余数才能用来要存放的位置也就是对应的数组下标。

这个算法应该如何设计呢?

我们首先可能会想到采用 % 取余的操作来实现。但是,重点来了:“取余(%)操作中如果除数是 2 的幂次则等价于与其除数减一的与(&)操作(也就是说 hash%length==hash&(length-1) 的前提是 length 是 2 的 n 次方)。” 并且,采用二进制位操作 & 相对于 % 能够提高运算效率。

除了上面所说的位运算比取余效率高之外,我觉得更重要的一个原因是:长度是 2 的幂次方,可以让 HashMap 在扩容的时候更均匀。例如:

  • length = 8 时,length - 1 = 7 的二进制位0111
  • length = 16 时,length - 1 = 15 的二进制位1111

这时候原本存在 HashMap 中的元素计算新的数组位置时 hash&(length-1),取决 hash 的第四个二进制位(从右数),会出现两种情况:

  1. 第四个二进制位为 0,数组位置不变,也就是说当前元素在新数组和旧数组的位置相同。
  2. 第四个二进制位为 1,数组位置在新数组扩容之后的那一部分。

这里列举一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
假设有一个元素的哈希值为 10101100

旧数组元素位置计算:
hash = 10101100
length - 1 = 00000111
& -----------------
index = 00000100 (4)

新数组元素位置计算:
hash = 10101100
length - 1 = 00001111
& -----------------
index = 00001100 (12)

看第四位(从右数):
1.高位为 0:位置不变。
2.高位为 1:移动到新位置(原索引位置+原容量)。

⚠️注意:这里列举的场景看的是第四个二进制位,更准确点来说看的是高位(从右数),例如 length = 32 时,length - 1 = 31,二进制为 11111,这里看的就是第五个二进制位。

也就是说扩容之后,在旧数组元素 hash 值比较均匀(至于 hash 值均不均匀,取决于前面讲的对象的 hashcode() 方法和扰动函数)的情况下,新数组元素也会被分配的比较均匀,最好的情况是会有一半在新数组的前半部分,一半在新数组后半部分。

这样也使得扩容机制变得简单和高效,扩容后只需检查哈希值高位的变化来决定元素的新位置,要么位置不变(高位为 0),要么就是移动到新位置(高位为 1,原索引位置+原容量)。

最后,简单总结一下 HashMap 的长度是 2 的幂次方的原因:

  1. 位运算效率更高:位运算(&)比取余运算(%)更高效。当长度为 2 的幂次方时,hash % length 等价于 hash & (length - 1)。
  2. 可以更好地保证哈希值的均匀分布:扩容之后,在旧数组元素 hash 值比较均匀的情况下,新数组元素也会被分配的比较均匀,最好的情况是会有一半在新数组的前半部分,一半在新数组后半部分。
  3. 扩容机制变得简单和高效:扩容后只需检查哈希值高位的变化来决定元素的新位置,要么位置不变(高位为 0),要么就是移动到新位置(高位为 1,原索引位置+原容量)。

HashMap 多线程操作导致死循环问题

JDK1.7 及之前版本的 HashMap 在多线程环境下扩容操作可能存在死循环问题,这是由于当一个桶位中有多个元素需要进行扩容时,多个线程同时对链表进行操作,头插法可能会导致链表中的节点指向错误的位置,从而形成一个环形链表,进而使得查询元素的操作陷入死循环无法结束。

为了解决这个问题,JDK1.8 版本的 HashMap 采用了尾插法而不是头插法来避免链表倒置,使得插入的节点永远都是放在链表的末尾,避免了链表中的环形结构。但是还是不建议在多线程下使用 HashMap,因为多线程下使用 HashMap 还是会存在数据覆盖的问题。并发环境下,推荐使用 ConcurrentHashMap 。

一般面试中这样介绍就差不多,不需要记各种细节,个人觉得也没必要记。如果想要详细了解 HashMap 扩容导致死循环问题,可以看看耗子叔的这篇文章:Java HashMap 的死循环。

HashMap 为什么线程不安全?

JDK1.7 及之前版本,在多线程环境下,HashMap 扩容时会造成死循环和数据丢失的问题。

数据丢失这个在 JDK1.7 和 JDK 1.8 中都存在,这里以 JDK 1.8 为例进行介绍。

JDK 1.8 后,在 HashMap 中,多个键值对可能会被分配到同一个桶(bucket),并以链表或红黑树的形式存储。多个线程对 HashMap 的 put 操作会导致线程不安全,具体来说会有数据覆盖的风险。

举个例子:

  • 两个线程 1,2 同时进行 put 操作,并且发生了哈希冲突(hash 函数计算出的插入下标是相同的)。
  • 不同的线程可能在不同的时间片获得 CPU 执行的机会,当前线程 1 执行完哈希冲突判断后,由于时间片耗尽挂起。线程 2 先完成了插入操作。
  • 随后,线程 1 获得时间片,由于之前已经进行过 hash 碰撞的判断,所有此时会直接进行插入,这就导致线程 2 插入的数据被线程 1 覆盖了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
// ...
// 判断是否出现 hash 碰撞
// (n - 1) & hash 确定元素存放在哪个桶中,桶为空,新生成结点放入桶中(此时,这个结点是放在数组中)
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
// 桶中已经存在元素(处理hash冲突)
else {
// ...
}

还有一种情况是这两个线程同时 put 操作导致 size 的值不正确,进而导致数据覆盖的问题:

  1. 线程 1 执行 if(++size > threshold) 判断时,假设获得 size 的值为 10,由于时间片耗尽挂起。
  2. 线程 2 也执行 if(++size > threshold) 判断,获得 size 的值也为 10,并将元素插入到该桶位中,并将 size 的值更新为 11。
  3. 随后,线程 1 获得时间片,它也将元素放入桶位中,并将 size 的值更新为 11。
  4. 线程 1、2 都执行了一次 put 操作,但是 size 的值只增加了 1,也就导致实际上只有一个元素被添加到了 HashMap 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
// ...
// 实际大小大于阈值则扩容
if (++size > threshold)
resize();
// 插入后回调
afterNodeInsertion(evict);
return null;
}

HashMap 常见的遍历方式?

HashMap 的 7 种遍历方式与性能分析!

🐛 修正(参见:issue#1411):

这篇文章对于 parallelStream 遍历方式的性能分析有误,先说结论:存在阻塞时 parallelStream 性能最高, 非阻塞时 parallelStream 性能最低 。

当遍历不存在阻塞时, parallelStream 的性能是最低的:

1
2
3
4
5
Benchmark               Mode  Cnt     Score      Error  Units
Test.entrySet avgt 5 288.651 ± 10.536 ns/op
Test.keySet avgt 5 584.594 ± 21.431 ns/op
Test.lambda avgt 5 221.791 ± 10.198 ns/op
Test.parallelStream avgt 5 6919.163 ± 1116.139 ns/op

加入阻塞代码Thread.sleep(10)后, parallelStream 的性能才是最高的:

1
2
3
4
5
Benchmark               Mode  Cnt           Score          Error  Units
Test.entrySet avgt 5 1554828440.000 ± 23657748.653 ns/op
Test.keySet avgt 5 1550612500.000 ± 6474562.858 ns/op
Test.lambda avgt 5 1551065180.000 ± 19164407.426 ns/op
Test.parallelStream avgt 5 186345456.667 ± 3210435.590 ns/op

ConcurrentHashMap 和 Hashtable 的区别

ConcurrentHashMap 和 Hashtable 的区别主要体现在实现线程安全的方式上不同。

  • 底层数据结构: JDK1.7 的 ConcurrentHashMap 底层采用 分段的数组+链表 实现,JDK1.8 采用的数据结构跟 HashMap1.8 的结构一样,数组+链表/红黑二叉树。Hashtable 和 JDK1.8 之前的 HashMap 的底层数据结构类似都是采用 数组+链表 的形式,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的;
  • 实现线程安全的方式(重要):
    • 在 JDK1.7 的时候,ConcurrentHashMap 对整个桶数组进行了分割分段(Segment,分段锁),每一把锁只锁容器其中一部分数据(下面有示意图),多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。
    • 到了 JDK1.8 的时候,ConcurrentHashMap 已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。(JDK1.6 以后 synchronized 锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本;
    • Hashtable(同一把锁) :使用 synchronized 来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低。

下面,我们再来看看两者底层数据结构的对比图。

Hashtable :

Hashtable 的内部结构

https://www.cnblogs.com/chengxiao/p/6842045.html>

JDK1.7 的 ConcurrentHashMap:

Java7 ConcurrentHashMap 存储结构

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。

Segment 数组中的每个元素包含一个 HashEntry 数组,每个 HashEntry 数组属于链表结构。

JDK1.8 的 ConcurrentHashMap:

Java8 ConcurrentHashMap 存储结构

JDK1.8 的 ConcurrentHashMap 不再是 Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表 / 红黑树。不过,Node 只能用于链表的情况,红黑树的情况需要使用 **TreeNode**。当冲突链表达到一定长度时,链表会转换成红黑树。

TreeNode是存储红黑树节点,被TreeBin包装。TreeBin通过root属性维护红黑树的根结点,因为红黑树在旋转的时候,根结点可能会被它原来的子节点替换掉,在这个时间点,如果有其他线程要写这棵红黑树就会发生线程不安全问题,所以在 ConcurrentHashMap 中TreeBin通过waiter属性维护当前使用这棵红黑树的线程,来防止其他线程的进入。

1
2
3
4
5
6
7
8
9
10
11
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
...
}

ConcurrentHashMap 线程安全的具体实现方式/底层具体实现

JDK1.8 之前

Java7 ConcurrentHashMap 存储结构

首先将数据分为一段一段(这个“段”就是 Segment)的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。

Segment 继承了 ReentrantLock,所以 Segment 是一种可重入锁,扮演锁的角色。HashEntry 用于存储键值对数据。

1
2
static class Segment<K,V> extends ReentrantLock implements Serializable {
}

一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的个数一旦初始化就不能改变。 Segment 数组的大小默认是 16,也就是说默认可以同时支持 16 个线程并发写。

Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 的锁。也就是说,对同一 Segment 的并发写入会被阻塞,不同 Segment 的写入是可以并发执行的。

JDK1.8 之后

Java8 ConcurrentHashMap 存储结构

Java 8 几乎完全重写了 ConcurrentHashMap,代码量从原来 Java 7 中的 1000 多行,变成了现在的 6000 多行。

ConcurrentHashMap 取消了 Segment 分段锁,采用 Node + CAS + synchronized 来保证并发安全。数据结构跟 HashMap 1.8 的结构类似,数组+链表/红黑二叉树。Java 8 在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(log(N)))。

Java 8 中,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发,就不会影响其他 Node 的读写,效率大幅提升。

JDK 1.7 和 JDK 1.8 的 ConcurrentHashMap 实现有什么不同?

  • 线程安全实现方式:JDK 1.7 采用 Segment 分段锁来保证安全, Segment 是继承自 ReentrantLock。JDK1.8 放弃了 Segment 分段锁的设计,采用 Node + CAS + synchronized 保证线程安全,锁粒度更细,synchronized 只锁定当前链表或红黑二叉树的首节点。
  • Hash 碰撞解决方法 : JDK 1.7 采用拉链法,JDK1.8 采用拉链法结合红黑树(链表长度超过一定阈值时,将链表转换为红黑树)。
  • 并发度:JDK 1.7 最大并发度是 Segment 的个数,默认是 16。JDK 1.8 最大并发度是 Node 数组的大小,并发度更大。

ConcurrentHashMap 为什么 key 和 value 不能为 null?

ConcurrentHashMap 的 key 和 value 不能为 null 主要是为了避免二义性。null 是一个特殊的值,表示没有对象或没有引用。如果你用 null 作为键,那么你就无法区分这个键是否存在于 ConcurrentHashMap 中,还是根本没有这个键。同样,如果你用 null 作为值,那么你就无法区分这个值是否是真正存储在 ConcurrentHashMap 中的,还是因为找不到对应的键而返回的。

拿 get 方法取值来说,返回的结果为 null 存在两种情况:

  • 值没有在集合中 ;
  • 值本身就是 null。

这也就是二义性的由来。

具体可以参考 ConcurrentHashMap 源码分析 。

多线程环境下,存在一个线程操作该 ConcurrentHashMap 时,其他的线程将该 ConcurrentHashMap 修改的情况,所以无法通过 containsKey(key) 来判断否存在这个键值对,也就没办法解决二义性问题了。

与此形成对比的是,HashMap 可以存储 null 的 key 和 value,但 null 作为键只能有一个,null 作为值可以有多个。如果传入 null 作为参数,就会返回 hash 值为 0 的位置的值。单线程环境下,不存在一个线程操作该 HashMap 时,其他的线程将该 HashMap 修改的情况,所以可以通过 contains(key)来做判断是否存在这个键值对,从而做相应的处理,也就不存在二义性问题。

也就是说,多线程下无法正确判定键值对是否存在(存在其他线程修改的情况),单线程是可以的(不存在其他线程修改的情况)。

如果你确实需要在 ConcurrentHashMap 中使用 null 的话,可以使用一个特殊的静态空对象来代替 null。

1
public static final Object NULL = new Object();

最后,再分享一下 ConcurrentHashMap 作者本人 (Doug Lea)对于这个问题的回答:

The main reason that nulls aren’t allowed in ConcurrentMaps (ConcurrentHashMaps, ConcurrentSkipListMaps) is that ambiguities that may be just barely tolerable in non-concurrent maps can’t be accommodated. The main one is that if map.get(key) returns null, you can’t detect whether the key explicitly maps to null vs the key isn’t mapped. In a non-concurrent map, you can check this via map.contains(key), but in a concurrent one, the map might have changed between calls.

翻译过来之后的,大致意思还是单线程下可以容忍歧义,而多线程下无法容忍。

ConcurrentHashMap 能保证复合操作的原子性吗?

ConcurrentHashMap 是线程安全的,意味着它可以保证多个线程同时对它进行读写操作时,不会出现数据不一致的情况,也不会导致 JDK1.7 及之前版本的 HashMap 多线程操作导致死循环问题。但是,这并不意味着它可以保证所有的复合操作都是原子性的,一定不要搞混了!

复合操作是指由多个基本操作(如put、get、remove、containsKey等)组成的操作,例如先判断某个键是否存在containsKey(key),然后根据结果进行插入或更新put(key, value)。这种操作在执行过程中可能会被其他线程打断,导致结果不符合预期。

例如,有两个线程 A 和 B 同时对 ConcurrentHashMap 进行复合操作,如下:

1
2
3
4
5
6
7
8
// 线程 A
if (!map.containsKey(key)) {
map.put(key, value);
}
// 线程 B
if (!map.containsKey(key)) {
map.put(key, anotherValue);
}

如果线程 A 和 B 的执行顺序是这样:

  1. 线程 A 判断 map 中不存在 key
  2. 线程 B 判断 map 中不存在 key
  3. 线程 B 将 (key, anotherValue) 插入 map
  4. 线程 A 将 (key, value) 插入 map

那么最终的结果是 (key, value),而不是预期的 (key, anotherValue)。这就是复合操作的非原子性导致的问题。

那如何保证 ConcurrentHashMap 复合操作的原子性呢?

ConcurrentHashMap 提供了一些原子性的复合操作,如 putIfAbsent、compute、computeIfAbsent 、computeIfPresent、merge等。这些方法都可以接受一个函数作为参数,根据给定的 key 和 value 来计算一个新的 value,并且将其更新到 map 中。

上面的代码可以改写为:

1
2
3
4
// 线程 A
map.putIfAbsent(key, value);
// 线程 B
map.putIfAbsent(key, anotherValue);

或者:

1
2
3
4
// 线程 A
map.computeIfAbsent(key, k -> value);
// 线程 B
map.computeIfAbsent(key, k -> anotherValue);

很多同学可能会说了,这种情况也能加锁同步呀!确实可以,但不建议使用加锁的同步机制,违背了使用 ConcurrentHashMap 的初衷。在使用 ConcurrentHashMap 的时候,尽量使用这些原子性的复合操作方法来保证原子性。

Collections 工具类(不重要)

Collections 工具类常用方法:

  • 排序
  • 查找,替换操作
  • 同步控制(不推荐,需要线程安全的集合类型时请考虑使用 JUC 包下的并发集合)

排序操作

1
2
3
4
5
6
void reverse(List list)//反转
void shuffle(List list)//随机排序
void sort(List list)//按自然排序的升序排序
void sort(List list, Comparator c)//定制排序,由Comparator控制排序逻辑
void swap(List list, int i , int j)//交换两个索引位置的元素
void rotate(List list, int distance)//旋转。当distance为正数时,将list后distance个元素整体移到前面。当distance为负数时,将 list的前distance个元素整体移到后面

查找,替换操作

1
2
3
4
5
6
7
int binarySearch(List list, Object key)//对List进行二分查找,返回索引,注意List必须是有序的
int max(Collection coll)//根据元素的自然顺序,返回最大的元素。 类比int min(Collection coll)
int max(Collection coll, Comparator c)//根据定制排序,返回最大元素,排序规则由Comparatator类控制。类比int min(Collection coll, Comparator c)
void fill(List list, Object obj)//用指定的元素代替指定list中的所有元素
int frequency(Collection c, Object o)//统计元素出现次数
int indexOfSubList(List list, List target)//统计target在list中第一次出现的索引,找不到则返回-1,类比int lastIndexOfSubList(List source, list target)
boolean replaceAll(List list, Object oldVal, Object newVal)//用新元素替换旧元素

同步控制

Collections 提供了多个synchronizedXxx()方法·,该方法可以将指定集合包装成线程同步的集合,从而解决多线程并发访问集合时的线程安全问题。

我们知道 HashSet,TreeSet,ArrayList,LinkedList,HashMap,TreeMap 都是线程不安全的。Collections 提供了多个静态方法可以把他们包装成线程同步的集合。

最好不要用下面这些方法,效率非常低,需要线程安全的集合类型时请考虑使用 JUC 包下的并发集合。

方法如下:

1
2
3
4
synchronizedCollection(Collection<T>  c) //返回指定 collection 支持的同步(线程安全的)collection。
synchronizedList(List<T> list)//返回指定列表支持的同步(线程安全的)List。
synchronizedMap(Map<K,V> m) //返回由指定映射支持的同步(线程安全的)Map。
synchronizedSet(Set<T> s) //返回指定 set 支持的同步(线程安全的)set。

数据冷热分离详解

发表于 2021-06-20 | 分类于 分布式 , 高性能 | 阅读次数:
字数统计: 1.4k 字 | 阅读时长 ≈ 4 分钟

什么是数据冷热分离?

数据冷热分离是指根据数据的访问频率和业务重要性,将数据分为冷数据和热数据,冷数据一般存储在存储在低成本、低性能的介质中,热数据高性能存储介质中。

冷数据和热数据

热数据是指经常被访问和修改且需要快速访问的数据,冷数据是指不经常访问,对当前项目价值较低,但需要长期保存的数据。

冷热数据到底如何区分呢?有两个常见的区分方法:

  1. 时间维度区分:按照数据的创建时间、更新时间、过期时间等,将一定时间段内的数据视为热数据,超过该时间段的数据视为冷数据。例如,订单系统可以将 1 年前的订单数据作为冷数据,1 年内的订单数据作为热数据。这种方法适用于数据的访问频率和时间有较强的相关性的场景。
  2. 访问频率区分:将高频访问的数据视为热数据,低频访问的数据视为冷数据。例如,内容系统可以将浏览量非常低的文章作为冷数据,浏览量较高的文章作为热数据。这种方法需要记录数据的访问频率,成本较高,适合访问频率和数据本身有较强的相关性的场景。

几年前的数据并不一定都是冷数据,例如一些优质文章发表几年后依然有很多人访问,大部分普通用户新发表的文章却基本没什么人访问。

这两种区分冷热数据的方法各有优劣,实际项目中,可以将两者结合使用。

冷热分离的思想

冷热分离的思想非常简单,就是对数据进行分类,然后分开存储。冷热分离的思想可以应用到很多领域和场景中,而不仅仅是数据存储,例如:

  • 邮件系统中,可以将近期的比较重要的邮件放在收件箱,将比较久远的不太重要的邮件存入归档。
  • 日常生活中,可以将常用的物品放在显眼的位置,不常用的物品放入储藏室或者阁楼。
  • 图书馆中,可以将最受欢迎和最常借阅的图书单独放在一个显眼的区域,将较少借阅的书籍放在不起眼的位置。
  • ……

数据冷热分离的优缺点

  • 优点:热数据的查询性能得到优化(用户的绝大部分操作体验会更好)、节约成本(可以冷热数据的不同存储需求,选择对应的数据库类型和硬件配置,比如将热数据放在 SSD 上,将冷数据放在 HDD 上)
  • 缺点:系统复杂性和风险增加(需要分离冷热数据,数据错误的风险增加)、统计效率低(统计的时候可能需要用到冷库的数据)。

冷数据如何迁移?

冷数据迁移方案:

  1. 业务层代码实现:当有对数据进行写操作时,触发冷热分离的逻辑,判断数据是冷数据还是热数据,冷数据就入冷库,热数据就入热库。这种方案会影响性能且冷热数据的判断逻辑不太好确定,还需要修改业务层代码,因此一般不会使用。
  2. 任务调度:可以利用 xxl-job 或者其他分布式任务调度平台定时去扫描数据库,找出满足冷数据条件的数据,然后批量地将其复制到冷库中,并从热库中删除。这种方法修改的代码非常少,非常适合按照时间区分冷热数据的场景。
  3. 监听数据库的变更日志 binlog :将满足冷数据条件的数据从 binlog 中提取出来,然后复制到冷库中,并从热库中删除。这种方法可以不用修改代码,但不适合按照时间维度区分冷热数据的场景。

如果你的公司有 DBA 的话,也可以让 DBA 进行冷数据的人工迁移,一次迁移完成冷数据到冷库。然后,再搭配上面介绍的方案实现后续冷数据的迁移工作。

冷数据如何存储?

冷数据的存储要求主要是容量大,成本低,可靠性高,访问速度可以适当牺牲。

冷数据存储方案:

  • 中小厂:直接使用 MySQL/PostgreSQL 即可(不改变数据库选型和项目当前使用的数据库保持一致),比如新增一张表来存储某个业务的冷数据或者使用单独的冷库来存放冷数据(涉及跨库查询,增加了系统复杂性和维护难度)
  • 大厂:Hbase(常用)、RocksDB、Doris、Cassandra

如果公司成本预算足的话,也可以直接上 TiDB 这种分布式关系型数据库,直接一步到位。TiDB 6.0 正式支持数据冷热存储分离,可以降低 SSD 使用成本。使用 TiDB 6.0 的数据放置功能,可以在同一个集群实现海量数据的冷热存储,将新的热数据存入 SSD,历史冷数据存入 HDD。

案例分享

  • 如何快速优化几千万数据量的订单表 - 程序员济癫 - 2023
  • 海量数据冷热分离方案与实践 - 字节跳动技术团队 - 2022

超时&重试详解

发表于 2021-06-09 | 分类于 分布式 , 高可用 | 阅读次数:
字数统计: 2.3k 字 | 阅读时长 ≈ 7 分钟

由于网络问题、系统或者服务内部的 Bug、服务器宕机、操作系统崩溃等问题的不确定性,我们的系统或者服务永远不可能保证时刻都是可用的状态。

为了最大限度的减小系统或者服务出现故障之后带来的影响,我们需要用到的 超时(Timeout) 和 重试(Retry) 机制。

想要把超时和重试机制讲清楚其实很简单,因为它俩本身就不是什么高深的概念。

虽然超时和重试机制的思想很简单,但是它俩是真的非常实用。你平时接触到的绝大部分涉及到远程调用的系统或者服务都会应用超时和重试机制。尤其是对于微服务系统来说,正确设置超时和重试非常重要。单体服务通常只涉及数据库、缓存、第三方 API、中间件等的网络调用,而微服务系统内部各个服务之间还存在着网络调用。

超时机制

什么是超时机制?

超时机制说的是当一个请求超过指定的时间(比如 1s)还没有被处理的话,这个请求就会直接被取消并抛出指定的异常或者错误(比如 504 Gateway Timeout)。

我们平时接触到的超时可以简单分为下面 2 种:

  • 连接超时(ConnectTimeout):客户端与服务端建立连接的最长等待时间。
  • 读取超时(ReadTimeout):客户端和服务端已经建立连接,客户端等待服务端处理完请求的最长时间。实际项目中,我们关注比较多的还是读取超时。

一些连接池客户端框架中可能还会有获取连接超时和空闲连接清理超时。

如果没有设置超时的话,就可能会导致服务端连接数爆炸和大量请求堆积的问题。

这些堆积的连接和请求会消耗系统资源,影响新收到的请求的处理。严重的情况下,甚至会拖垮整个系统或者服务。

我之前在实际项目就遇到过类似的问题,整个网站无法正常处理请求,服务器负载直接快被拉满。后面发现原因是项目超时设置错误加上客户端请求处理异常,导致服务端连接数直接接近 40w+,这么多堆积的连接直接把系统干趴了。

超时时间应该如何设置?

超时到底设置多长时间是一个难题!超时值设置太高或者太低都有风险。如果设置太高的话,会降低超时机制的有效性,比如你设置超时为 10s 的话,那设置超时就没啥意义了,系统依然可能会出现大量慢请求堆积的问题。如果设置太低的话,就可能会导致在系统或者服务在某些处理请求速度变慢的情况下(比如请求突然增多),大量请求重试(超时通常会结合重试)继续加重系统或者服务的压力,进而导致整个系统或者服务被拖垮的问题。

通常情况下,我们建议读取超时设置为 1500ms ,这是一个比较普适的值。如果你的系统或者服务对于延迟比较敏感的话,那读取超时值可以适当在 1500ms 的基础上进行缩短。反之,读取超时值也可以在 1500ms 的基础上进行加长,不过,尽量还是不要超过 1500ms 。连接超时可以适当设置长一些,建议在 1000ms ~ 5000ms 之内。

没有银弹!超时值具体该设置多大,还是要根据实际项目的需求和情况慢慢调整优化得到。

更上一层,参考美团的 Java 线程池参数动态配置思想,我们也可以将超时弄成可配置化的参数而不是固定的,比较简单的一种办法就是将超时的值放在配置中心中。这样的话,我们就可以根据系统或者服务的状态动态调整超时值了。

重试机制

什么是重试机制?

重试机制一般配合超时机制一起使用,指的是多次发送相同的请求来避免瞬态故障和偶然性故障。

瞬态故障可以简单理解为某一瞬间系统偶然出现的故障,并不会持久。偶然性故障可以理解为哪些在某些情况下偶尔出现的故障,频率通常较低。

重试的核心思想是通过消耗服务器的资源来尽可能获得请求更大概率被成功处理。由于瞬态故障和偶然性故障是很少发生的,因此,重试对于服务器的资源消耗几乎是可以被忽略的。

常见的重试策略有哪些?

常见的重试策略有两种:

  1. 固定间隔时间重试:每次重试之间都使用相同的时间间隔,比如每隔 1.5 秒进行一次重试。这种重试策略的优点是实现起来比较简单,不需要考虑重试次数和时间的关系,也不需要维护额外的状态信息。但是这种重试策略的缺点是可能会导致重试过于频繁或过于稀疏,从而影响系统的性能和效率。如果重试间隔太短,可能会对目标系统造成过大的压力,导致雪崩效应;如果重试间隔太长,可能会导致用户等待时间过长,影响用户体验。
  2. 梯度间隔重试:根据重试次数的增加去延长下次重试时间,比如第一次重试间隔为 1 秒,第二次为 2 秒,第三次为 4 秒,以此类推。这种重试策略的优点是能够有效提高重试成功的几率(随着重试次数增加,但是重试依然不成功,说明目标系统恢复时间比较长,因此可以根据重试次数延长下次重试时间),也能通过柔性化的重试避免对下游系统造成更大压力。但是这种重试策略的缺点是实现起来比较复杂,需要考虑重试次数和时间的关系,以及设置合理的上限和下限值。另外,这种重试策略也可能会导致用户等待时间过长,影响用户体验。

这两种适合的场景各不相同。固定间隔时间重试适用于目标系统恢复时间比较稳定和可预测的场景,比如网络波动或服务重启。梯度间隔重试适用于目标系统恢复时间比较长或不可预测的场景,比如网络故障和服务故障。

重试的次数如何设置?

重试的次数不宜过多,否则依然会对系统负载造成比较大的压力。

重试的次数通常建议设为 3 次。大部分情况下,我们还是更建议使用梯度间隔重试策略,比如说我们要重试 3 次的话,第 1 次请求失败后,等待 1 秒再进行重试,第 2 次请求失败后,等待 2 秒再进行重试,第 3 次请求失败后,等待 3 秒再进行重试。

什么是重试幂等?

超时和重试机制在实际项目中使用的话,需要注意保证同一个请求没有被多次执行。

什么情况下会出现一个请求被多次执行呢?客户端等待服务端完成请求完成超时但此时服务端已经执行了请求,只是由于短暂的网络波动导致响应在发送给客户端的过程中延迟了。

举个例子:用户支付购买某个课程,结果用户支付的请求由于重试的问题导致用户购买同一门课程支付了两次。对于这种情况,我们在执行用户购买课程的请求的时候需要判断一下用户是否已经购买过。这样的话,就不会因为重试的问题导致重复购买了。

Java 中如何实现重试?

如果要手动编写代码实现重试逻辑的话,可以通过循环(例如 while 或 for 循环)或者递归实现。不过,一般不建议自己动手实现,有很多第三方开源库提供了更完善的重试机制实现,例如 Spring Retry、Resilience4j、Guava Retrying。

参考

  • 微服务之间调用超时的设置治理:https://www.infoq.cn/article/eyrslar53l6hjm5yjgyx
  • 超时、重试和抖动回退:https://aws.amazon.com/cn/builders-library/timeouts-retries-and-backoff-with-jitter/

CDN工作原理详解

发表于 2021-05-21 | 分类于 分布式 , 高性能 | 阅读次数:
字数统计: 2.2k 字 | 阅读时长 ≈ 7 分钟

什么是 CDN ?

CDN 全称是 Content Delivery Network/Content Distribution Network,翻译过的意思是 内容分发网络 。

我们可以将内容分发网络拆开来看:

  • 内容:指的是静态资源比如图片、视频、文档、JS、CSS、HTML。
  • 分发网络:指的是将这些静态资源分发到位于多个不同的地理位置机房中的服务器上,这样,就可以实现静态资源的就近访问比如北京的用户直接访问北京机房的数据。

所以,简单来说,CDN 就是将静态资源分发到多个不同的地方以实现就近访问,进而加快静态资源的访问速度,减轻服务器以及带宽的负担。

类似于京东建立的庞大的仓储运输体系,京东物流在全国拥有非常多的仓库,仓储网络几乎覆盖全国所有区县。这样的话,用户下单的第一时间,商品就从距离用户最近的仓库,直接发往对应的配送站,再由京东小哥送到你家。

京东仓配系统

你可以将 CDN 看作是服务上一层的特殊缓存服务,分布在全国各地,主要用来处理静态资源的请求。

CDN 简易示意图

我们经常拿全站加速和内容分发网络做对比,不要把两者搞混了!全站加速(不同云服务商叫法不同,腾讯云叫 ECDN、阿里云叫 DCDN)既可以加速静态资源又可以加速动态资源,内容分发网络(CDN)主要针对的是 静态资源 。

阿里云文档:https://help.aliyun.com/document_detail/64836.html

绝大部分公司都会在项目开发中使用 CDN 服务,但很少会有自建 CDN 服务的公司。基于成本、稳定性和易用性考虑,建议直接选择专业的云厂商(比如阿里云、腾讯云、华为云、青云)或者 CDN 厂商(比如网宿、蓝汛)提供的开箱即用的 CDN 服务。

很多朋友可能要问了:既然是就近访问,为什么不直接将服务部署在多个不同的地方呢?

  • 成本太高,需要部署多份相同的服务。
  • 静态资源通常占用空间比较大且经常会被访问到,如果直接使用服务器或者缓存来处理静态资源请求的话,对系统资源消耗非常大,可能会影响到系统其他服务的正常运行。

同一个服务在在多个不同的地方部署多份(比如同城灾备、异地灾备、同城多活、异地多活)是为了实现系统的高可用而不是就近访问。

CDN 工作原理是什么?

搞懂下面 3 个问题也就搞懂了 CDN 的工作原理:

  1. 静态资源是如何被缓存到 CDN 节点中的?
  2. 如何找到最合适的 CDN 节点?
  3. 如何防止静态资源被盗用?

静态资源是如何被缓存到 CDN 节点中的?

你可以通过 预热 的方式将源站的资源同步到 CDN 的节点中。这样的话,用户首次请求资源可以直接从 CDN 节点中取,无需回源。这样可以降低源站压力,提升用户体验。

如果不预热的话,你访问的资源可能不在 CDN 节点中,这个时候 CDN 节点将请求源站获取资源,这个过程是大家经常说的 回源。

  • 回源:当 CDN 节点上没有用户请求的资源或该资源的缓存已经过期时,CDN 节点需要从原始服务器获取最新的资源内容,这个过程就是回源。当用户请求发生回源的话,会导致该请求的响应速度比未使用 CDN 还慢,因为相比于未使用 CDN 还多了一层 CDN 的调用流程。
  • 预热:预热是指在 CDN 上提前将内容缓存到 CDN 节点上。这样当用户在请求这些资源时,能够快速地从最近的 CDN 节点获取到而不需要回源,进而减少了对源站的访问压力,提高了访问速度。

CDN 回源

如果资源有更新的话,你也可以对其 刷新 ,删除 CDN 节点上缓存的旧资源,并强制 CDN 节点回源站获取最新资源。

几乎所有云厂商提供的 CDN 服务都具备缓存的刷新和预热功能(下图是阿里云 CDN 服务提供的相应功能):

CDN 缓存的刷新和预热

命中率 和 回源率 是衡量 CDN 服务质量两个重要指标。命中率越高越好,回源率越低越好。

如何找到最合适的 CDN 节点?

GSLB (Global Server Load Balance,全局负载均衡)是 CDN 的大脑,负责多个 CDN 节点之间相互协作,最常用的是基于 DNS 的 GSLB。

CDN 会通过 GSLB 找到最合适的 CDN 节点,更具体点来说是下面这样的:

  1. 浏览器向 DNS 服务器发送域名请求;
  2. DNS 服务器向根据 CNAME( Canonical Name ) 别名记录向 GSLB 发送请求;
  3. GSLB 返回性能最好(通常距离请求地址最近)的 CDN 节点(边缘服务器,真正缓存内容的地方)的地址给浏览器;
  4. 浏览器直接访问指定的 CDN 节点。

CDN 原理示意图

为了方便理解,上图其实做了一点简化。GSLB 内部可以看作是 CDN 专用 DNS 服务器和负载均衡系统组合。CDN 专用 DNS 服务器会返回负载均衡系统 IP 地址给浏览器,浏览器使用 IP 地址请求负载均衡系统进而找到对应的 CDN 节点。

GSLB 是如何选择出最合适的 CDN 节点呢? GSLB 会根据请求的 IP 地址、CDN 节点状态(比如负载情况、性能、响应时间、带宽)等指标来综合判断具体返回哪一个 CDN 节点的地址。

如何防止资源被盗刷?

如果我们的资源被其他用户或者网站非法盗刷的话,将会是一笔不小的开支。

解决这个问题最常用最简单的办法设置 Referer 防盗链,具体来说就是根据 HTTP 请求的头信息里面的 Referer 字段对请求进行限制。我们可以通过 Referer 字段获取到当前请求页面的来源页面的网站地址,这样我们就能确定请求是否来自合法的网站。

CDN 服务提供商几乎都提供了这种比较基础的防盗链机制。

腾讯云 CDN Referer 防盗链配置

不过,如果站点的防盗链配置允许 Referer 为空的话,通过隐藏 Referer,可以直接绕开防盗链。

通常情况下,我们会配合其他机制来确保静态资源被盗用,一种常用的机制是 时间戳防盗链 。相比之下,时间戳防盗链 的安全性更强一些。时间戳防盗链加密的 URL 具有时效性,过期之后就无法再被允许访问。

时间戳防盗链的 URL 通常会有两个参数一个是签名字符串,一个是过期时间。签名字符串一般是通过对用户设定的加密字符串、请求路径、过期时间通过 MD5 哈希算法取哈希的方式获得。

时间戳防盗链 URL 示例:

1
http://cdn.wangsu.com/4/123.mp3? wsSecret=79aead3bd7b5db4adeffb93a010298b5&wsTime=1601026312
  • wsSecret:签名字符串。
  • wsTime: 过期时间。

时间戳防盗链的实现也比较简单,并且可靠性较高,推荐使用。并且,绝大部分 CDN 服务提供商都提供了开箱即用的时间戳防盗链机制。

七牛云时间戳防盗链配置

除了 Referer 防盗链和时间戳防盗链之外,你还可以 IP 黑白名单配置、IP 访问限频配置等机制来防盗刷。

总结

  • CDN 就是将静态资源分发到多个不同的地方以实现就近访问,进而加快静态资源的访问速度,减轻服务器以及带宽的负担。
  • 基于成本、稳定性和易用性考虑,建议直接选择专业的云厂商(比如阿里云、腾讯云、华为云、青云)或者 CDN 厂商(比如网宿、蓝汛)提供的开箱即用的 CDN 服务。
  • GSLB (Global Server Load Balance,全局负载均衡)是 CDN 的大脑,负责多个 CDN 节点之间相互协作,最常用的是基于 DNS 的 GSLB。CDN 会通过 GSLB 找到最合适的 CDN 节点。
  • 为了防止静态资源被盗用,我们可以利用 Referer 防盗链 + 时间戳防盗链 。

参考

  • 时间戳防盗链 - 七牛云 CDN:https://developer.qiniu.com/fusion/kb/1670/timestamp-hotlinking-prevention
  • CDN 是个啥玩意?一文说个明白:https://mp.weixin.qq.com/s/Pp0C8ALUXsmYCUkM5QnkQw
  • 《透视 HTTP 协议》- 37 | CDN:加速我们的网络服务:http://gk.link/a/11yOG

分布式锁常见实现方案总结

发表于 2021-05-20 | 分类于 分布式 | 阅读次数:
字数统计: 4.9k 字 | 阅读时长 ≈ 18 分钟

通常情况下,我们一般会选择基于 Redis 或者 ZooKeeper 实现分布式锁,Redis 用的要更多一点,我这里也先以 Redis 为例介绍分布式锁的实现。

基于 Redis 实现分布式锁

如何基于 Redis 实现一个最简易的分布式锁?

不论是本地锁还是分布式锁,核心都在于“互斥”。

在 Redis 中, SETNX 命令是可以帮助我们实现互斥。SETNX 即 SET if Not eXists (对应 Java 中的 setIfAbsent 方法),如果 key 不存在的话,才会设置 key 的值。如果 key 已经存在, SETNX 啥也不做。

1
2
3
4
> SETNX lockKey uniqueValue
(integer) 1
> SETNX lockKey uniqueValue
(integer) 0

释放锁的话,直接通过 DEL 命令删除对应的 key 即可。

1
2
> DEL lockKey
(integer) 1

为了防止误删到其他的锁,这里我们建议使用 Lua 脚本通过 key 对应的 value(唯一值)来判断。

选用 Lua 脚本是为了保证解锁操作的原子性。因为 Redis 在执行 Lua 脚本时,可以以原子性的方式执行,从而保证了锁释放操作的原子性。

1
2
3
4
5
6
// 释放锁时,先比较锁对应的 value 值是否相等,避免锁的误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

Redis 实现简易分布式锁

这是一种最简易的 Redis 分布式锁实现,实现方式比较简单,性能也很高效。不过,这种方式实现分布式锁存在一些问题。就比如应用程序遇到一些问题比如释放锁的逻辑突然挂掉,可能会导致锁无法被释放,进而造成共享资源无法再被其他线程/进程访问。

为什么要给锁设置一个过期时间?

为了避免锁无法被释放,我们可以想到的一个解决办法就是:给这个 key(也就是锁) 设置一个过期时间 。

1
2
127.0.0.1:6379> SET lockKey uniqueValue EX 3 NX
OK
  • lockKey:加锁的锁名;
  • uniqueValue:能够唯一标识锁的随机字符串;
  • NX:只有当 lockKey 对应的 key 值不存在的时候才能 SET 成功;
  • EX:过期时间设置(秒为单位)EX 3 标示这个锁有一个 3 秒的自动过期时间。与 EX 对应的是 PX(毫秒为单位),这两个都是过期时间设置。

一定要保证设置指定 key 的值和过期时间是一个原子操作!!! 不然的话,依然可能会出现锁无法被释放的问题。

这样确实可以解决问题,不过,这种解决办法同样存在漏洞:如果操作共享资源的时间大于过期时间,就会出现锁提前过期的问题,进而导致分布式锁直接失效。如果锁的超时时间设置过长,又会影响到性能。

你或许在想:如果操作共享资源的操作还未完成,锁过期时间能够自己续期就好了!

如何实现锁的优雅续期?

对于 Java 开发的小伙伴来说,已经有了现成的解决方案:**Redisson** 。其他语言的解决方案,可以在 Redis 官方文档中找到,地址:https://redis.io/topics/distlock 。

Distributed locks with Redis

Redisson 是一个开源的 Java 语言 Redis 客户端,提供了很多开箱即用的功能,不仅仅包括多种分布式锁的实现。并且,Redisson 还支持 Redis 单机、Redis Sentinel、Redis Cluster 等多种部署架构。

Redisson 中的分布式锁自带自动续期机制,使用起来非常简单,原理也比较简单,其提供了一个专门用来监控和续期锁的 Watch Dog( 看门狗),如果操作共享资源的线程还未执行完成的话,Watch Dog 会不断地延长锁的过期时间,进而保证锁不会因为超时而被释放。

Redisson 看门狗自动续期

看门狗名字的由来于 getLockWatchdogTimeout() 方法,这个方法返回的是看门狗给锁续期的过期时间,默认为 30 秒(redisson-3.17.6)。

1
2
3
4
5
6
7
8
9
10
//默认 30秒,支持修改
private long lockWatchdogTimeout = 30 * 1000;

public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
this.lockWatchdogTimeout = lockWatchdogTimeout;
return this;
}
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}

renewExpiration() 方法包含了看门狗的主要逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void renewExpiration() {
//......
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
//......
// 异步续期,基于 Lua 脚本
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
// 无法续期
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}

if (res) {
// 递归调用实现续期
renewExpiration();
} else {
// 取消续期
cancelExpirationRenewal(null);
}
});
}
// 延迟 internalLockLeaseTime/3(默认 10s,也就是 30/3) 再调用
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}

默认情况下,每过 10 秒,看门狗就会执行续期操作,将锁的超时时间设置为 30 秒。看门狗续期前也会先判断是否需要执行续期操作,需要才会执行续期,否则取消续期操作。

Watch Dog 通过调用 renewExpirationAsync() 方法实现锁的异步续期:

1
2
3
4
5
6
7
8
9
10
11
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断是否为持锁线程,如果是就执行续期操作,就锁的过期时间设置为 30s(默认)
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}

可以看出, renewExpirationAsync 方法其实是调用 Lua 脚本实现的续期,这样做主要是为了保证续期操作的原子性。

我这里以 Redisson 的分布式可重入锁 RLock 为例来说明如何使用 Redisson 实现分布式锁:

1
2
3
4
5
6
7
8
// 1.获取指定的分布式锁对象
RLock lock = redisson.getLock("lock");
// 2.拿锁且不设置锁超时时间,具备 Watch Dog 自动续期机制
lock.lock();
// 3.执行业务
...
// 4.释放锁
lock.unlock();

只有未指定锁超时时间,才会使用到 Watch Dog 自动续期机制。

1
2
// 手动给锁设置过期时间,不具备 Watch Dog 自动续期机制
lock.lock(10, TimeUnit.SECONDS);

如果使用 Redis 来实现分布式锁的话,还是比较推荐直接基于 Redisson 来做的。

如何实现可重入锁?

所谓可重入锁指的是在一个线程中可以多次获取同一把锁,比如一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法即可重入 ,而无需重新获得锁。像 Java 中的 synchronized 和 ReentrantLock 都属于可重入锁。

不可重入的分布式锁基本可以满足绝大部分业务场景了,一些特殊的场景可能会需要使用可重入的分布式锁。

可重入分布式锁的实现核心思路是线程在获取锁的时候判断是否为自己的锁,如果是的话,就不用再重新获取了。为此,我们可以为每个锁关联一个可重入计数器和一个占有它的线程。当可重入计数器大于 0 时,则锁被占有,需要判断占有该锁的线程和请求获取锁的线程是否为同一个。

实际项目中,我们不需要自己手动实现,推荐使用我们上面提到的 Redisson ,其内置了多种类型的锁比如可重入锁(Reentrant Lock)、自旋锁(Spin Lock)、公平锁(Fair Lock)、多重锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)。

Redis 如何解决集群情况下分布式锁的可靠性?

为了避免单点故障,生产环境下的 Redis 服务通常是集群化部署的。

Redis 集群下,上面介绍到的分布式锁的实现会存在一些问题。由于 Redis 集群数据同步到各个节点时是异步的,如果在 Redis 主节点获取到锁后,在没有同步到其他节点时,Redis 主节点宕机了,此时新的 Redis 主节点依然可以获取锁,所以多个应用服务就可以同时获取到锁。

针对这个问题,Redis 之父 antirez 设计了 Redlock 算法 来解决。

Redlock 算法的思想是让客户端向 Redis 集群中的多个独立的 Redis 实例依次请求申请加锁,如果客户端能够和半数以上的实例成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁,否则加锁失败。

即使部分 Redis 节点出现问题,只要保证 Redis 集群中有半数以上的 Redis 节点可用,分布式锁服务就是正常的。

Redlock 是直接操作 Redis 节点的,并不是通过 Redis 集群操作的,这样才可以避免 Redis 集群主从切换导致的锁丢失问题。

Redlock 实现比较复杂,性能比较差,发生时钟变迁的情况下还存在安全性隐患。《数据密集型应用系统设计》一书的作者 Martin Kleppmann 曾经专门发文(How to do distributed locking - Martin Kleppmann - 2016)怼过 Redlock,他认为这是一个很差的分布式锁实现。感兴趣的朋友可以看看Redis 锁从面试连环炮聊到神仙打架这篇文章,有详细介绍到 antirez 和 Martin Kleppmann 关于 Redlock 的激烈辩论。

实际项目中不建议使用 Redlock 算法,成本和收益不成正比,可以考虑基于 Redis 主从复制+哨兵模式实现分布式锁。

基于 ZooKeeper 实现分布式锁

ZooKeeper 相比于 Redis 实现分布式锁,除了提供相对更高的可靠性之外,在功能层面还有一个非常有用的特性:Watch 机制。这个机制可以用来实现公平的分布式锁。不过,使用 ZooKeeper 实现的分布式锁在性能方面相对较差,因此如果对性能要求比较高的话,ZooKeeper 可能就不太适合了。

如何基于 ZooKeeper 实现分布式锁?

ZooKeeper 分布式锁是基于 临时顺序节点 和 Watcher(事件监听器) 实现的。

获取锁:

  1. 首先我们要有一个持久节点/locks,客户端获取锁就是在locks下创建临时顺序节点。
  2. 假设客户端 1 创建了/locks/lock1节点,创建成功之后,会判断 lock1是否是 /locks 下最小的子节点。
  3. 如果 lock1是最小的子节点,则获取锁成功。否则,获取锁失败。
  4. 如果获取锁失败,则说明有其他的客户端已经成功获取锁。客户端 1 并不会不停地循环去尝试加锁,而是在前一个节点比如/locks/lock0上注册一个事件监听器。这个监听器的作用是当前一个节点释放锁之后通知客户端 1(避免无效自旋),这样客户端 1 就加锁成功了。

释放锁:

  1. 成功获取锁的客户端在执行完业务流程之后,会将对应的子节点删除。
  2. 成功获取锁的客户端在出现故障之后,对应的子节点由于是临时顺序节点,也会被自动删除,避免了锁无法被释放。
  3. 我们前面说的事件监听器其实监听的就是这个子节点删除事件,子节点删除就意味着锁被释放。

实际项目中,推荐使用 Curator 来实现 ZooKeeper 分布式锁。Curator 是 Netflix 公司开源的一套 ZooKeeper Java 客户端框架,相比于 ZooKeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。

Curator主要实现了下面四种锁:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式不可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器,获取锁的时候获取所有锁,释放锁也会释放所有锁资源(忽略释放失败的锁)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
CuratorFramework client = ZKUtils.getClient();
client.start();
// 分布式可重入排它锁
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1);
// 分布式不可重入排它锁
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2);
// 将多个锁作为一个整体
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("不能获取多锁");
}
System.out.println("已获取多锁");
System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
try {
// 资源操作
resource.use();
} finally {
System.out.println("释放多个锁");
lock.release();
}
System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
client.close();

为什么要用临时顺序节点?

每个数据节点在 ZooKeeper 中被称为 znode,它是 ZooKeeper 中数据的最小单元。

我们通常是将 znode 分为 4 大类:

  • 持久(PERSISTENT)节点:一旦创建就一直存在即使 ZooKeeper 集群宕机,直到将其删除。
  • 临时(EPHEMERAL)节点:临时节点的生命周期是与 客户端会话(session) 绑定的,会话消失则节点消失 。并且,临时节点只能做叶子节点 ,不能创建子节点。
  • 持久顺序(PERSISTENT_SEQUENTIAL)节点:除了具有持久(PERSISTENT)节点的特性之外, 子节点的名称还具有顺序性。比如 /node1/app0000000001、/node1/app0000000002 。
  • 临时顺序(EPHEMERAL_SEQUENTIAL)节点:除了具备临时(EPHEMERAL)节点的特性之外,子节点的名称还具有顺序性。

可以看出,临时节点相比持久节点,最主要的是对会话失效的情况处理不一样,临时节点会话消失则对应的节点消失。这样的话,如果客户端发生异常导致没来得及释放锁也没关系,会话失效节点自动被删除,不会发生死锁的问题。

使用 Redis 实现分布式锁的时候,我们是通过过期时间来避免锁无法被释放导致死锁问题的,而 ZooKeeper 直接利用临时节点的特性即可。

假设不使用顺序节点的话,所有尝试获取锁的客户端都会对持有锁的子节点加监听器。当该锁被释放之后,势必会造成所有尝试获取锁的客户端来争夺锁,这样对性能不友好。使用顺序节点之后,只需要监听前一个节点就好了,对性能更友好。

为什么要设置对前一个节点的监听?

Watcher(事件监听器),是 ZooKeeper 中的一个很重要的特性。ZooKeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

同一时间段内,可能会有很多客户端同时获取锁,但只有一个可以获取成功。如果获取锁失败,则说明有其他的客户端已经成功获取锁。获取锁失败的客户端并不会不停地循环去尝试加锁,而是在前一个节点注册一个事件监听器。

这个事件监听器的作用是:当前一个节点对应的客户端释放锁之后(也就是前一个节点被删除之后,监听的是删除事件),通知获取锁失败的客户端(唤醒等待的线程,Java 中的 wait/notifyAll ),让它尝试去获取锁,然后就成功获取锁了。

如何实现可重入锁?

这里以 Curator 的 InterProcessMutex 对可重入锁的实现来介绍(源码地址:InterProcessMutex.java)。

当我们调用 InterProcessMutex#acquire方法获取锁的时候,会调用InterProcessMutex#internalLock方法。

1
2
3
4
5
6
7
// 获取可重入互斥锁,直到获取成功为止
@Override
public void acquire() throws Exception {
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}

internalLock 方法会先获取当前请求锁的线程,然后从 threadData( ConcurrentMap<Thread, LockData> 类型)中获取当前线程对应的 lockData 。 lockData 包含锁的信息和加锁的次数,是实现可重入锁的关键。

第一次获取锁的时候,lockData为 null。获取锁成功之后,会将当前线程和对应的 lockData 放到 threadData 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private boolean internalLock(long time, TimeUnit unit) throws Exception {
// 获取当前请求锁的线程
Thread currentThread = Thread.currentThread();
// 拿对应的 lockData
LockData lockData = threadData.get(currentThread);
// 第一次获取锁的话,lockData 为 null
if (lockData != null) {
// 当前线程获取过一次锁之后
// 因为当前线程的锁存在, lockCount 自增后返回,实现锁重入.
lockData.lockCount.incrementAndGet();
return true;
}
// 尝试获取锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
LockData newLockData = new LockData(currentThread, lockPath);
// 获取锁成功之后,将当前线程和对应的 lockData 放到 threadData 中
threadData.put(currentThread, newLockData);
return true;
}

return false;
}

LockData是 InterProcessMutex中的一个静态内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

private static class LockData
{
// 当前持有锁的线程
final Thread owningThread;
// 锁对应的子节点
final String lockPath;
// 加锁的次数
final AtomicInteger lockCount = new AtomicInteger(1);

private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}

如果已经获取过一次锁,后面再来获取锁的话,直接就会在 if (lockData != null) 这里被拦下了,然后就会执行lockData.lockCount.incrementAndGet(); 将加锁次数加 1。

整个可重入锁的实现逻辑非常简单,直接在客户端判断当前线程有没有获取锁,有的话直接将加锁次数加 1 就可以了。

总结

在这篇文章中,我介绍了实现分布式锁的两种常见方式:Redis 和 ZooKeeper。至于具体选择 Redis 还是 ZooKeeper 来实现分布式锁,还是要根据业务的具体需求来决定。

  • 如果对性能要求比较高的话,建议使用 Redis 实现分布式锁。推荐优先选择 Redisson 提供的现成分布式锁,而不是自己实现。实际项目中不建议使用 Redlock 算法,成本和收益不成正比,可以考虑基于 Redis 主从复制+哨兵模式实现分布式锁。
  • 如果对可靠性要求比较高,建议使用 ZooKeeper 实现分布式锁,推荐基于 Curator 框架来实现。不过,现在很多项目都不会用到 ZooKeeper,如果单纯是因为分布式锁而引入 ZooKeeper 的话,那是不太可取的,不建议这样做,为了一个小小的功能增加了系统的复杂度。

需要注意的是,无论选择哪种方式实现分布式锁,包括 Redis、ZooKeeper 或 Etcd(本文没介绍,但也经常用来实现分布式锁),都无法保证 100% 的安全性,特别是在遇到进程垃圾回收(GC)、网络延迟等异常情况下。

为了进一步提高系统的可靠性,建议引入一个兜底机制。例如,可以通过 版本号(Fencing Token)机制 来避免并发冲突。

最后,再分享几篇我觉得写的还不错的文章:

  • 分布式锁实现原理与最佳实践 - 阿里云开发者
  • 聊聊分布式锁 - 字节跳动技术团队
  • Redis、ZooKeeper、Etcd,谁有最好用的分布式锁? - 腾讯云开发者

读写分离和分库分表详解

发表于 2021-05-19 | 分类于 分布式 , 高性能 | 阅读次数:
字数统计: 6.8k 字 | 阅读时长 ≈ 23 分钟

读写分离

什么是读写分离?

见名思意,根据读写分离的名字,我们就可以知道:读写分离主要是为了将对数据库的读写操作分散到不同的数据库节点上。 这样的话,就能够小幅提升写性能,大幅提升读性能。

我简单画了一张图来帮助不太清楚读写分离的小伙伴理解。

读写分离示意图

一般情况下,我们都会选择一主多从,也就是一台主数据库负责写,其他的从数据库负责读。主库和从库之间会进行数据同步,以保证从库中数据的准确性。这样的架构实现起来比较简单,并且也符合系统的写少读多的特点。

如何实现读写分离?

不论是使用哪一种读写分离具体的实现方案,想要实现读写分离一般包含如下几步:

  1. 部署多台数据库,选择其中的一台作为主数据库,其他的一台或者多台作为从数据库。
  2. 保证主数据库和从数据库之间的数据是实时同步的,这个过程也就是我们常说的主从复制。
  3. 系统将写请求交给主数据库处理,读请求交给从数据库处理。

落实到项目本身的话,常用的方式有两种:

1. 代理方式

代理方式实现读写分离

我们可以在应用和数据中间加了一个代理层。应用程序所有的数据请求都交给代理层处理,代理层负责分离读写请求,将它们路由到对应的数据库中。

提供类似功能的中间件有 MySQL Router(官方, MySQL Proxy 的替代方案)、Atlas(基于 MySQL Proxy)、MaxScale、MyCat。

关于 MySQL Router 多提一点:在 MySQL 8.2 的版本中,MySQL Router 能自动分辨对数据库读写/操作并把这些操作路由到正确的实例上。这是一项有价值的功能,可以优化数据库性能和可扩展性,而无需在应用程序中进行任何更改。具体介绍可以参考官方博客:MySQL 8.2 – transparent read/write splitting。

2. 组件方式

在这种方式中,我们可以通过引入第三方组件来帮助我们读写请求。

这也是我比较推荐的一种方式。这种方式目前在各种互联网公司中用的最多的,相关的实际的案例也非常多。如果你要采用这种方式的话,推荐使用 sharding-jdbc ,直接引入 jar 包即可使用,非常方便。同时,也节省了很多运维的成本。

你可以在 shardingsphere 官方找到 sharding-jdbc 关于读写分离的操作。

主从复制原理是什么?

MySQL binlog(binary log 即二进制日志文件) 主要记录了 MySQL 数据库中数据的所有变化(数据库执行的所有 DDL 和 DML 语句)。因此,我们根据主库的 MySQL binlog 日志就能够将主库的数据同步到从库中。

更具体和详细的过程是这个样子的(图片来自于:《MySQL Master-Slave Replication on the Same Machine》):

MySQL主从复制

  1. 主库将数据库中数据的变化写入到 binlog
  2. 从库连接主库
  3. 从库会创建一个 I/O 线程向主库请求更新的 binlog
  4. 主库会创建一个 binlog dump 线程来发送 binlog ,从库中的 I/O 线程负责接收
  5. 从库的 I/O 线程将接收的 binlog 写入到 relay log 中。
  6. 从库的 SQL 线程读取 relay log 同步数据到本地(也就是再执行一遍 SQL )。

怎么样?看了我对主从复制这个过程的讲解,你应该搞明白了吧!

你一般看到 binlog 就要想到主从复制。当然,除了主从复制之外,binlog 还能帮助我们实现数据恢复。

🌈 拓展一下:

不知道大家有没有使用过阿里开源的一个叫做 canal 的工具。这个工具可以帮助我们实现 MySQL 和其他数据源比如 Elasticsearch 或者另外一台 MySQL 数据库之间的数据同步。很显然,这个工具的底层原理肯定也是依赖 binlog。canal 的原理就是模拟 MySQL 主从复制的过程,解析 binlog 将数据同步到其他的数据源。

另外,像咱们常用的分布式缓存组件 Redis 也是通过主从复制实现的读写分离。

🌕 简单总结一下:

MySQL 主从复制是依赖于 binlog 。另外,常见的一些同步 MySQL 数据到其他数据源的工具(比如 canal)的底层一般也是依赖 binlog 。

如何避免主从延迟?

读写分离对于提升数据库的并发非常有效,但是,同时也会引来一个问题:主库和从库的数据存在延迟,比如你写完主库之后,主库的数据同步到从库是需要时间的,这个时间差就导致了主库和从库的数据不一致性问题。这也就是我们经常说的 主从同步延迟 。

如果我们的业务场景无法容忍主从同步延迟的话,应该如何避免呢(注意:我这里说的是避免而不是减少延迟)?

这里提供两种我知道的方案(能力有限,欢迎补充),你可以根据自己的业务场景参考一下。

强制将读请求路由到主库处理

既然你从库的数据过期了,那我就直接从主库读取嘛!这种方案虽然会增加主库的压力,但是,实现起来比较简单,也是我了解到的使用最多的一种方式。

比如 Sharding-JDBC 就是采用的这种方案。通过使用 Sharding-JDBC 的 HintManager 分片键值管理器,我们可以强制使用主库。

1
2
3
HintManager hintManager = HintManager.getInstance();
hintManager.setMasterRouteOnly();
// 继续JDBC操作

对于这种方案,你可以将那些必须获取最新数据的读请求都交给主库处理。

延迟读取

还有一些朋友肯定会想既然主从同步存在延迟,那我就在延迟之后读取啊,比如主从同步延迟 0.5s,那我就 1s 之后再读取数据。这样多方便啊!方便是方便,但是也很扯淡。

不过,如果你是这样设计业务流程就会好很多:对于一些对数据比较敏感的场景,你可以在完成写请求之后,避免立即进行请求操作。比如你支付成功之后,跳转到一个支付成功的页面,当你点击返回之后才返回自己的账户。

总结

关于如何避免主从延迟,我们这里介绍了两种方案。实际上,延迟读取这种方案没办法完全避免主从延迟,只能说可以减少出现延迟的概率而已,实际项目中一般不会使用。

总的来说,要想不出现延迟问题,一般还是要强制将那些必须获取最新数据的读请求都交给主库处理。如果你的项目的大部分业务场景对数据准确性要求不是那么高的话,这种方案还是可以选择的。

什么情况下会出现主从延迟?如何尽量减少延迟?

我们在上面的内容中也提到了主从延迟以及避免主从延迟的方法,这里我们再来详细分析一下主从延迟出现的原因以及应该如何尽量减少主从延迟。

要搞懂什么情况下会出现主从延迟,我们需要先搞懂什么是主从延迟。

MySQL 主从同步延时是指从库的数据落后于主库的数据,这种情况可能由以下两个原因造成:

  1. 从库 I/O 线程接收 binlog 的速度跟不上主库写入 binlog 的速度,导致从库 relay log 的数据滞后于主库 binlog 的数据;
  2. 从库 SQL 线程执行 relay log 的速度跟不上从库 I/O 线程接收 binlog 的速度,导致从库的数据滞后于从库 relay log 的数据。

与主从同步有关的时间点主要有 3 个:

  1. 主库执行完一个事务,写入 binlog,将这个时刻记为 T1;
  2. 从库 I/O 线程接收到 binlog 并写入 relay log 的时刻记为 T2;
  3. 从库 SQL 线程读取 relay log 同步数据本地的时刻记为 T3。

结合我们上面讲到的主从复制原理,可以得出:

  • T2 和 T1 的差值反映了从库 I/O 线程的性能和网络传输的效率,这个差值越小说明从库 I/O 线程的性能和网络传输效率越高。
  • T3 和 T2 的差值反映了从库 SQL 线程执行的速度,这个差值越小,说明从库 SQL 线程执行速度越快。

那什么情况下会出现出从延迟呢?这里列举几种常见的情况:

  1. 从库机器性能比主库差:从库接收 binlog 并写入 relay log 以及执行 SQL 语句的速度会比较慢(也就是 T2-T1 和 T3-T2 的值会较大),进而导致延迟。解决方法是选择与主库一样规格或更高规格的机器作为从库,或者对从库进行性能优化,比如调整参数、增加缓存、使用 SSD 等。
  2. 从库处理的读请求过多:从库需要执行主库的所有写操作,同时还要响应读请求,如果读请求过多,会占用从库的 CPU、内存、网络等资源,影响从库的复制效率(也就是 T2-T1 和 T3-T2 的值会较大,和前一种情况类似)。解决方法是引入缓存(推荐)、使用一主多从的架构,将读请求分散到不同的从库,或者使用其他系统来提供查询的能力,比如将 binlog 接入到 Hadoop、Elasticsearch 等系统中。
  3. 大事务:运行时间比较长,长时间未提交的事务就可以称为大事务。由于大事务执行时间长,并且从库上的大事务会比主库上的大事务花费更多的时间和资源,因此非常容易造成主从延迟。解决办法是避免大批量修改数据,尽量分批进行。类似的情况还有执行时间较长的慢 SQL ,实际项目遇到慢 SQL 应该进行优化。
  4. 从库太多:主库需要将 binlog 同步到所有的从库,如果从库数量太多,会增加同步的时间和开销(也就是 T2-T1 的值会比较大,但这里是因为主库同步压力大导致的)。解决方案是减少从库的数量,或者将从库分为不同的层级,让上层的从库再同步给下层的从库,减少主库的压力。
  5. 网络延迟:如果主从之间的网络传输速度慢,或者出现丢包、抖动等问题,那么就会影响 binlog 的传输效率,导致从库延迟。解决方法是优化网络环境,比如提升带宽、降低延迟、增加稳定性等。
  6. 单线程复制:MySQL5.5 及之前,只支持单线程复制。为了优化复制性能,MySQL 5.6 引入了 多线程复制,MySQL 5.7 还进一步完善了多线程复制。
  7. 复制模式:MySQL 默认的复制是异步的,必然会存在延迟问题。全同步复制不存在延迟问题,但性能太差了。半同步复制是一种折中方案,相对于异步复制,半同步复制提高了数据的安全性,减少了主从延迟(还是有一定程度的延迟)。MySQL 5.5 开始,MySQL 以插件的形式支持 semi-sync 半同步复制。并且,MySQL 5.7 引入了 增强半同步复制 。
  8. ……

《MySQL 实战 45 讲》这个专栏中的读写分离有哪些坑?这篇文章也有对主从延迟解决方案这一话题进行探讨,感兴趣的可以阅读学习一下。

分库分表

读写分离主要应对的是数据库读并发,没有解决数据库存储问题。试想一下:如果 MySQL 一张表的数据量过大怎么办?

换言之,我们该如何解决 MySQL 的存储压力呢?

答案之一就是 分库分表。

什么是分库?

分库 就是将数据库中的数据分散到不同的数据库上,可以垂直分库,也可以水平分库。

垂直分库 就是把单一数据库按照业务进行划分,不同的业务使用不同的数据库,进而将一个数据库的压力分担到多个数据库。

举个例子:说你将数据库中的用户表、订单表和商品表分别单独拆分为用户数据库、订单数据库和商品数据库。

垂直分库

水平分库 是把同一个表按一定规则拆分到不同的数据库中,每个库可以位于不同的服务器上,这样就实现了水平扩展,解决了单表的存储和性能瓶颈的问题。

举个例子:订单表数据量太大,你对订单表进行了水平切分(水平分表),然后将切分后的 2 张订单表分别放在两个不同的数据库。

水平分库

什么是分表?

分表 就是对单表的数据进行拆分,可以是垂直拆分,也可以是水平拆分。

垂直分表 是对数据表列的拆分,把一张列比较多的表拆分为多张表。

举个例子:我们可以将用户信息表中的一些列单独抽出来作为一个表。

水平分表 是对数据表行的拆分,把一张行比较多的表拆分为多张表,可以解决单一表数据量过大的问题。

举个例子:我们可以将用户信息表拆分成多个用户信息表,这样就可以避免单一表数据量过大对性能造成影响。

水平拆分只能解决单表数据量大的问题,为了提升性能,我们通常会选择将拆分后的多张表放在不同的数据库中。也就是说,水平分表通常和水平分库同时出现。

分表

什么情况下需要分库分表?

遇到下面几种场景可以考虑分库分表:

  • 单表的数据达到千万级别以上,数据库读写速度比较缓慢。
  • 数据库中的数据占用的空间越来越大,备份时间越来越长。
  • 应用的并发量太大(应该优先考虑其他性能优化方法,而非分库分表)。

不过,分库分表的成本太高,如非必要尽量不要采用。而且,并不一定是单表千万级数据量就要分表,毕竟每张表包含的字段不同,它们在不错的性能下能够存放的数据量也不同,还是要具体情况具体分析。

之前看过一篇文章分析 “InnoDB 中高度为 3 的 B+ 树最多可以存多少数据”,写的挺不错,感兴趣的可以看看。

常见的分片算法有哪些?

分片算法主要解决了数据被水平分片之后,数据究竟该存放在哪个表的问题。

常见的分片算法有:

  • 哈希分片:求指定分片键的哈希,然后根据哈希值确定数据应被放置在哪个表中。哈希分片比较适合随机读写的场景,不太适合经常需要范围查询的场景。哈希分片可以使每个表的数据分布相对均匀,但对动态伸缩(例如新增一个表或者库)不友好。
  • 范围分片:按照特定的范围区间(比如时间区间、ID 区间)来分配数据,比如 将 id 为 1~299999 的记录分到第一个表, 300000~599999 的分到第二个表。范围分片适合需要经常进行范围查找且数据分布均匀的场景,不太适合随机读写的场景(数据未被分散,容易出现热点数据的问题)。
  • 映射表分片:使用一个单独的表(称为映射表)来存储分片键和分片位置的对应关系。映射表分片策略可以支持任何类型的分片算法,如哈希分片、范围分片等。映射表分片策略是可以灵活地调整分片规则,不需要修改应用程序代码或重新分布数据。不过,这种方式需要维护额外的表,还增加了查询的开销和复杂度。
  • 一致性哈希分片:将哈希空间组织成一个环形结构,将分片键和节点(数据库或表)都映射到这个环上,然后根据顺时针的规则确定数据或请求应该分配到哪个节点上,解决了传统哈希对动态伸缩不友好的问题。
  • 地理位置分片:很多 NewSQL 数据库都支持地理位置分片算法,也就是根据地理位置(如城市、地域)来分配数据。
  • 融合算法分片:灵活组合多种分片算法,比如将哈希分片和范围分片组合。
  • ……

分片键如何选择?

分片键(Sharding Key)是数据分片的关键字段。分片键的选择非常重要,它关系着数据的分布和查询效率。一般来说,分片键应该具备以下特点:

  • 具有共性,即能够覆盖绝大多数的查询场景,尽量减少单次查询所涉及的分片数量,降低数据库压力;
  • 具有离散性,即能够将数据均匀地分散到各个分片上,避免数据倾斜和热点问题;
  • 具有稳定性,即分片键的值不会发生变化,避免数据迁移和一致性问题;
  • 具有扩展性,即能够支持分片的动态增加和减少,避免数据重新分片的开销。

实际项目中,分片键很难满足上面提到的所有特点,需要权衡一下。并且,分片键可以是表中多个字段的组合,例如取用户 ID 后四位作为订单 ID 后缀。

分库分表会带来什么问题呢?

记住,你在公司做的任何技术决策,不光是要考虑这个技术能不能满足我们的要求,是否适合当前业务场景,还要重点考虑其带来的成本。

引入分库分表之后,会给系统带来什么挑战呢?

  • join 操作:同一个数据库中的表分布在了不同的数据库中,导致无法使用 join 操作。这样就导致我们需要手动进行数据的封装,比如你在一个数据库中查询到一个数据之后,再根据这个数据去另外一个数据库中找对应的数据。不过,很多大厂的资深 DBA 都是建议尽量不要使用 join 操作。因为 join 的效率低,并且会对分库分表造成影响。对于需要用到 join 操作的地方,可以采用多次查询业务层进行数据组装的方法。不过,这种方法需要考虑业务上多次查询的事务性的容忍度。
  • 事务问题:同一个数据库中的表分布在了不同的数据库中,如果单个操作涉及到多个数据库,那么数据库自带的事务就无法满足我们的要求了。这个时候,我们就需要引入分布式事务了。关于分布式事务常见解决方案总结,网站上也有对应的总结:https://javaguide.cn/distributed-system/distributed-transaction.html 。
  • 分布式 ID:分库之后, 数据遍布在不同服务器上的数据库,数据库的自增主键已经没办法满足生成的主键唯一了。我们如何为不同的数据节点生成全局唯一主键呢?这个时候,我们就需要为我们的系统引入分布式 ID 了。关于分布式 ID 的详细介绍&实现方案总结,可以看我写的这篇文章:分布式 ID 介绍&实现方案总结。
  • 跨库聚合查询问题:分库分表会导致常规聚合查询操作,如 group by,order by 等变得异常复杂。这是因为这些操作需要在多个分片上进行数据汇总和排序,而不是在单个数据库上进行。为了实现这些操作,需要编写复杂的业务代码,或者使用中间件来协调分片间的通信和数据传输。这样会增加开发和维护的成本,以及影响查询的性能和可扩展性。
  • ……

另外,引入分库分表之后,一般需要 DBA 的参与,同时还需要更多的数据库服务器,这些都属于成本。

分库分表有没有什么比较推荐的方案?

Apache ShardingSphere 是一款分布式的数据库生态系统, 可以将任意数据库转换为分布式数据库,并通过数据分片、弹性伸缩、加密等能力对原有数据库进行增强。

ShardingSphere 项目(包括 Sharding-JDBC、Sharding-Proxy 和 Sharding-Sidecar)是当当捐入 Apache 的,目前主要由京东数科的一些巨佬维护。

ShardingSphere 绝对可以说是当前分库分表的首选!ShardingSphere 的功能完善,除了支持读写分离和分库分表,还提供分布式事务、数据库治理、影子库、数据加密和脱敏等功能。

ShardingSphere 提供的功能如下:

ShardingSphere 提供的功能

ShardingSphere 的优势如下(摘自 ShardingSphere 官方文档:https://shardingsphere.apache.org/document/current/cn/overview/):

  • 极致性能:驱动程序端历经长年打磨,效率接近原生 JDBC,性能极致。
  • 生态兼容:代理端支持任何通过 MySQL/PostgreSQL 协议的应用访问,驱动程序端可对接任意实现 JDBC 规范的数据库。
  • 业务零侵入:面对数据库替换场景,ShardingSphere 可满足业务无需改造,实现平滑业务迁移。
  • 运维低成本:在保留原技术栈不变前提下,对 DBA 学习、管理成本低,交互友好。
  • 安全稳定:基于成熟数据库底座之上提供增量能力,兼顾安全性及稳定性。
  • 弹性扩展:具备计算、存储平滑在线扩展能力,可满足业务多变的需求。
  • 开放生态:通过多层次(内核、功能、生态)插件化能力,为用户提供可定制满足自身特殊需求的独有系统。

另外,ShardingSphere 的生态体系完善,社区活跃,文档完善,更新和发布比较频繁。

不过,还是要多提一句:现在很多公司都是用的类似于 TiDB 这种分布式关系型数据库,不需要我们手动进行分库分表(数据库层面已经帮我们做了),也不需要解决手动分库分表引入的各种问题,直接一步到位,内置很多实用的功能(如无感扩容和缩容、冷热存储分离)!如果公司条件允许的话,个人也是比较推荐这种方式!

分库分表后,数据怎么迁移呢?

分库分表之后,我们如何将老库(单库单表)的数据迁移到新库(分库分表后的数据库系统)呢?

比较简单同时也是非常常用的方案就是停机迁移,写个脚本老库的数据写到新库中。比如你在凌晨 2 点,系统使用的人数非常少的时候,挂一个公告说系统要维护升级预计 1 小时。然后,你写一个脚本将老库的数据都同步到新库中。

如果你不想停机迁移数据的话,也可以考虑双写方案。双写方案是针对那种不能停机迁移的场景,实现起来要稍微麻烦一些。具体原理是这样的:

  • 我们对老库的更新操作(增删改),同时也要写入新库(双写)。如果操作的数据不存在于新库的话,需要插入到新库中。 这样就能保证,咱们新库里的数据是最新的。
  • 在迁移过程,双写只会让被更新操作过的老库中的数据同步到新库,我们还需要自己写脚本将老库中的数据和新库的数据做比对。如果新库中没有,那咱们就把数据插入到新库。如果新库有,旧库没有,就把新库对应的数据删除(冗余数据清理)。
  • 重复上一步的操作,直到老库和新库的数据一致为止。

想要在项目中实施双写还是比较麻烦的,很容易会出现问题。我们可以借助上面提到的数据库同步工具 Canal 做增量数据迁移(还是依赖 binlog,开发和维护成本较低)。

总结

  • 读写分离主要是为了将对数据库的读写操作分散到不同的数据库节点上。 这样的话,就能够小幅提升写性能,大幅提升读性能。
  • 读写分离基于主从复制,MySQL 主从复制是依赖于 binlog 。
  • 分库 就是将数据库中的数据分散到不同的数据库上。分表 就是对单表的数据进行拆分,可以是垂直拆分,也可以是水平拆分。
  • 引入分库分表之后,需要系统解决事务、分布式 id、无法 join 操作问题。
  • 现在很多公司都是用的类似于 TiDB 这种分布式关系型数据库,不需要我们手动进行分库分表(数据库层面已经帮我们做了),也不需要解决手动分库分表引入的各种问题,直接一步到位,内置很多实用的功能(如无感扩容和缩容、冷热存储分离)!如果公司条件允许的话,个人也是比较推荐这种方式!
  • 如果必须要手动分库分表的话,ShardingSphere 是首选!ShardingSphere 的功能完善,除了支持读写分离和分库分表,还提供分布式事务、数据库治理等功能。另外,ShardingSphere 的生态体系完善,社区活跃,文档完善,更新和发布比较频繁。
<i class="fa fa-angle-left"></i>1…141516…27<i class="fa fa-angle-right"></i>

264 日志
34 分类
38 标签
GitHub Zhihu Wechat
© 2024 史海杰 | Site words total count: 722k
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4