Jay's Blog

知而不行为不知


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 留言

  • 搜索

十大经典排序算法总结

发表于 2024-08-20 | 分类于 算法 | 阅读次数:
字数统计: 6.7k 字 | 阅读时长 ≈ 27 分钟

本文转自:http://www.guoyaohua.com/sorting.html,对其做了补充完善。

引言

所谓排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。排序算法,就是如何使得记录按照要求排列的方法。排序算法在很多领域得到相当地重视,尤其是在大量数据的处理方面。一个优秀的算法可以节省大量的资源。在各个领域中考虑到数据的各种限制和规范,要得到一个符合实际的优秀算法,得经过大量的推理和分析。

简介

排序算法总结

常见的内部排序算法有:插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等,本文只讲解内部排序算法。用一张表格概括:

排序算法 时间复杂度(平均) 时间复杂度(最差) 时间复杂度(最好) 空间复杂度 排序方式 稳定性
冒泡排序 O(n^2) O(n^2) O(n) O(1) 内部排序 稳定
选择排序 O(n^2) O(n^2) O(n^2) O(1) 内部排序 不稳定
插入排序 O(n^2) O(n^2) O(n) O(1) 内部排序 稳定
希尔排序 O(nlogn) O(n^2) O(nlogn) O(1) 内部排序 不稳定
归并排序 O(nlogn) O(nlogn) O(nlogn) O(n) 外部排序 稳定
快速排序 O(nlogn) O(n^2) O(nlogn) O(logn) 内部排序 不稳定
堆排序 O(nlogn) O(nlogn) O(nlogn) O(1) 内部排序 不稳定
计数排序 O(n+k) O(n+k) O(n+k) O(k) 外部排序 稳定
桶排序 O(n+k) O(n^2) O(n+k) O(n+k) 外部排序 稳定
基数排序 O(n×k) O(n×k) O(n×k) O(n+k) 外部排序 稳定

术语解释:

  • n:数据规模,表示待排序的数据量大小。
  • k:“桶” 的个数,在某些特定的排序算法中(如基数排序、桶排序等),表示分割成的独立的排序区间或类别的数量。
  • 内部排序:所有排序操作都在内存中完成,不需要额外的磁盘或其他存储设备的辅助。这适用于数据量小到足以完全加载到内存中的情况。
  • 外部排序:当数据量过大,不可能全部加载到内存中时使用。外部排序通常涉及到数据的分区处理,部分数据被暂时存储在外部磁盘等存储设备上。
  • 稳定:如果 A 原本在 B 前面,而 $A=B$,排序之后 A 仍然在 B 的前面。
  • 不稳定:如果 A 原本在 B 的前面,而 $A=B$,排序之后 A 可能会出现在 B 的后面。
  • 时间复杂度:定性描述一个算法执行所耗费的时间。
  • 空间复杂度:定性描述一个算法执行所需内存的大小。

排序算法分类

十种常见排序算法可以分类两大类别:比较类排序和非比较类排序。

排序算法分类

常见的快速排序、归并排序、堆排序以及冒泡排序等都属于比较类排序算法。比较类排序是通过比较来决定元素间的相对次序,由于其时间复杂度不能突破 O(nlogn),因此也称为非线性时间比较类排序。在冒泡排序之类的排序中,问题规模为 n,又因为需要比较 n 次,所以平均时间复杂度为 O(n²)。在归并排序、快速排序之类的排序中,问题规模通过分治法消减为 logn 次,所以时间复杂度平均 O(nlogn)。

比较类排序的优势是,适用于各种规模的数据,也不在乎数据的分布,都能进行排序。可以说,比较排序适用于一切需要排序的情况。

而计数排序、基数排序、桶排序则属于非比较类排序算法。非比较排序不通过比较来决定元素间的相对次序,而是通过确定每个元素之前,应该有多少个元素来排序。由于它可以突破基于比较排序的时间下界,以线性时间运行,因此称为线性时间非比较类排序。 非比较排序只要确定每个元素之前的已有的元素个数即可,所有一次遍历即可解决。算法时间复杂度 $O(n)$。

非比较排序时间复杂度底,但由于非比较排序需要占用空间来确定唯一位置。所以对数据规模和数据分布有一定的要求。

冒泡排序 (Bubble Sort)

冒泡排序是一种简单的排序算法。它重复地遍历要排序的序列,依次比较两个元素,如果它们的顺序错误就把它们交换过来。遍历序列的工作是重复地进行直到没有再需要交换为止,此时说明该序列已经排序完成。这个算法的名字由来是因为越小的元素会经由交换慢慢 “浮” 到数列的顶端。

算法步骤

  1. 比较相邻的元素。如果第一个比第二个大,就交换它们两个;
  2. 对每一对相邻元素作同样的工作,从开始第一对到结尾的最后一对,这样在最后的元素应该会是最大的数;
  3. 针对所有的元素重复以上的步骤,除了最后一个;
  4. 重复步骤 1~3,直到排序完成。

图解算法

冒泡排序

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 冒泡排序
* @param arr
* @return arr
*/
public static int[] bubbleSort(int[] arr) {
for (int i = 1; i < arr.length; i++) {
// Set a flag, if true, that means the loop has not been swapped,
// that is, the sequence has been ordered, the sorting has been completed.
boolean flag = true;
for (int j = 0; j < arr.length - i; j++) {
if (arr[j] > arr[j + 1]) {
int tmp = arr[j];
arr[j] = arr[j + 1];
arr[j + 1] = tmp;
// Change flag
flag = false;
}
}
if (flag) {
break;
}
}
return arr;
}

此处对代码做了一个小优化,加入了 is_sorted Flag,目的是将算法的最佳时间复杂度优化为 O(n),即当原输入序列就是排序好的情况下,该算法的时间复杂度就是 O(n)。

算法分析

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(n)$ ,最差:$O(n^2)$, 平均:$O(n^2)$
  • 空间复杂度:$O(1)$
  • 排序方式:In-place

选择排序 (Selection Sort)

选择排序是一种简单直观的排序算法,无论什么数据进去都是 $O(n^2)$ 的时间复杂度。所以用到它的时候,数据规模越小越好。唯一的好处可能就是不占用额外的内存空间了吧。它的工作原理:首先在未排序序列中找到最小(大)元素,存放到排序序列的起始位置,然后,再从剩余未排序元素中继续寻找最小(大)元素,然后放到已排序序列的末尾。以此类推,直到所有元素均排序完毕。

算法步骤

  1. 首先在未排序序列中找到最小(大)元素,存放到排序序列的起始位置
  2. 再从剩余未排序元素中继续寻找最小(大)元素,然后放到已排序序列的末尾。
  3. 重复第 2 步,直到所有元素均排序完毕。

图解算法

Selection Sort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 选择排序
* @param arr
* @return arr
*/
public static int[] selectionSort(int[] arr) {
for (int i = 0; i < arr.length - 1; i++) {
int minIndex = i;
for (int j = i + 1; j < arr.length; j++) {
if (arr[j] < arr[minIndex]) {
minIndex = j;
}
}
if (minIndex != i) {
int tmp = arr[i];
arr[i] = arr[minIndex];
arr[minIndex] = tmp;
}
}
return arr;
}

算法分析

  • 稳定性:不稳定
  • 时间复杂度:最佳:$O(n^2)$ ,最差:$O(n^2)$, 平均:$O(n^2)$
  • 空间复杂度:$O(1)$
  • 排序方式:In-place

插入排序 (Insertion Sort)

插入排序是一种简单直观的排序算法。它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。插入排序在实现上,通常采用 in-place 排序(即只需用到 $O(1)$ 的额外空间的排序),因而在从后向前扫描过程中,需要反复把已排序元素逐步向后挪位,为最新元素提供插入空间。

插入排序的代码实现虽然没有冒泡排序和选择排序那么简单粗暴,但它的原理应该是最容易理解的了,因为只要打过扑克牌的人都应该能够秒懂。插入排序是一种最简单直观的排序算法,它的工作原理是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫描,找到相应位置并插入。

插入排序和冒泡排序一样,也有一种优化算法,叫做拆半插入。

算法步骤

  1. 从第一个元素开始,该元素可以认为已经被排序;
  2. 取出下一个元素,在已经排序的元素序列中从后向前扫描;
  3. 如果该元素(已排序)大于新元素,将该元素移到下一位置;
  4. 重复步骤 3,直到找到已排序的元素小于或者等于新元素的位置;
  5. 将新元素插入到该位置后;
  6. 重复步骤 2~5。

图解算法

insertion_sort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 插入排序
* @param arr
* @return arr
*/
public static int[] insertionSort(int[] arr) {
for (int i = 1; i < arr.length; i++) {
int preIndex = i - 1;
int current = arr[i];
while (preIndex >= 0 && current < arr[preIndex]) {
arr[preIndex + 1] = arr[preIndex];
preIndex -= 1;
}
arr[preIndex + 1] = current;
}
return arr;
}

算法分析

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(n)$ ,最差:$O(n^2)$, 平均:$O(n2)$
  • 空间复杂度:O(1)$
  • 排序方式:In-place

希尔排序 (Shell Sort)

希尔排序是希尔 (Donald Shell) 于 1959 年提出的一种排序算法。希尔排序也是一种插入排序,它是简单插入排序经过改进之后的一个更高效的版本,也称为递减增量排序算法,同时该算法是冲破 $O(n^2)$ 的第一批算法之一。

希尔排序的基本思想是:先将整个待排序的记录序列分割成为若干子序列分别进行直接插入排序,待整个序列中的记录 “基本有序” 时,再对全体记录进行依次直接插入排序。

算法步骤

我们来看下希尔排序的基本步骤,在此我们选择增量 $gap=length/2$,缩小增量继续以 $gap = gap/2$ 的方式,这种增量选择我们可以用一个序列来表示,$\lbrace \frac{n}{2}, \frac{(n/2)}{2}, \dots, 1 \rbrace$,称为增量序列。希尔排序的增量序列的选择与证明是个数学难题,我们选择的这个增量序列是比较常用的,也是希尔建议的增量,称为希尔增量,但其实这个增量序列不是最优的。此处我们做示例使用希尔增量。

先将整个待排序的记录序列分割成为若干子序列分别进行直接插入排序,具体算法描述:

  • 选择一个增量序列 $\lbrace t_1, t_2, \dots, t_k \rbrace$,其中 $t_i \gt t_j, i \lt j, t_k = 1$;
  • 按增量序列个数 k,对序列进行 k 趟排序;
  • 每趟排序,根据对应的增量 $t$,将待排序列分割成若干长度为 $m$ 的子序列,分别对各子表进行直接插入排序。仅增量因子为 1 时,整个序列作为一个表来处理,表长度即为整个序列的长度。

图解算法

shell_sort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 希尔排序
*
* @param arr
* @return arr
*/
public static int[] shellSort(int[] arr) {
int n = arr.length;
int gap = n / 2;
while (gap > 0) {
for (int i = gap; i < n; i++) {
int current = arr[i];
int preIndex = i - gap;
// Insertion sort
while (preIndex >= 0 && arr[preIndex] > current) {
arr[preIndex + gap] = arr[preIndex];
preIndex -= gap;
}
arr[preIndex + gap] = current;

}
gap /= 2;
}
return arr;
}

