阻塞队列LinkedBlockingQueue
# 前言
LinkedBlockingQueue内部基于链表实现,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以LinkedBlockingQueue也被称作无界队列,代表几乎没有界限。LinkedBlockingQueue也支持指定长度。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
重要方法
public void put(E e) throws InterruptedException
put(E e) 表示添加元素
public E take() throws InterruptedException
take() 表示取出元素
构造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
1
2
3
4
5
2
3
4
5
重要属性
private final int capacity; // 容量,指定容量就是有界队列
private final AtomicInteger count = new AtomicInteger(); // 元素数量
transient Node<E> head; // 链表头
private transient Node<E> last; // 链表尾
private final ReentrantLock takeLock = new ReentrantLock(); // 取出元素的锁
private final Condition notFull = putLock.newCondition(); // 添加元素的锁
// 消费者(内置条件队列)
private final Condition notEmpty = takeLock.newCondition();
// 生产者(内置条件队列)
private final ReentrantLock putLock = new ReentrantLock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
单链表结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
1
2
3
4
5
2
3
4
5
# 1 添加元素
public void put(E e) throws InterruptedException {
//不允许添加空元素
if (e == null) throw new NullPointerException();
int c = -1;
//新建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//用putLock加锁
putLock.lockInterruptibly();
try {
//阻塞队列元素个数等于容量时,生产者挂起
while (count.get() == capacity) {
notFull.await();
}
//阻塞队列元素个数不等于容量时,入队
enqueue(node);
//总数加1,返回旧值
c = count.getAndIncrement();
//总数小于容量时,唤醒生产者。相当于提前唤醒,不需要等到取元素的时候才唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放生产的锁
putLock.unlock();
}
//当原队列元素个数为0(当前个数为1),立即唤醒消费者
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//当前节点插入到队列尾部,并且队列尾部指针指向当前节点
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//用takeLock加锁
takeLock.lock();
try {
//唤醒消费者
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 2 取出元素
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//用takeLock加锁
takeLock.lockInterruptibly();
try {
//阻塞队列元素个数为0时,消费者挂起
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
//总数减1,返回旧值
c = count.getAndDecrement();
//阻塞队列有元素时,唤醒消费者
if (c > 1)
notEmpty.signal();
} finally {
//释放消费的锁
takeLock.unlock();
}
//队列元素快满了的时候(总数-1),立即唤醒生产者
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
//first = 头结点的后继节点
Node<E> first = h.next;
//准备回收原头节点
h.next = h; // help GC
//头结点指针指向后继节点
head = first;
//返回的元素从原头结点的后继节点中获取
E x = first.item;
//后继节点的元素被取出后清空(此时first已经成为头结点)
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
//putLock加锁
putLock.lock();
try {
//唤醒生产者
notFull.signal();
} finally {
putLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
上次更新: 2022/11/24, 17:59:25