算法分析

  • 稳定性:不稳定
  • 时间复杂度:最佳:$O(nlogn)$, 最差:$O(n^2)$ 平均:$O(nlogn)$
  • 空间复杂度:$O(1)$

归并排序 (Merge Sort)

归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法 (Divide and Conquer) 的一个非常典型的应用。归并排序是一种稳定的排序方法。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为 2 - 路归并。

和选择排序一样,归并排序的性能不受输入数据的影响,但表现比选择排序好的多,因为始终都是 $O(nlogn)$ 的时间复杂度。代价是需要额外的内存空间。

算法步骤

归并排序算法是一个递归过程,边界条件为当输入序列仅有一个元素时,直接返回,具体过程如下:

  1. 如果输入内只有一个元素,则直接返回,否则将长度为 $n$ 的输入序列分成两个长度为 $n/2$ 的子序列;
  2. 分别对这两个子序列进行归并排序,使子序列变为有序状态;
  3. 设定两个指针,分别指向两个已经排序子序列的起始位置;
  4. 比较两个指针所指向的元素,选择相对小的元素放入到合并空间(用于存放排序结果),并移动指针到下一位置;
  5. 重复步骤 3 ~ 4 直到某一指针达到序列尾;
  6. 将另一序列剩下的所有元素直接复制到合并序列尾。

图解算法

MergeSort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 归并排序
*
* @param arr
* @return arr
*/
public static int[] mergeSort(int[] arr) {
if (arr.length <= 1) {
return arr;
}
int middle = arr.length / 2;
int[] arr_1 = Arrays.copyOfRange(arr, 0, middle);
int[] arr_2 = Arrays.copyOfRange(arr, middle, arr.length);
return merge(mergeSort(arr_1), mergeSort(arr_2));
}

/**
* Merge two sorted arrays
*
* @param arr_1
* @param arr_2
* @return sorted_arr
*/
public static int[] merge(int[] arr_1, int[] arr_2) {
int[] sorted_arr = new int[arr_1.length + arr_2.length];
int idx = 0, idx_1 = 0, idx_2 = 0;
while (idx_1 < arr_1.length && idx_2 < arr_2.length) {
if (arr_1[idx_1] < arr_2[idx_2]) {
sorted_arr[idx] = arr_1[idx_1];
idx_1 += 1;
} else {
sorted_arr[idx] = arr_2[idx_2];
idx_2 += 1;
}
idx += 1;
}
if (idx_1 < arr_1.length) {
while (idx_1 < arr_1.length) {
sorted_arr[idx] = arr_1[idx_1];
idx_1 += 1;
idx += 1;
}
} else {
while (idx_2 < arr_2.length) {
sorted_arr[idx] = arr_2[idx_2];
idx_2 += 1;
idx += 1;
}
}
return sorted_arr;
}

算法分析

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(nlogn)$, 最差:$O(nlogn)$, 平均:$O(nlogn)$
  • 空间复杂度:$O(n)$

快速排序 (Quick Sort)

快速排序用到了分治思想,同样的还有归并排序。乍看起来快速排序和归并排序非常相似,都是将问题变小,先排序子串,最后合并。不同的是快速排序在划分子问题的时候经过多一步处理,将划分的两组数据划分为一大一小,这样在最后合并的时候就不必像归并排序那样再进行比较。但也正因为如此,划分的不定性使得快速排序的时间复杂度并不稳定。

快速排序的基本思想:通过一趟排序将待排序列分隔成独立的两部分,其中一部分记录的元素均比另一部分的元素小,则可分别对这两部分子序列继续进行排序,以达到整个序列有序。

算法步骤

快速排序使用分治法(Divide and conquer)策略来把一个序列分为较小和较大的 2 个子序列,然后递归地排序两个子序列。具体算法描述如下:

  1. 从序列中随机挑出一个元素,做为 “基准”(pivot);
  2. 重新排列序列,将所有比基准值小的元素摆放在基准前面,所有比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个操作结束之后,该基准就处于数列的中间位置。这个称为分区(partition)操作;
  3. 递归地把小于基准值元素的子序列和大于基准值元素的子序列进行快速排序。

图解算法

RandomQuickSort

代码实现

来源:使用 Java 实现快速排序(详解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static int partition(int[] array, int low, int high) {
int pivot = array[high];
int pointer = low;
for (int i = low; i < high; i++) {
if (array[i] <= pivot) {
int temp = array[i];
array[i] = array[pointer];
array[pointer] = temp;
pointer++;
}
System.out.println(Arrays.toString(array));
}
int temp = array[pointer];
array[pointer] = array[high];
array[high] = temp;
return pointer;
}
public static void quickSort(int[] array, int low, int high) {
if (low < high) {
int position = partition(array, low, high);
quickSort(array, low, position - 1);
quickSort(array, position + 1, high);
}
}

算法分析

  • 稳定性:不稳定
  • 时间复杂度:最佳:$O(nlogn)$, 最差:$O(n^2)$,平均:$O(nlogn)$
  • 空间复杂度:$O(logn)$

堆排序 (Heap Sort)

堆排序是指利用堆这种数据结构所设计的一种排序算法。堆是一个近似完全二叉树的结构,并同时满足堆的性质:即子结点的值总是小于(或者大于)它的父节点。

算法步骤

  1. 将初始待排序列 $(R_1, R_2, \dots, R_n)$ 构建成大顶堆,此堆为初始的无序区;
  2. 将堆顶元素 $R_1$ 与最后一个元素 $R_n$ 交换,此时得到新的无序区 $(R_1, R_2, \dots, R_{n-1})$ 和新的有序区 $R_n$, 且满足 $R_i \leqslant R_n (i \in 1, 2,\dots, n-1)$;
  3. 由于交换后新的堆顶 $R_1$ 可能违反堆的性质,因此需要对当前无序区 $(R_1, R_2, \dots, R_{n-1})$ 调整为新堆,然后再次将 $R_1$ 与无序区最后一个元素交换,得到新的无序区 $(R_1, R_2, \dots, R_{n-2})$ 和新的有序区 $(R_{n-1}, R_n)$。不断重复此过程直到有序区的元素个数为 $n-1$,则整个排序过程完成。

图解算法

HeapSort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Global variable that records the length of an array;
static int heapLen;

/**
* Swap the two elements of an array
* @param arr
* @param i
* @param j
*/
private static void swap(int[] arr, int i, int j) {
int tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
}

/**
* Build Max Heap
* @param arr
*/
private static void buildMaxHeap(int[] arr) {
for (int i = arr.length / 2 - 1; i >= 0; i--) {
heapify(arr, i);
}
}

/**
* Adjust it to the maximum heap
* @param arr
* @param i
*/
private static void heapify(int[] arr, int i) {
int left = 2 * i + 1;
int right = 2 * i + 2;
int largest = i;
if (right < heapLen && arr[right] > arr[largest]) {
largest = right;
}
if (left < heapLen && arr[left] > arr[largest]) {
largest = left;
}
if (largest != i) {
swap(arr, largest, i);
heapify(arr, largest);
}
}

/**
* Heap Sort
* @param arr
* @return
*/
public static int[] heapSort(int[] arr) {
// index at the end of the heap
heapLen = arr.length;
// build MaxHeap
buildMaxHeap(arr);
for (int i = arr.length - 1; i > 0; i--) {
// Move the top of the heap to the tail of the heap in turn
swap(arr, 0, i);
heapLen -= 1;
heapify(arr, 0);
}
return arr;
}

算法分析

  • 稳定性:不稳定
  • 时间复杂度:最佳:$O(nlogn)$, 最差:$O(nlogn)$, 平均:$O(nlogn)$
  • 空间复杂度:$O(1)$

计数排序 (Counting Sort)

计数排序的核心在于将输入的数据值转化为键存储在额外开辟的数组空间中。 作为一种线性时间复杂度的排序,计数排序要求输入的数据必须是有确定范围的整数。

计数排序 (Counting sort) 是一种稳定的排序算法。计数排序使用一个额外的数组 C,其中第 i 个元素是待排序数组 A 中值等于 i 的元素的个数。然后根据数组 C 来将 A 中的元素排到正确的位置。它只能对整数进行排序。

算法步骤

  1. 找出数组中的最大值 max、最小值 min;
  2. 创建一个新数组 C,其长度是 max-min+1,其元素默认值都为 0;
  3. 遍历原数组 A 中的元素 A[i],以 A[i] - min 作为 C 数组的索引,以 A[i] 的值在 A 中元素出现次数作为 C[A[i] - min] 的值;
  4. 对 C 数组变形,新元素的值是该元素与前一个元素值的和,即当 i>1 时 C[i] = C[i] + C[i-1];
  5. 创建结果数组 R,长度和原始数组一样。
  6. 从后向前遍历原始数组 A 中的元素 A[i],使用 A[i] 减去最小值 min 作为索引,在计数数组 C 中找到对应的值 C[A[i] - min],C[A[i] - min] - 1 就是 A[i] 在结果数组 R 中的位置,做完上述这些操作,将 count[A[i] - min] 减小 1。

图解算法

CountingSort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Gets the maximum and minimum values in the array
*
* @param arr
* @return
*/
private static int[] getMinAndMax(int[] arr) {
int maxValue = arr[0];
int minValue = arr[0];
for (int i = 0; i < arr.length; i++) {
if (arr[i] > maxValue) {
maxValue = arr[i];
} else if (arr[i] < minValue) {
minValue = arr[i];
}
}
return new int[] { minValue, maxValue };
}

/**
* Counting Sort
*
* @param arr
* @return
*/
public static int[] countingSort(int[] arr) {
if (arr.length < 2) {
return arr;
}
int[] extremum = getMinAndMax(arr);
int minValue = extremum[0];
int maxValue = extremum[1];
int[] countArr = new int[maxValue - minValue + 1];
int[] result = new int[arr.length];

for (int i = 0; i < arr.length; i++) {
countArr[arr[i] - minValue] += 1;
}
for (int i = 1; i < countArr.length; i++) {
countArr[i] += countArr[i - 1];
}
for (int i = arr.length - 1; i >= 0; i--) {
int idx = countArr[arr[i] - minValue] - 1;
result[idx] = arr[i];
countArr[arr[i] - minValue] -= 1;
}
return result;
}

算法分析

当输入的元素是 n 个 0 到 k 之间的整数时,它的运行时间是 $O(n+k)$。计数排序不是比较排序,排序的速度快于任何比较排序算法。由于用来计数的数组 C 的长度取决于待排序数组中数据的范围(等于待排序数组的最大值与最小值的差加上 1),这使得计数排序对于数据范围很大的数组,需要大量额外内存空间。

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(n+k)$ 最差:$O(n+k)$ 平均:$O(n+k)$
  • 空间复杂度:$O(k)$

桶排序 (Bucket Sort)

桶排序是计数排序的升级版。它利用了函数的映射关系,高效与否的关键就在于这个映射函数的确定。为了使桶排序更加高效,我们需要做到这两点:

  1. 在额外空间充足的情况下,尽量增大桶的数量
  2. 使用的映射函数能够将输入的 N 个数据均匀的分配到 K 个桶中

桶排序的工作的原理:假设输入数据服从均匀分布,将数据分到有限数量的桶里,每个桶再分别排序(有可能再使用别的排序算法或是以递归方式继续使用桶排序进行。

算法步骤

  1. 设置一个 BucketSize,作为每个桶所能放置多少个不同数值;
  2. 遍历输入数据,并且把数据依次映射到对应的桶里去;
  3. 对每个非空的桶进行排序,可以使用其它排序方法,也可以递归使用桶排序;
  4. 从非空桶里把排好序的数据拼接起来。

图解算法

BucketSort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Gets the maximum and minimum values in the array
* @param arr
* @return
*/
private static int[] getMinAndMax(List<Integer> arr) {
int maxValue = arr.get(0);
int minValue = arr.get(0);
for (int i : arr) {
if (i > maxValue) {
maxValue = i;
} else if (i < minValue) {
minValue = i;
}
}
return new int[] { minValue, maxValue };
}

/**
* Bucket Sort
* @param arr
* @return
*/
public static List<Integer> bucketSort(List<Integer> arr, int bucket_size) {
if (arr.size() < 2 || bucket_size == 0) {
return arr;
}
int[] extremum = getMinAndMax(arr);
int minValue = extremum[0];
int maxValue = extremum[1];
int bucket_cnt = (maxValue - minValue) / bucket_size + 1;
List<List<Integer>> buckets = new ArrayList<>();
for (int i = 0; i < bucket_cnt; i++) {
buckets.add(new ArrayList<Integer>());
}
for (int element : arr) {
int idx = (element - minValue) / bucket_size;
buckets.get(idx).add(element);
}
for (int i = 0; i < buckets.size(); i++) {
if (buckets.get(i).size() > 1) {
buckets.set(i, sort(buckets.get(i), bucket_size / 2));
}
}
ArrayList<Integer> result = new ArrayList<>();
for (List<Integer> bucket : buckets) {
for (int element : bucket) {
result.add(element);
}
}
return result;
}

算法分析

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(n+k)$ 最差:$O(n^2)$ 平均:$O(n+k)$
  • 空间复杂度:$O(n+k)$

基数排序 (Radix Sort)

基数排序也是非比较的排序算法,对元素中的每一位数字进行排序,从最低位开始排序,复杂度为 $O(n×k)$,$n$ 为数组长度,$k$ 为数组中元素的最大的位数;

基数排序是按照低位先排序,然后收集;再按照高位排序,然后再收集;依次类推,直到最高位。有时候有些属性是有优先级顺序的,先按低优先级排序,再按高优先级排序。最后的次序就是高优先级高的在前,高优先级相同的低优先级高的在前。基数排序基于分别排序,分别收集,所以是稳定的。

算法步骤

  1. 取得数组中的最大数,并取得位数,即为迭代次数 $N$(例如:数组中最大数值为 1000,则 $N=4$);
  2. A 为原始数组,从最低位开始取每个位组成 radix 数组;
  3. 对 radix 进行计数排序(利用计数排序适用于小范围数的特点);
  4. 将 radix 依次赋值给原数组;
  5. 重复 2~4 步骤 $N$ 次

图解算法

RadixSort

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Radix Sort
*
* @param arr
* @return
*/
public static int[] radixSort(int[] arr) {
if (arr.length < 2) {
return arr;
}
int N = 1;
int maxValue = arr[0];
for (int element : arr) {
if (element > maxValue) {
maxValue = element;
}
}
while (maxValue / 10 != 0) {
maxValue = maxValue / 10;
N += 1;
}
for (int i = 0; i < N; i++) {
List<List<Integer>> radix = new ArrayList<>();
for (int k = 0; k < 10; k++) {
radix.add(new ArrayList<Integer>());
}
for (int element : arr) {
int idx = (element / (int) Math.pow(10, i)) % 10;
radix.get(idx).add(element);
}
int idx = 0;
for (List<Integer> l : radix) {
for (int n : l) {
arr[idx++] = n;
}
}
}
return arr;
}

算法分析

  • 稳定性:稳定
  • 时间复杂度:最佳:$O(n×k)$ 最差:$O(n×k)$ 平均:$O(n×k)$
  • 空间复杂度:$O(n+k)$

基数排序 vs 计数排序 vs 桶排序

这三种排序算法都利用了桶的概念,但对桶的使用方法上有明显差异:

  • 基数排序:根据键值的每位数字来分配桶
  • 计数排序:每个桶只存储单一键值
  • 桶排序:每个桶存储一定范围的数值

参考文章

  • https://www.cnblogs.com/guoyaohua/p/8600214.html
  • https://en.wikipedia.org/wiki/Sorting_algorithm
  • https://sort.hust.cc/

第k小、top k问题

发表于 2024-08-19 | 分类于 算法 | 阅读次数:
字数统计: 270 字 | 阅读时长 ≈ 1 分钟

通用思想

第k大(小)、TOP K问题都可以用++堆++来解决。第k大、最大的TOP K建小根堆。Java中优先队列就是小根堆。首先将前 k 个数插入小根堆中,随后从第 k+1 个数开始遍历,如果当前遍历到的数比小根堆的堆顶的数要大,就把堆顶的数弹出,再插入当前遍历到的数。最后将小根堆里的数存入数组返回即可。

当然,更笨的方法是,对于求第k小、top k小的问题,将所有的元素构建一个最小堆,然后按照堆排序的办法,弹出交换k次,这种不采用,因为空间要求太大。

也可以用快速排序的分割思路。

例外

对于有些有明显的大小顺序进入的问题,求第k小,便可以直接构建最小堆,不规定堆的大小,往堆中压入k次,即可得到答案。

例如264. 丑数 II、786. 第 K 个最小的素数分数、

并查集

发表于 2024-08-15 | 分类于 算法 | 阅读次数:
字数统计: 6 字 | 阅读时长 ≈ 1 分钟

例题:省份数量

Redis常见面试题总结(上)

发表于 2024-08-09 | 分类于 分布式 , redis | 阅读次数:
字数统计: 13.3k 字 | 阅读时长 ≈ 49 分钟

Redis 基础

什么是 Redis?

Redis (REmote DIctionary Server)是一个基于 C 语言开发的开源 NoSQL 数据库(BSD 许可)。与传统数据库不同的是,Redis 的数据是保存在内存中的(内存数据库,支持持久化),因此读写速度非常快,被广泛应用于分布式缓存方向。并且,Redis 存储的是 KV 键值对数据。

为了满足不同的业务场景,Redis 内置了多种数据类型实现(比如 String、Hash、Sorted Set、Bitmap、HyperLogLog、GEO)。并且,Redis 还支持事务、持久化、Lua 脚本、发布订阅模型、多种开箱即用的集群方案(Redis Sentinel、Redis Cluster)。

Redis 数据类型概览

Redis 没有外部依赖,Linux 和 OS X 是 Redis 开发和测试最多的两个操作系统,官方推荐生产环境使用 Linux 部署 Redis。

个人学习的话,你可以自己本机安装 Redis 或者通过 Redis 官网提供的在线 Redis 环境(少部分命令无法使用)来实际体验 Redis。

try-redis

全世界有非常多的网站使用到了 Redis ,techstacks.io 专门维护了一个使用 Redis 的热门站点列表 ,感兴趣的话可以看看。

Redis 为什么这么快?

Redis 内部做了非常多的性能优化,比较重要的有下面 3 点:

  1. Redis 基于内存,内存的访问速度比磁盘快很多;
  2. Redis 基于 Reactor 模式设计开发了一套高效的事件处理模型,主要是单线程事件循环和 IO 多路复用(Redis 线程模式后面会详细介绍到);
  3. Redis 内置了多种优化过后的数据类型/结构实现,性能非常高。
  4. Redis 通信协议实现简单且解析高效。

下面这张图片总结的挺不错的,分享一下,出自 Why is Redis so fast? 。

why-redis-so-fast

那既然都这么快了,为什么不直接用 Redis 当主数据库呢?主要是因为内存成本太高且 Redis 提供的数据持久化仍然有数据丢失的风险。

除了 Redis,你还知道其他分布式缓存方案吗?

如果面试中被问到这个问题的话,面试官主要想看看:

  1. 你在选择 Redis 作为分布式缓存方案时,是否是经过严谨的调研和思考,还是只是因为 Redis 是当前的“热门”技术。
  2. 你在分布式缓存方向的技术广度。

如果你了解其他方案,并且能解释为什么最终选择了 Redis(更进一步!),这会对你面试表现加分不少!

下面简单聊聊常见的分布式缓存技术选型。

分布式缓存的话,比较老牌同时也是使用的比较多的还是 Memcached 和 Redis。不过,现在基本没有看过还有项目使用 Memcached 来做缓存,都是直接用 Redis。

Memcached 是分布式缓存最开始兴起的那会,比较常用的。后来,随着 Redis 的发展,大家慢慢都转而使用更加强大的 Redis 了。

有一些大厂也开源了类似于 Redis 的分布式高性能 KV 存储数据库,例如,腾讯开源的 Tendis 。Tendis 基于知名开源项目 RocksDB 作为存储引擎 ,100% 兼容 Redis 协议和 Redis4.0 所有数据模型。关于 Redis 和 Tendis 的对比,腾讯官方曾经发过一篇文章:Redis vs Tendis:冷热混合存储版架构揭秘 ,可以简单参考一下。

不过,从 Tendis 这个项目的 Github 提交记录可以看出,Tendis 开源版几乎已经没有被维护更新了,加上其关注度并不高,使用的公司也比较少。因此,不建议你使用 Tendis 来实现分布式缓存。

目前,比较业界认可的 Redis 替代品还是下面这两个开源分布式缓存(都是通过碰瓷 Redis 火的):

  • Dragonfly:一种针对现代应用程序负荷需求而构建的内存数据库,完全兼容 Redis 和 Memcached 的 API,迁移时无需修改任何代码,号称全世界最快的内存数据库。
  • KeyDB: Redis 的一个高性能分支,专注于多线程、内存效率和高吞吐量。

不过,个人还是建议分布式缓存首选 Redis ,毕竟经过这么多年的生考验,生态也这么优秀,资料也很全面!

PS:篇幅问题,我这并没有对上面提到的分布式缓存选型做详细介绍和对比,感兴趣的话,可以自行研究一下。

说一下 Redis 和 Memcached 的区别和共同点

现在公司一般都是用 Redis 来实现缓存,而且 Redis 自身也越来越强大了!不过,了解 Redis 和 Memcached 的区别和共同点,有助于我们在做相应的技术选型的时候,能够做到有理有据!

共同点:

  1. 都是基于内存的数据库,一般都用来当做缓存使用。
  2. 都有过期策略。
  3. 两者的性能都非常高。

区别:

  1. 数据类型:Redis 支持更丰富的数据类型(支持更复杂的应用场景)。Redis 不仅仅支持简单的 k/v 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储。Memcached 只支持最简单的 k/v 数据类型。
  2. 数据持久化:Redis 支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用,而 Memcached 把数据全部存在内存之中。也就是说,Redis 有灾难恢复机制而 Memcached 没有。
  3. 集群模式支持:Memcached 没有原生的集群模式,需要依靠客户端来实现往集群中分片写入数据;但是 Redis 自 3.0 版本起是原生支持集群模式的。
  4. 线程模型:Memcached 是多线程,非阻塞 IO 复用的网络模型;Redis 使用单线程的多路 IO 复用模型。 (Redis 6.0 针对网络数据的读写引入了多线程)
  5. 特性支持:Redis 支持发布订阅模型、Lua 脚本、事务等功能,而 Memcached 不支持。并且,Redis 支持更多的编程语言。
  6. 过期数据删除:Memcached 过期数据的删除策略只用了惰性删除,而 Redis 同时使用了惰性删除与定期删除。

相信看了上面的对比之后,我们已经没有什么理由可以选择使用 Memcached 来作为自己项目的分布式缓存了。

为什么要用 Redis?

1、访问速度更快

传统数据库数据保存在磁盘,而 Redis 基于内存,内存的访问速度比磁盘快很多。引入 Redis 之后,我们可以把一些高频访问的数据放到 Redis 中,这样下次就可以直接从内存中读取,速度可以提升几十倍甚至上百倍。

2、高并发

一般像 MySQL 这类的数据库的 QPS 大概都在 4k 左右(4 核 8g) ,但是使用 Redis 缓存之后很容易达到 5w+,甚至能达到 10w+(就单机 Redis 的情况,Redis 集群的话会更高)。

QPS(Query Per Second):服务器每秒可以执行的查询次数;

由此可见,直接操作缓存能够承受的数据库请求数量是远远大于直接访问数据库的,所以我们可以考虑把数据库中的部分数据转移到缓存中去,这样用户的一部分请求会直接到缓存这里而不用经过数据库。进而,我们也就提高了系统整体的并发。

3、功能全面

Redis 除了可以用作缓存之外,还可以用于分布式锁、限流、消息队列、延时队列等场景,功能强大!

常见的缓存读写策略有哪些?

关于常见的缓存读写策略的详细介绍,可以看我写的这篇文章:3 种常用的缓存读写策略详解 。

什么是 Redis Module?有什么用?

Redis 从 4.0 版本开始,支持通过 Module 来扩展其功能以满足特殊的需求。这些 Module 以动态链接库(so 文件)的形式被加载到 Redis 中,这是一种非常灵活的动态扩展功能的实现方式,值得借鉴学习!

我们每个人都可以基于 Redis 去定制化开发自己的 Module,比如实现搜索引擎功能、自定义分布式锁和分布式限流。

目前,被 Redis 官方推荐的 Module 有:

  • RediSearch:用于实现搜索引擎的模块。
  • RedisJSON:用于处理 JSON 数据的模块。
  • RedisGraph:用于实现图形数据库的模块。
  • RedisTimeSeries:用于处理时间序列数据的模块。
  • RedisBloom:用于实现布隆过滤器的模块。
  • RedisAI:用于执行深度学习/机器学习模型并管理其数据的模块。
  • RedisCell:用于实现分布式限流的模块。
  • ……

关于 Redis 模块的详细介绍,可以查看官方文档:https://redis.io/modules。

Redis 应用

Redis 除了做缓存,还能做什么?

  • 分布式锁:通过 Redis 来做分布式锁是一种比较常见的方式。通常情况下,我们都是基于 Redisson 来实现分布式锁。关于 Redis 实现分布式锁的详细介绍,可以看我写的这篇文章:分布式锁详解 。
  • 限流:一般是通过 Redis + Lua 脚本的方式来实现限流。如果不想自己写 Lua 脚本的话,也可以直接利用 Redisson 中的 RRateLimiter 来实现分布式限流,其底层实现就是基于 Lua 代码+令牌桶算法。
  • 消息队列:Redis 自带的 List 数据结构可以作为一个简单的队列使用。Redis 5.0 中增加的 Stream 类型的数据结构更加适合用来做消息队列。它比较类似于 Kafka,有主题和消费组的概念,支持消息持久化以及 ACK 机制。
  • 延时队列:Redisson 内置了延时队列(基于 Sorted Set 实现的)。
  • 分布式 Session :利用 String 或者 Hash 数据类型保存 Session 数据,所有的服务器都可以访问。
  • 复杂业务场景:通过 Redis 以及 Redis 扩展(比如 Redisson)提供的数据结构,我们可以很方便地完成很多复杂的业务场景比如通过 Bitmap 统计活跃用户、通过 Sorted Set 维护排行榜。
  • ……

如何基于 Redis 实现分布式锁?

关于 Redis 实现分布式锁的详细介绍,可以看我写的这篇文章:分布式锁详解 。

Redis 可以做消息队列么?

实际项目中使用 Redis 来做消息队列的非常少,毕竟有更成熟的消息队列中间件可以用。

先说结论:可以是可以,但不建议使用 Redis 来做消息队列。和专业的消息队列相比,还是有很多欠缺的地方。

Redis 2.0 之前,如果想要使用 Redis 来做消息队列的话,只能通过 List 来实现。

通过 RPUSH/LPOP 或者 LPUSH/RPOP即可实现简易版消息队列:

1
2
3
4
5
6
7
8
# 生产者生产消息
> RPUSH myList msg1 msg2
(integer) 2
> RPUSH myList msg3
(integer) 3
# 消费者消费消息
> LPOP myList
"msg1"

不过,通过 RPUSH/LPOP 或者 LPUSH/RPOP这样的方式存在性能问题,我们需要不断轮询去调用 RPOP 或 LPOP 来消费消息。当 List 为空时,大部分的轮询的请求都是无效请求,这种方式大量浪费了系统资源。

因此,Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Blocking 的都是阻塞式),并且还支持一个超时参数。如果 List 为空,Redis 服务端不会立刻返回结果,它会等待 List 中有新数据后再返回或者是等待最多一个超时时间后返回空。如果将超时时间设置为 0 时,即可无限等待,直到弹出消息

1
2
3
4
# 超时时间为 10s
# 如果有数据立刻返回,否则最多等待10秒
> BRPOP myList 10
null

List 实现消息队列功能太简单,像消息确认机制等功能还需要我们自己实现,最要命的是没有广播机制,消息也只能被消费一次。

Redis 2.0 引入了发布订阅 (pub/sub) 功能,解决了 List 实现消息队列没有广播机制的问题。

Redis 发布订阅 (pub/sub) 功能

pub/sub 中引入了一个概念叫 channel(频道),发布订阅机制的实现就是基于这个 channel 来做的。

pub/sub 涉及发布者(Publisher)和订阅者(Subscriber,也叫消费者)两个角色:

  • 发布者通过 PUBLISH 投递消息给指定 channel。
  • 订阅者通过SUBSCRIBE订阅它关心的 channel。并且,订阅者可以订阅一个或者多个 channel。

我们这里启动 3 个 Redis 客户端来简单演示一下:

pub/sub 实现消息队列演示

pub/sub 既能单播又能广播,还支持 channel 的简单正则匹配。不过,消息丢失(客户端断开连接或者 Redis 宕机都会导致消息丢失)、消息堆积(发布者发布消息的时候不会管消费者的具体消费能力如何)等问题依然没有一个比较好的解决办法。

为此,Redis 5.0 新增加的一个数据结构 Stream 来做消息队列。Stream 支持:

  • 发布 / 订阅模式
  • 按照消费者组进行消费(借鉴了 Kafka 消费者组的概念)
  • 消息持久化( RDB 和 AOF)
  • ACK 机制(通过确认机制来告知已经成功处理了消息)
  • 阻塞式获取消息

Stream 的结构如下:

这是一个有序的消息链表,每个消息都有一个唯一的 ID 和对应的内容。ID 是一个时间戳和序列号的组合,用来保证消息的唯一性和递增性。内容是一个或多个键值对(类似 Hash 基本数据类型),用来存储消息的数据。

这里再对图中涉及到的一些概念,进行简单解释:

  • Consumer Group:消费者组用于组织和管理多个消费者。消费者组本身不处理消息,而是再将消息分发给消费者,由消费者进行真正的消费
  • last_delivered_id:标识消费者组当前消费位置的游标,消费者组中任意一个消费者读取了消息都会使 last_delivered_id 往前移动。
  • pending_ids:记录已经被客户端消费但没有 ack 的消息的 ID。

下面是Stream 用作消息队列时常用的命令:

  • XADD:向流中添加新的消息。
  • XREAD:从流中读取消息。
  • XREADGROUP:从消费组中读取消息。
  • XRANGE:根据消息 ID 范围读取流中的消息。
  • XREVRANGE:与 XRANGE 类似,但以相反顺序返回结果。
  • XDEL:从流中删除消息。
  • XTRIM:修剪流的长度,可以指定修建策略(MAXLEN/MINID)。
  • XLEN:获取流的长度。
  • XGROUP CREATE:创建消费者组。
  • XGROUP DESTROY : 删除消费者组
  • XGROUP DELCONSUMER:从消费者组中删除一个消费者。
  • XGROUP SETID:为消费者组设置新的最后递送消息 ID
  • XACK:确认消费组中的消息已被处理。
  • XPENDING:查询消费组中挂起(未确认)的消息。
  • XCLAIM:将挂起的消息从一个消费者转移到另一个消费者。
  • XINFO:获取流(XINFO STREAM)、消费组(XINFO GROUPS)或消费者(XINFO CONSUMERS)的详细信息。

Stream 使用起来相对要麻烦一些,这里就不演示了。

总的来说,Stream 已经可以满足一个消息队列的基本要求了。不过,Stream 在实际使用中依然会有一些小问题不太好解决比如在 Redis 发生故障恢复后不能保证消息至少被消费一次。

综上,和专业的消息队列相比,使用 Redis 来实现消息队列还是有很多欠缺的地方比如消息丢失和堆积问题不好解决。因此,我们通常建议不要使用 Redis 来做消息队列,你完全可以选择市面上比较成熟的一些消息队列比如 RocketMQ、Kafka。不过,如果你就是想要用 Redis 来做消息队列的话,那我建议你优先考虑 Stream,这是目前相对最优的 Redis 消息队列实现。

相关阅读:Redis 消息队列发展历程 - 阿里开发者 - 2022。

Redis 可以做搜索引擎么?

Redis 是可以实现全文搜索引擎功能的,需要借助 RediSearch ,这是一个基于 Redis 的搜索引擎模块。

RediSearch 支持中文分词、聚合统计、停用词、同义词、拼写检查、标签查询、向量相似度查询、多关键词搜索、分页搜索等功能,算是一个功能比较完善的全文搜索引擎了。

相比较于 Elasticsearch 来说,RediSearch 主要在下面两点上表现更优异一些:

  1. 性能更优秀:依赖 Redis 自身的高性能,基于内存操作(Elasticsearch 基于磁盘)。
  2. 较低内存占用实现快速索引:RediSearch 内部使用压缩的倒排索引,所以可以用较低的内存占用来实现索引的快速构建。

对于小型项目的简单搜索场景来说,使用 RediSearch 来作为搜索引擎还是没有问题的(搭配 RedisJSON 使用)。

对于比较复杂或者数据规模较大的搜索场景还是不太建议使用 RediSearch 来作为搜索引擎,主要是因为下面这些限制和问题:

  1. 数据量限制:Elasticsearch 可以支持 PB 级别的数据量,可以轻松扩展到多个节点,利用分片机制提高可用性和性能。RedisSearch 是基于 Redis 实现的,其能存储的数据量受限于 Redis 的内存容量,不太适合存储大规模的数据(内存昂贵,扩展能力较差)。
  2. 分布式能力较差:Elasticsearch 是为分布式环境设计的,可以轻松扩展到多个节点。虽然 RedisSearch 支持分布式部署,但在实际应用中可能会面临一些挑战,如数据分片、节点间通信、数据一致性等问题。
  3. 聚合功能较弱:Elasticsearch 提供了丰富的聚合功能,而 RediSearch 的聚合功能相对较弱,只支持简单的聚合操作。
  4. 生态较差:Elasticsearch 可以轻松和常见的一些系统/软件集成比如 Hadoop、Spark、Kibana,而 RedisSearch 则不具备该优势。

Elasticsearch 适用于全文搜索、复杂查询、实时数据分析和聚合的场景,而 RediSearch 适用于快速数据存储、缓存和简单查询的场景。

如何基于 Redis 实现延时任务?

类似的问题:

  • 订单在 10 分钟后未支付就失效,如何用 Redis 实现?
  • 红包 24 小时未被查收自动退还,如何用 Redis 实现?

基于 Redis 实现延时任务的功能无非就下面两种方案:

  1. Redis 过期事件监听
  2. Redisson 内置的延时队列

Redis 过期事件监听的存在时效性较差、丢消息、多服务实例下消息重复消费等问题,不被推荐使用。

Redisson 内置的延时队列具备下面这些优势:

  1. 减少了丢消息的可能:DelayedQueue 中的消息会被持久化,即使 Redis 宕机了,根据持久化机制,也只可能丢失一点消息,影响不大。当然了,你也可以使用扫描数据库的方法作为补偿机制。
  2. 消息不存在重复消费问题:每个客户端都是从同一个目标队列中获取任务的,不存在重复消费的问题。

关于 Redis 实现延时任务的详细介绍,可以看我写的这篇文章:如何基于 Redis 实现延时任务?。

Redis 数据类型

关于 Redis 5 种基础数据类型和 3 种特殊数据类型的详细介绍请看下面这两篇文章以及 Redis 官方文档 :

  • Redis 5 种基本数据类型详解
  • Redis 3 种特殊数据类型详解

Redis 常用的数据类型有哪些?

Redis 中比较常见的数据类型有下面这些:

  • 5 种基础数据类型:String(字符串)、List(列表)、Set(集合)、Hash(散列)、Zset(有序集合)。
  • 3 种特殊数据类型:HyperLogLog(基数统计)、Bitmap (位图)、Geospatial (地理位置)。

除了上面提到的之外,还有一些其他的比如 Bloom filter(布隆过滤器)、Bitfield(位域)。

String 的应用场景有哪些?

String 是 Redis 中最简单同时也是最常用的一个数据类型。它是一种二进制安全的数据类型,可以用来存储任何类型的数据比如字符串、整数、浮点数、图片(图片的 base64 编码或者解码或者图片的路径)、序列化后的对象。

String 的常见应用场景如下:

  • 常规数据(比如 Session、Token、序列化后的对象、图片的路径)的缓存;
  • 计数比如用户单位时间的请求数(简单限流可以用到)、页面单位时间的访问数;
  • 分布式锁(利用 SETNX key value 命令可以实现一个最简易的分布式锁);
  • ……

关于 String 的详细介绍请看这篇文章:Redis 5 种基本数据类型详解。

String 还是 Hash 存储对象数据更好呢?

简单对比一下二者:

  • 对象存储方式:String 存储的是序列化后的对象数据,存放的是整个对象,操作简单直接。Hash 是对对象的每个字段单独存储,可以获取部分字段的信息,也可以修改或者添加部分字段,节省网络流量。如果对象中某些字段需要经常变动或者经常需要单独查询对象中的个别字段信息,Hash 就非常适合。
  • 内存消耗:Hash 通常比 String 更节省内存,特别是在字段较多且字段长度较短时。Redis 对小型 Hash 进行优化(如使用 ziplist 存储),进一步降低内存占用。
  • 复杂对象存储:String 在处理多层嵌套或复杂结构的对象时更方便,因为无需处理每个字段的独立存储和操作。
  • 性能:String 的操作通常具有 O(1) 的时间复杂度,因为它存储的是整个对象,操作简单直接,整体读写的性能较好。Hash 由于需要处理多个字段的增删改查操作,在字段较多且经常变动的情况下,可能会带来额外的性能开销。

总结:

  • 在绝大多数情况下,String 更适合存储对象数据,尤其是当对象结构简单且整体读写是主要操作时。
  • 如果你需要频繁操作对象的部分字段或节省内存,Hash 可能是更好的选择。

String 的底层实现是什么?

Redis 是基于 C 语言编写的,但 Redis 的 String 类型的底层实现并不是 C 语言中的字符串(即以空字符 \0 结尾的字符数组),而是自己编写了 SDS(Simple Dynamic String,简单动态字符串) 来作为底层实现。

SDS 最早是 Redis 作者为日常 C 语言开发而设计的 C 字符串,后来被应用到了 Redis 上,并经过了大量的修改完善以适合高性能操作。

Redis7.0 的 SDS 的部分源码如下(https://github.com/redis/redis/blob/7.0/src/sds.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

通过源码可以看出,SDS 共有五种实现方式 SDS_TYPE_5(并未用到)、SDS_TYPE_8、SDS_TYPE_16、SDS_TYPE_32、SDS_TYPE_64,其中只有后四种实际用到。Redis 会根据初始化的长度决定使用哪种类型,从而减少内存的使用。

类型 字节 位
sdshdr5 < 1 <8
sdshdr8 1 8
sdshdr16 2 16
sdshdr32 4 32
sdshdr64 8 64

对于后四种实现都包含了下面这 4 个属性:

  • len:字符串的长度也就是已经使用的字节数
  • alloc:总共可用的字符空间大小,alloc-len 就是 SDS 剩余的空间大小
  • buf[]:实际存储字符串的数组
  • flags:低三位保存类型标志

SDS 相比于 C 语言中的字符串有如下提升:

  1. 可以避免缓冲区溢出:C 语言中的字符串被修改(比如拼接)时,一旦没有分配足够长度的内存空间,就会造成缓冲区溢出。SDS 被修改时,会先根据 len 属性检查空间大小是否满足要求,如果不满足,则先扩展至所需大小再进行修改操作。
  2. 获取字符串长度的复杂度较低:C 语言中的字符串的长度通常是经过遍历计数来实现的,时间复杂度为 O(n)。SDS 的长度获取直接读取 len 属性即可,时间复杂度为 O(1)。
  3. 减少内存分配次数:为了避免修改(增加/减少)字符串时,每次都需要重新分配内存(C 语言的字符串是这样的),SDS 实现了空间预分配和惰性空间释放两种优化策略。当 SDS 需要增加字符串时,Redis 会为 SDS 分配好内存,并且根据特定的算法分配多余的内存,这样可以减少连续执行字符串增长操作所需的内存重分配次数。当 SDS 需要减少字符串时,这部分内存不会立即被回收,会被记录下来,等待后续使用(支持手动释放,有对应的 API)。
  4. 二进制安全:C 语言中的字符串以空字符 \0 作为字符串结束的标识,这存在一些问题,像一些二进制文件(比如图片、视频、音频)就可能包括空字符,C 字符串无法正确保存。SDS 使用 len 属性判断字符串是否结束,不存在这个问题。

🤐 多提一嘴,很多文章里 SDS 的定义是下面这样的:

1
2
3
4
5
struct sdshdr {
unsigned int len;
unsigned int free;
char buf[];
};

这个也没错,Redis 3.2 之前就是这样定义的。后来,由于这种方式的定义存在问题,len 和 free 的定义用了 4 个字节,造成了浪费。Redis 3.2 之后,Redis 改进了 SDS 的定义,将其划分为了现在的 5 种类型。

购物车信息用 String 还是 Hash 存储更好呢?

由于购物车中的商品频繁修改和变动,购物车信息建议使用 Hash 存储:

  • 用户 id 为 key
  • 商品 id 为 field,商品数量为 value

Hash维护简单的购物车信息

那用户购物车信息的维护具体应该怎么操作呢?

  • 用户添加商品就是往 Hash 里面增加新的 field 与 value;
  • 查询购物车信息就是遍历对应的 Hash;
  • 更改商品数量直接修改对应的 value 值(直接 set 或者做运算皆可);
  • 删除商品就是删除 Hash 中对应的 field;
  • 清空购物车直接删除对应的 key 即可。

这里只是以业务比较简单的购物车场景举例,实际电商场景下,field 只保存一个商品 id 是没办法满足需求的。

使用 Redis 实现一个排行榜怎么做?

Redis 中有一个叫做 Sorted Set (有序集合)的数据类型经常被用在各种排行榜的场景,比如直播间送礼物的排行榜、朋友圈的微信步数排行榜、王者荣耀中的段位排行榜、话题热度排行榜等等。

相关的一些 Redis 命令: ZRANGE (从小到大排序)、 ZREVRANGE (从大到小排序)、ZREVRANK (指定元素排名)。

《Java 面试指北》 的「技术面试题篇」就有一篇文章详细介绍如何使用 Sorted Set 来设计制作一个排行榜,感兴趣的小伙伴可以看看。

Redis 的有序集合底层为什么要用跳表,而不用平衡树、红黑树或者 B+树?

这道面试题很多大厂比较喜欢问,难度还是有点大的。

  • 平衡树 vs 跳表:平衡树的插入、删除和查询的时间复杂度和跳表一样都是 **O(log n)**。对于范围查询来说,平衡树也可以通过中序遍历的方式达到和跳表一样的效果。但是它的每一次插入或者删除操作都需要保证整颗树左右节点的绝对平衡,只要不平衡就要通过旋转操作来保持平衡,这个过程是比较耗时的。跳表诞生的初衷就是为了克服平衡树的一些缺点。跳表使用概率平衡而不是严格强制的平衡,因此,跳表中的插入和删除算法比平衡树的等效算法简单得多,速度也快得多。
  • 红黑树 vs 跳表:相比较于红黑树来说,跳表的实现也更简单一些,不需要通过旋转和染色(红黑变换)来保证黑平衡。并且,按照区间来查找数据这个操作,红黑树的效率没有跳表高。
  • B+树 vs 跳表:B+树更适合作为数据库和文件系统中常用的索引结构之一,它的核心思想是通过可能少的 IO 定位到尽可能多的索引来获得查询数据。对于 Redis 这种内存数据库来说,它对这些并不感冒,因为 Redis 作为内存数据库它不可能存储大量的数据,所以对于索引不需要通过 B+树这种方式进行维护,只需按照概率进行随机维护即可,节约内存。而且使用跳表实现 zset 时相较前者来说更简单一些,在进行插入时只需通过索引将数据插入到链表中合适的位置再随机维护一定高度的索引即可,也不需要像 B+树那样插入时发现失衡时还需要对节点分裂与合并。

另外,我还单独写了一篇文章从有序集合的基本使用到跳表的源码分析和实现,让你会对 Redis 的有序集合底层实现的跳表有着更深刻的理解和掌握 :Redis 为什么用跳表实现有序集合。

Set 的应用场景是什么?

Redis 中 Set 是一种无序集合,集合中的元素没有先后顺序但都唯一,有点类似于 Java 中的 HashSet 。

Set 的常见应用场景如下:

  • 存放的数据不能重复的场景:网站 UV 统计(数据量巨大的场景还是 HyperLogLog更适合一些)、文章点赞、动态点赞等等。
  • 需要获取多个数据源交集、并集和差集的场景:共同好友(交集)、共同粉丝(交集)、共同关注(交集)、好友推荐(差集)、音乐推荐(差集)、订阅号推荐(差集+交集) 等等。
  • 需要随机获取数据源中的元素的场景:抽奖系统、随机点名等等。

使用 Set 实现抽奖系统怎么做?

如果想要使用 Set 实现一个简单的抽奖系统的话,直接使用下面这几个命令就可以了:

  • SADD key member1 member2 ...:向指定集合添加一个或多个元素。
  • SPOP key count:随机移除并获取指定集合中一个或多个元素,适合不允许重复中奖的场景。
  • SRANDMEMBER key count : 随机获取指定集合中指定数量的元素,适合允许重复中奖的场景。

使用 Bitmap 统计活跃用户怎么做?

Bitmap 存储的是连续的二进制数字(0 和 1),通过 Bitmap, 只需要一个 bit 位来表示某个元素对应的值或者状态,key 就是对应元素本身 。我们知道 8 个 bit 可以组成一个 byte,所以 Bitmap 本身会极大的节省储存空间。

你可以将 Bitmap 看作是一个存储二进制数字(0 和 1)的数组,数组中每个元素的下标叫做 offset(偏移量)。

img

如果想要使用 Bitmap 统计活跃用户的话,可以使用日期(精确到天)作为 key,然后用户 ID 为 offset,如果当日活跃过就设置为 1。

初始化数据:

1
2
3
4
5
6
> SETBIT 20210308 1 1
(integer) 0
> SETBIT 20210308 2 1
(integer) 0
> SETBIT 20210309 1 1
(integer) 0

统计 20210308~20210309 总活跃用户数:

1
2
3
4
> BITOP and desk1 20210308 20210309
(integer) 1
> BITCOUNT desk1
(integer) 1

统计 20210308~20210309 在线活跃用户数:

1
2
3
4
> BITOP or desk2 20210308 20210309
(integer) 1
> BITCOUNT desk2
(integer) 2

使用 HyperLogLog 统计页面 UV 怎么做?

使用 HyperLogLog 统计页面 UV 主要需要用到下面这两个命令:

  • PFADD key element1 element2 ...:添加一个或多个元素到 HyperLogLog 中。
  • PFCOUNT key1 key2:获取一个或者多个 HyperLogLog 的唯一计数。

1、将访问指定页面的每个用户 ID 添加到 HyperLogLog 中。

1
PFADD PAGE_1:UV USER1 USER2 ...... USERn

2、统计指定页面的 UV。

1
PFCOUNT PAGE_1:UV

Redis 持久化机制(重要)

Redis 持久化机制(RDB 持久化、AOF 持久化、RDB 和 AOF 的混合持久化) 相关的问题比较多,也比较重要,于是我单独抽了一篇文章来总结 Redis 持久化机制相关的知识点和问题:Redis 持久化机制详解 。

Redis 线程模型(重要)

对于读写命令来说,Redis 一直是单线程模型。不过,在 Redis 4.0 版本之后引入了多线程来执行一些大键值对的异步删除操作, Redis 6.0 版本之后引入了多线程来处理网络请求(提高网络 IO 读写性能)。

Redis 单线程模型了解吗?

Redis 基于 Reactor 模式设计开发了一套高效的事件处理模型 (Netty 的线程模型也基于 Reactor 模式,Reactor 模式不愧是高性能 IO 的基石),这套事件处理模型对应的是 Redis 中的文件事件处理器(file event handler)。由于文件事件处理器(file event handler)是单线程方式运行的,所以我们一般都说 Redis 是单线程模型。

《Redis 设计与实现》有一段话是这样介绍文件事件处理器的,我觉得写得挺不错。

Redis 基于 Reactor 模式开发了自己的网络事件处理器:这个处理器被称为文件事件处理器(file event handler)。

  • 文件事件处理器使用 I/O 多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
  • 当被监听的套接字准备好执行连接应答(accept)、读取(read)、写入(write)、关 闭(close)等操作时,与操作相对应的文件事件就会产生,这时文件事件处理器就会调用套接字之前关联好的事件处理器来处理这些事件。

虽然文件事件处理器以单线程方式运行,但通过使用 I/O 多路复用程序来监听多个套接字,文件事件处理器既实现了高性能的网络通信模型,又可以很好地与 Redis 服务器中其他同样以单线程方式运行的模块进行对接,这保持了 Redis 内部单线程设计的简单性。

既然是单线程,那怎么监听大量的客户端连接呢?

Redis 通过 IO 多路复用程序 来监听来自客户端的大量连接(或者说是监听多个 socket),它会将感兴趣的事件及类型(读、写)注册到内核中并监听每个事件是否发生。

这样的好处非常明显:I/O 多路复用技术的使用让 Redis 不需要额外创建多余的线程来监听客户端的大量连接,降低了资源的消耗(和 NIO 中的 Selector 组件很像)。

文件事件处理器(file event handler)主要是包含 4 个部分:

  • 多个 socket(客户端连接)
  • IO 多路复用程序(支持多个客户端连接的关键)
  • 文件事件分派器(将 socket 关联到相应的事件处理器)
  • 事件处理器(连接应答处理器、命令请求处理器、命令回复处理器)

文件事件处理器(file event handler)

相关阅读:Redis 事件机制详解 。

Redis6.0 之前为什么不使用多线程?

虽然说 Redis 是单线程模型,但实际上,Redis 在 4.0 之后的版本中就已经加入了对多线程的支持。

不过,Redis 4.0 增加的多线程主要是针对一些大键值对的删除操作的命令,使用这些命令就会使用主线程之外的其他线程来“异步处理”,从而减少对主线程的影响。

为此,Redis 4.0 之后新增了几个异步命令:

  • UNLINK:可以看作是 DEL 命令的异步版本。
  • FLUSHALL ASYNC:用于清空所有数据库的所有键,不限于当前 SELECT 的数据库。
  • FLUSHDB ASYNC:用于清空当前 SELECT 数据库中的所有键。

redis4.0 more thread

总的来说,直到 Redis 6.0 之前,Redis 的主要操作仍然是单线程处理的。

那 Redis6.0 之前为什么不使用多线程? 我觉得主要原因有 3 点:

  • 单线程编程容易并且更容易维护;
  • Redis 的性能瓶颈不在 CPU ,主要在内存和网络;
  • 多线程就会存在死锁、线程上下文切换等问题,甚至会影响性能。

相关阅读:为什么 Redis 选择单线程模型? 。

Redis6.0 之后为何引入了多线程?

Redis6.0 引入多线程主要是为了提高网络 IO 读写性能,因为这个算是 Redis 中的一个性能瓶颈(Redis 的瓶颈主要受限于内存和网络)。

虽然,Redis6.0 引入了多线程,但是 Redis 的多线程只是在网络数据的读写这类耗时操作上使用了,执行命令仍然是单线程顺序执行。因此,你也不需要担心线程安全问题。

Redis6.0 的多线程默认是禁用的,只使用主线程。如需开启需要设置 IO 线程数 > 1,需要修改 redis 配置文件 redis.conf:

1
io-threads 4 #设置1的话只会开启主线程,官网建议4核的机器建议设置为2或3个线程,8核的建议设置为6个线程

另外:

  • io-threads 的个数一旦设置,不能通过 config 动态设置。
  • 当设置 ssl 后,io-threads 将不工作。

开启多线程后,默认只会使用多线程进行 IO 写入 writes,即发送数据给客户端,如果需要开启多线程 IO 读取 reads,同样需要修改 redis 配置文件 redis.conf :

1
io-threads-do-reads yes

但是官网描述开启多线程读并不能有太大提升,因此一般情况下并不建议开启

相关阅读:

  • Redis 6.0 新特性-多线程连环 13 问!
  • Redis 多线程网络模型全面揭秘(推荐)

Redis 后台线程了解吗?

我们虽然经常说 Redis 是单线程模型(主要逻辑是单线程完成的),但实际还有一些后台线程用于执行一些比较耗时的操作:

  • 通过 bio_close_file 后台线程来释放 AOF / RDB 等过程中产生的临时文件资源。
  • 通过 bio_aof_fsync 后台线程调用 fsync 函数将系统内核缓冲区还未同步到到磁盘的数据强制刷到磁盘( AOF 文件)。
  • 通过 bio_lazy_free后台线程释放大对象(已删除)占用的内存空间.

在bio.h 文件中有定义(Redis 6.0 版本,源码地址:https://github.com/redis/redis/blob/6.0/src/bio.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#ifndef __BIO_H
#define __BIO_H

/* Exported API */
void bioInit(void);
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
unsigned long long bioPendingJobsOfType(int type);
unsigned long long bioWaitStepOfType(int type);
time_t bioOlderJobOfType(int type);
void bioKillThreads(void);

/* Background job opcodes */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3

#endif

关于 Redis 后台线程的详细介绍可以查看 Redis 6.0 后台线程有哪些? 这篇就文章。

Redis 内存管理

Redis 给缓存数据设置过期时间有什么用?

一般情况下,我们设置保存的缓存数据的时候都会设置一个过期时间。为什么呢?

内存是有限且珍贵的,如果不对缓存数据设置过期时间,那内存占用就会一直增长,最终可能会导致 OOM 问题。通过设置合理的过期时间,Redis 会自动删除暂时不需要的数据,为新的缓存数据腾出空间。

Redis 自带了给缓存数据设置过期时间的功能,比如:

1
2
3
4
5
6
127.0.0.1:6379> expire key 60 # 数据在 60s 后过期
(integer) 1
127.0.0.1:6379> setex key 60 value # 数据在 60s 后过期 (setex:[set] + [ex]pire)
OK
127.0.0.1:6379> ttl key # 查看数据还有多久过期
(integer) 56

注意 ⚠️:Redis 中除了字符串类型有自己独有设置过期时间的命令 setex 外,其他方法都需要依靠 expire 命令来设置过期时间 。另外, persist 命令可以移除一个键的过期时间。

过期时间除了有助于缓解内存的消耗,还有什么其他用么?

很多时候,我们的业务场景就是需要某个数据只在某一时间段内存在,比如我们的短信验证码可能只在 1 分钟内有效,用户登录的 Token 可能只在 1 天内有效。

如果使用传统的数据库来处理的话,一般都是自己判断过期,这样更麻烦并且性能要差很多。

Redis 是如何判断数据是否过期的呢?

Redis 通过一个叫做过期字典(可以看作是 hash 表)来保存数据过期的时间。过期字典的键指向 Redis 数据库中的某个 key(键),过期字典的值是一个 long long 类型的整数,这个整数保存了 key 所指向的数据库键的过期时间(毫秒精度的 UNIX 时间戳)。

Redis 过期字典

过期字典是存储在 redisDb 这个结构里的:

1
2
3
4
5
6
7
typedef struct redisDb {
...

dict *dict; //数据库键空间,保存着数据库中所有键值对
dict *expires // 过期字典,保存着键的过期时间
...
} redisDb;

在查询一个 key 的时候,Redis 首先检查该 key 是否存在于过期字典中(时间复杂度为 O(1)),如果不在就直接返回,在的话需要判断一下这个 key 是否过期,过期直接删除 key 然后返回 null。

Redis 过期 key 删除策略了解么?

如果假设你设置了一批 key 只能存活 1 分钟,那么 1 分钟后,Redis 是怎么对这批 key 进行删除的呢?

常用的过期数据的删除策略就下面这几种:

  1. 惰性删除:只会在取出/查询 key 的时候才对数据进行过期检查。这种方式对 CPU 最友好,但是可能会造成太多过期 key 没有被删除。
  2. 定期删除:周期性地随机从设置了过期时间的 key 中抽查一批,然后逐个检查这些 key 是否过期,过期就删除 key。相比于惰性删除,定期删除对内存更友好,对 CPU 不太友好。
  3. 延迟队列:把设置过期时间的 key 放到一个延迟队列里,到期之后就删除 key。这种方式可以保证每个过期 key 都能被删除,但维护延迟队列太麻烦,队列本身也要占用资源。
  4. 定时删除:每个设置了过期时间的 key 都会在设置的时间到达时立即被删除。这种方法可以确保内存中不会有过期的键,但是它对 CPU 的压力最大,因为它需要为每个键都设置一个定时器。

Redis 采用的那种删除策略呢?

Redis 采用的是 定期删除+惰性/懒汉式删除 结合的策略,这也是大部分缓存框架的选择。定期删除对内存更加友好,惰性删除对 CPU 更加友好。两者各有千秋,结合起来使用既能兼顾 CPU 友好,又能兼顾内存友好。

下面是我们详细介绍一下 Redis 中的定期删除具体是如何做的。

Redis 的定期删除过程是随机的(周期性地随机从设置了过期时间的 key 中抽查一批),所以并不保证所有过期键都会被立即删除。这也就解释了为什么有的 key 过期了,并没有被删除。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。

另外,定期删除还会受到执行时间和过期 key 的比例的影响:

  • 执行时间已经超过了阈值,那么就中断这一次定期删除循环,以避免使用过多的 CPU 时间。
  • 如果这一批过期的 key 比例超过一个比例,就会重复执行此删除流程,以更积极地清理过期 key。相应地,如果过期的 key 比例低于这个比例,就会中断这一次定期删除循环,避免做过多的工作而获得很少的内存回收。

Redis 7.2 版本的执行时间阈值是 25ms,过期 key 比例设定值是 **10%**。

1
2
3
4
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
we do extra efforts. */

每次随机抽查数量是多少?

expire.c中定义了每次随机抽查的数量,Redis 7.2 版本为 20 ,也就是说每次会随机选择 20 个设置了过期时间的 key 判断是否过期。

1
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */

如何控制定期删除的执行频率?

在 Redis 中,定期删除的频率是由 hz 参数控制的。hz 默认为 10,代表每秒执行 10 次,也就是每秒钟进行 10 次尝试来查找并删除过期的 key。

hz 的取值范围为 1~500。增大 hz 参数的值会提升定期删除的频率。如果你想要更频繁地执行定期删除任务,可以适当增加 hz 的值,但这会加 CPU 的使用率。根据 Redis 官方建议,hz 的值不建议超过 100,对于大部分用户使用默认的 10 就足够了。

下面是 hz 参数的官方注释,我翻译了其中的重要信息(Redis 7.2 版本)。

redis.conf 对于 hz 的注释

类似的参数还有一个 dynamic-hz,这个参数开启之后 Redis 就会在 hz 的基础上动态计算一个值。Redis 提供并默认启用了使用自适应 hz 值的能力,

这两个参数都在 Redis 配置文件 redis.conf中:

1
2
3
4
# 默认为 10
hz 10
# 默认开启
dynamic-hz yes

多提一嘴,除了定期删除过期 key 这个定期任务之外,还有一些其他定期任务例如关闭超时的客户端连接、更新统计信息,这些定期任务的执行频率也是通过 hz 参数决定。

为什么定期删除不是把所有过期 key 都删除呢?

这样会对性能造成太大的影响。如果我们 key 数量非常庞大的话,挨个遍历检查是非常耗时的,会严重影响性能。Redis 设计这种策略的目的是为了平衡内存和性能。

为什么 key 过期之后不立马把它删掉呢?这样不是会浪费很多内存空间吗?

因为不太好办到,或者说这种删除方式的成本太高了。假如我们使用延迟队列作为删除策略,这样存在下面这些问题:

  1. 队列本身的开销可能很大:key 多的情况下,一个延迟队列可能无法容纳。
  2. 维护延迟队列太麻烦:修改 key 的过期时间就需要调整期在延迟队列中的位置,并且,还需要引入并发控制。

大量 key 集中过期怎么办?

当 Redis 中存在大量 key 在同一时间点集中过期时,可能会导致以下问题:

  • 请求延迟增加: Redis 在处理过期 key 时需要消耗 CPU 资源,如果过期 key 数量庞大,会导致 Redis 实例的 CPU 占用率升高,进而影响其他请求的处理速度,造成延迟增加。
  • 内存占用过高: 过期的 key 虽然已经失效,但在 Redis 真正删除它们之前,仍然会占用内存空间。如果过期 key 没有及时清理,可能会导致内存占用过高,甚至引发内存溢出。

为了避免这些问题,可以采取以下方案:

  1. 尽量避免 key 集中过期: 在设置键的过期时间时尽量随机一点。
  2. 开启 lazy free 机制: 修改 redis.conf 配置文件,将 lazyfree-lazy-expire 参数设置为 yes,即可开启 lazy free 机制。开启 lazy free 机制后,Redis 会在后台异步删除过期的 key,不会阻塞主线程的运行,从而降低对 Redis 性能的影响。

Redis 内存淘汰策略了解么?

相关问题:MySQL 里有 2000w 数据,Redis 中只存 20w 的数据,如何保证 Redis 中的数据都是热点数据?

Redis 的内存淘汰策略只有在运行内存达到了配置的最大内存阈值时才会触发,这个阈值是通过redis.conf的maxmemory参数来定义的。64 位操作系统下,maxmemory 默认为 0 ,表示不限制内存大小。32 位操作系统下,默认的最大内存值是 3GB。

你可以使用命令 config get maxmemory 来查看 maxmemory的值。

1
2
3
> config get maxmemory
maxmemory
0

Redis 提供了 6 种内存淘汰策略:

  1. volatile-lru(least recently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰。
  2. volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰。
  3. volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰。
  4. allkeys-lru(least recently used):从数据集(server.db[i].dict)中移除最近最少使用的数据淘汰。
  5. allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰。
  6. no-eviction(默认内存淘汰策略):禁止驱逐数据,当内存不足以容纳新写入数据时,新写入操作会报错。

4.0 版本后增加以下两种:

  1. volatile-lfu(least frequently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰。
  2. allkeys-lfu(least frequently used):从数据集(server.db[i].dict)中移除最不经常使用的数据淘汰。

allkeys-xxx 表示从所有的键值中淘汰数据,而 volatile-xxx 表示从设置了过期时间的键值中淘汰数据。

config.c中定义了内存淘汰策略的枚举数组:

1
2
3
4
5
6
7
8
9
10
11
configEnum maxmemory_policy_enum[] = {
{"volatile-lru", MAXMEMORY_VOLATILE_LRU},
{"volatile-lfu", MAXMEMORY_VOLATILE_LFU},
{"volatile-random",MAXMEMORY_VOLATILE_RANDOM},
{"volatile-ttl",MAXMEMORY_VOLATILE_TTL},
{"allkeys-lru",MAXMEMORY_ALLKEYS_LRU},
{"allkeys-lfu",MAXMEMORY_ALLKEYS_LFU},
{"allkeys-random",MAXMEMORY_ALLKEYS_RANDOM},
{"noeviction",MAXMEMORY_NO_EVICTION},
{NULL, 0}
};

你可以使用 config get maxmemory-policy 命令来查看当前 Redis 的内存淘汰策略。

1
2
3
> config get maxmemory-policy
maxmemory-policy
noeviction

可以通过config set maxmemory-policy 内存淘汰策略 命令修改内存淘汰策略,立即生效,但这种方式重启 Redis 之后就失效了。修改 redis.conf 中的 maxmemory-policy 参数不会因为重启而失效,不过,需要重启之后修改才能生效。

1
maxmemory-policy noeviction

关于淘汰策略的详细说明可以参考 Redis 官方文档:https://redis.io/docs/reference/eviction/。

参考

  • 《Redis 开发与运维》
  • 《Redis 设计与实现》
  • 《Redis 核心原理与实战》
  • Redis 命令手册:https://www.redis.com.cn/commands.html
  • RedisSearch 终极使用指南,你值得拥有!:https://mp.weixin.qq.com/s/FA4XVAXJksTOHUXMsayy2g
  • WHY Redis choose single thread (vs multi threads): https://medium.com/@jychen7/sharing-redis-single-thread-vs-multi-threads-5870bd44d153

Kafka常见问题总结

发表于 2024-08-05 | 分类于 分布式 , 消息队列 | 阅读次数:
字数统计: 6.5k 字 | 阅读时长 ≈ 24 分钟

Kafka 基础

Kafka 是什么?主要应用场景有哪些?

Kafka 是一个分布式流式处理平台。这到底是什么意思呢?

流平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
  2. 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 主要有两大应用场景:

  1. 消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

和其他消息队列相比,Kafka 的优势在哪里?

我们现在经常提到 Kafka 的时候就已经默认它是一个非常优秀的消息队列了,我们也会经常拿它跟 RocketMQ、RabbitMQ 对比。我觉得 Kafka 相比其他消息队列主要的优势如下:

  1. 极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
  2. 生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。

实际上在早期的时候 Kafka 并不是一个合格的消息队列,早期的 Kafka 在消息队列领域就像是一个衣衫褴褛的孩子一样,功能不完备并且有一些小问题比如丢失消息、不保证消息可靠性等等。当然,这也和 LinkedIn 最早开发 Kafka 用于处理海量的日志有很大关系,哈哈哈,人家本来最开始就不是为了作为消息队列滴,谁知道后面误打误撞在消息队列领域占据了一席之地。

随着后续的发展,这些短板都被 Kafka 逐步修复完善。所以,Kafka 作为消息队列不可靠这个说法已经过时!

队列模型了解吗?Kafka 的消息模型知道吗?

题外话:早期的 JMS 和 AMQP 属于消息服务领域权威组织所做的相关的标准,我在 JavaGuide的 《消息队列其实很简单》这篇文章中介绍过。但是,这些标准的进化跟不上消息队列的演进速度,这些标准实际上已经属于废弃状态。所以,可能存在的情况是:不同的消息队列都有自己的一套消息模型。

队列模型:早期的消息模型

队列模型

使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。 比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

队列模型存在的问题:

假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完整的消息内容。

这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。

发布-订阅模型:Kafka 消息模型

发布-订阅模型主要是为了解决队列模型存在的问题。

发布订阅模型

发布订阅模型(Pub-Sub) 使用主题(Topic) 作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。

在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。

Kafka 采用的就是发布 - 订阅模型。

RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。

Kafka 核心概念

什么是 Producer、Consumer、Broker、Topic、Partition?

Kafka 将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题),如下图所示:

上面这张图也为我们引出了,Kafka 比较重要的几个概念:

  1. Producer(生产者) : 产生消息的一方。
  2. Consumer(消费者) : 消费消息的一方。
  3. Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。

同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
  • Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样。

划重点:Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。这样是不是更好理解一点?

Kafka 的多副本机制了解吗?带来了什么好处?

还有一点我觉得比较重要的是 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。

Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?

  1. Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
  2. Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。

Zookeeper 和 Kafka

Zookeeper 在 Kafka 中的作用是什么?

要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。 一定不要光看不实践,这样学来的也终会忘记!这部分内容参考和借鉴了这篇文章:https://www.jianshu.com/p/a036405f989c 。

下图就是我的本地 Zookeeper ,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)。

ZooKeeper 主要为 Kafka 提供元数据的管理的功能。

从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:

  1. Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
  2. Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0、/brokers/topics/my-topic/Partitions/1
  3. 负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
  4. ……

使用 Kafka 能否不引入 Zookeeper?

在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。

不过,要提示一下:如果要使用 KRaft 模式的话,建议选择较高版本的 Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。

Kafka 消费顺序、消息丢失和重复消费

Kafka 如何保证消息的消费顺序?

我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:

  1. 更改用户会员等级。
  2. 根据会员等级计算订单价格。

假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。

每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。 Kafka 只能为我们保证 Partition(分区) 中的消息有序。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:

  1. 1 个 Topic 只对应一个 Partition。
  2. (推荐)发送消息的时候指定 key/Partition。

当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的,

Kafka 如何保证消息不丢失?

生产者丢失消息的情况

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。

所以,我们不能默认在调用send方法发送消息之后消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:

详细代码见我的这篇文章:Kafka 系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?

1
2
3
4
5
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}

但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:

1
2
3
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

如果消息发送失败的话,我们检查失败的原因之后重新发送即可!

另外,这里推荐为 Producer 的retries(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。

消费者丢失消息的情况

我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

kafka offset

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

Kafka 弄丢了消息

我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

设置 acks = all

解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高.

设置 replication.factor >= 3

为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1。

设置 unclean.leader.election.enable = false

Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false

我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

Kafka 如何保证消息不重复消费?

kafka 出现消息重复消费的原因:

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
  • Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  • 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
  • 将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适?
    • 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
    • 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。

Kafka 重试机制

在 Kafka 如何保证消息不丢失这里,我们提到了 Kafka 的重试机制。由于这部分内容较为重要,我们这里再来详细介绍一下。

网上关于 Spring Kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 spring-kafka-2.9.3 源码重新梳理一下。

消费失败会怎么样?

在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?

生产者代码:

1
2
3
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))
}

消费者消代码:

1
2
3
4
5
6
7
8
@KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple")
private void customer(String message) throws InterruptedException {
log.info("kafka customer:{}",message);
Integer n = Integer.parseInt(message);
if (n%5==0){
throw new RuntimeException();
}
}

在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。

1
2
3
4
2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Skipping seek of: test-0@95
2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96
2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0

因此,即使某个消息消费异常,Kafka 消费者仍然能够继续消费后续的消息,不会一直卡在当前消息,保证了业务的正常进行。

默认会重试多少次?

默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?

看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public boolean recovered(ConsumerRecord << ? , ? > record, Exception exception,
@Nullable MessageListenerContainer container,
@Nullable Consumer << ? , ? > consumer) throws InterruptedException {

if (this.noRetries) {
// 不支持重试
attemptRecovery(record, exception, null, consumer);
return true;
}
// 取已经失败的消费记录集合
Map < TopicPartition, FailedRecord > map = this.failures.get();
if (map == null) {
this.failures.set(new HashMap < > ());
map = this.failures.get();
}
// 获取消费记录所在的Topic和Partition
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
// 通知注册的重试监听器,消息投递失败
this.retryListeners.forEach(rl - >
rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
// 获取下一次重试的时间间隔
long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
if (nextBackOff != BackOffExecution.STOP) {
this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
return false;
} else {
attemptRecovery(record, exception, topicPartition, consumer);
map.remove(topicPartition);
if (map.isEmpty()) {
this.failures.remove();
}
return true;
}
}

其中, BackOffExecution.STOP 的值为 -1。

1
2
3
4
5
6
7
@FunctionalInterface
public interface BackOffExecution {

long STOP = -1;
long nextBackOff();

}

nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。

1
2
3
4
5
6
7
8
9
public long nextBackOff() {
this.currentAttempts++;
if (this.currentAttempts <= getMaxAttempts()) {
return getInterval();
}
else {
return STOP;
}
}

那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是:

1
2
3
public DefaultErrorHandler() {
this(null, SeekUtils.DEFAULT_BACK_OFF);
}

SeekUtils.DEFAULT_BACK_OFF 定义的是:

1
2
3
public static final int DEFAULT_MAX_FAILURES = 10;

public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, DEFAULT_MAX_FAILURES - 1);

DEFAULT_MAX_FAILURES 的值是 10,currentAttempts 从 0 到 9,所以总共会执行 10 次,每次重试的时间间隔为 0。

最后,简单总结一下:Kafka 消费者在默认配置下会进行最多 10 次 的重试,每次重试的时间间隔为 0,即立即进行重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,消息将被视为消费失败。

如何自定义重试次数以及时间间隔?

从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff 控制的,FixedBackOff 是 DefaultErrorHandler 初始化时默认的。所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。

1
2
3
4
5
6
7
8
9
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
// 自定义重试时间间隔以及次数
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));
factory.setConsumerFactory(consumerFactory);
return factory;
}

如何在重试失败后进行告警?

自定义重试失败后逻辑,需要手动实现,以下是一个简单的例子,重写 DefaultErrorHandler 的 handleRemaining 函数,加上自定义的告警等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {

public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}

@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}

DefaultErrorHandler 只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler 接口。手动实现 CommonErrorHandler 就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。

重试失败后的数据如何再次处理?

当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?

死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被”丢弃”或”死亡”的情况。当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。

@RetryableTopic 是 Spring Kafka 中的一个注解,它用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}

当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。

参考

  • Kafka 官方文档:https://kafka.apache.org/documentation/
  • 极客时间—《Kafka 核心技术与实战》第 11 节:无消息丢失配置怎么实现?

约瑟夫环

发表于 2024-07-28 | 分类于 算法 | 阅读次数:
字数统计: 8 字 | 阅读时长 ≈ 1 分钟

https://zhuanlan.zhihu.com/p/35487124?utm_id=0

RabbitMQ常见问题总结

发表于 2024-07-22 | 分类于 分布式 , 消息队列 | 阅读次数:
字数统计: 5.1k 字 | 阅读时长 ≈ 18 分钟

本篇文章由 JavaGuide 收集自网络,原出处不明。

RabbitMQ 是什么?

RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。

PS:也可能直接问什么是消息队列?消息队列就是一个使用队列来通信的组件。

RabbitMQ 特点?

  • 可靠性: RabbitMQ 使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  • 灵活的路由 : 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。
  • 扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
  • 高可用性 : 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
  • 多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP, MQTT 等多种消息 中间件协议。
  • 多语言客户端 :RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。
  • 管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等。
  • 插件机制 : RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自 己的插件。

RabbitMQ 核心概念?

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

RabbitMQ 的整体模型架构如下:

图1-RabbitMQ 的整体模型架构

下面我会一一介绍上图中的一些概念。

Producer(生产者) 和 Consumer(消费者)

  • Producer(生产者) :生产消息的一方(邮件投递者)
  • Consumer(消费者) :消费消息的一方(邮件收件人)

消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payLoad ,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。

Exchange(交换器)

在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。

Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers,不同类型的 Exchange 转发消息的策略有所区别。这个会在介绍 Exchange Types(交换器类型) 的时候介绍到。

Exchange(交换器) 示意图如下:

Exchange(交换器) 示意图

生产者将消息发给交换器的时候,一般会指定一个 **RoutingKey(路由键)**,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。

Binding(绑定) 示意图:

Binding(绑定) 示意图

生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中。

Queue(消息队列)

Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。

RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。

Broker(消息中间件的服务节点)

对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程。

消息队列的运转过程

这样图 1 中的一些关于 RabbitMQ 的基本概念我们就介绍完毕了,下面再来介绍一下 Exchange Types(交换器类型) 。

Exchange Types(交换器类型)

RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)。

1、fanout

fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。

2、direct

direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。

direct 类型交换器

以上图为例,如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为”Info”或者”debug”,消息只会路由到 Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。

direct 类型常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

3、topic

前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
  • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

topic 类型交换器

以上图为例:

  • 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
  • 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
  • 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
  • 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
  • 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。

4、headers(不推荐)

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

AMQP 是什么?

RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

AMQP 协议的三层:

  • Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
  • Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
  • TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 模型的三大组件:

  • **交换器 (Exchange)**:消息代理服务器中用于把消息路由到队列的组件。
  • **队列 (Queue)**:用来存储消息的数据结构,位于硬盘或内存中。
  • **绑定 (Binding)**:一套规则,告知交换器消息应该将消息投递给哪个队列。

说说生产者 Producer 和消费者 Consumer?

生产者 :

  • 消息生产者,就是投递消息的一方。
  • 消息一般包含两个部分:消息体(payload)和标签(Label)。

消费者:

  • 消费消息,也就是接收消息的一方。
  • 消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。

说说 Broker 服务节点、Queue 队列、Exchange 交换器?

  • Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
  • Queue:RabbitMQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
  • Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。

什么是死信队列?如何导致的?

DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

导致的死信的几种原因:

  • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
  • 消息 TTL 过期。
  • 队列满了,无法再添加。

什么是延迟队列?RabbitMQ 怎么实现延迟队列?

延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

  1. 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
  2. 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。

什么是优先级队列?

RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。

可以通过x-max-priority参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。

RabbitMQ 有哪些工作模式?

  • 简单模式
  • work 工作模式
  • pub/sub 发布订阅模式
  • Routing 路由模式
  • Topic 主题模式

RabbitMQ 消息怎么传输?

由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。

如何保证消息的可靠性?

消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。

  • 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
  • RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
  • RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。

如何保证 RabbitMQ 消息的顺序性?

  • 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
  • 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

如何保证 RabbitMQ 高可用的?

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。

普通集群模式

意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。

你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

如何解决消息队列的延时以及过期失效问题?

RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

虚拟内存

发表于 2024-07-22 | 分类于 计算机基础 | 阅读次数:
字数统计: 151 字 | 阅读时长 ≈ 1 分钟

深入浅出:操作系统之虚拟内存 - 知乎 (zhihu.com)

深入理解计算机系统 - 虚拟内存 - 知乎 (zhihu.com)

虚拟内存可以让每个进程都能运行操作系统内存范围内大小的程序。也就是逻辑地址。通过MMU来进行管理。

对于进程来说,虚拟内存就是一张连续的内存空间,这个空间有些在主存中,有些在磁盘中

在每个进程里有页表来将物理内存和虚拟内存映射起来。

同时针对缺页处理置换算法。

Notes

发表于 2024-07-18 | 分类于 算法 | 阅读次数:
字数统计: 75 字 | 阅读时长 ≈ 1 分钟
  1. 以后实在想不出来的题可以先尝试换换数据结构,就是一开始试着确定数据结构,而不是算法,因为数据结构是算法成功运作的前提。比如哈希表、栈、队列、树,进一步地单调栈、单调队列。

二分法

发表于 2024-07-17 | 分类于 算法 | 阅读次数:
字数统计: 15 字 | 阅读时长 ≈ 1 分钟

参考【吃透二分法系列】:从入门到精通

<i class="fa fa-angle-left"></i>1…345…27<i class="fa fa-angle-right"></i>

264 日志
34 分类
38 标签
GitHub Zhihu Wechat
© 2024 史海杰 | Site words total count: 722k
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4