抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

java并发工具类-队列

BlockingQueue

java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue

使用

一个BlockingQueue通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:

​ 生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。

​ 消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。

方法

BlockingQueue有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:

抛出异常 返回特殊值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
删除 remove(o) poll() take() poll(timeout, timeunit)
访问 element() peek()
方法行为

这4种不同的行为意味着:

抛出异常

如果请求的操作现在无法完成,则抛出异常。

特殊值

如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).

阻塞

如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。

超时

​ 如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)

注意

​ 无法插入nullBlockingQueue中。如果你尝试插入nullBlockingQueue则会抛出一个NullPointerException异常。

​ 你可以访问BlockingQueue内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection中的这些方法。

示例

这是一个Java BlockingQueue示例。该示例使用实现BlockingQueue接口的ArrayBlockingQueue类。

BlockingQueueExample

首先, BlockingQueueExample类在不同的线程中启动ProducerConsumerProducer将一个字符串插入共享的BlockingQueue,而Consumer使用它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class BlockingQueueExample {

public static void main(String[] args) throws Exception {

BlockingQueue queue = new ArrayBlockingQueue(1024);

Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

new Thread(producer).start();
new Thread(consumer).start();

Thread.sleep(4000);
}
}
Producer

这是Producer类。注意每次put()调用之间它都会睡一秒钟。这将导致Consumer阻塞,为了等待获取队列中的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer implements Runnable{

protected BlockingQueue queue = null;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer

这是Consumer类。它从队列中取出对象,然后将它们打印到System.out

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Consumer implements Runnable{

protected BlockingQueue queue = null;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试

下面是一个测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingExample {
@Test
public void test() throws Exception {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

Thread thread1 = new Thread(producer);
Thread thread2 = new Thread(consumer);

thread1.start();
thread2.start();

thread1.join();
thread2.join();
}

private static class Producer implements Runnable {
private BlockingQueue<String> queue;

Producer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class Consumer implements Runnable {
private BlockingQueue<String> queue = null;

Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

BlockingDeque

java.util.concurrent中的BlockingDeque接口表示一个双向队列,它可以被线程安全的放入以及从中获取实例。在本文中,我将向你展示如何使用BlockingDeque

BlockingDeque类是一个Deque,它会阻塞尝试在deque中插入或删除的线程,以防它能够向队列中插入或删除元素。

​ deque是“Double Ended Queue”(双端队列)的缩写。因此,deque是一个队列,你可以从它的两端插入和获取元素。

用法

如果线程同时生成和使用同一队列的元素,则可以使用BlockingDeque。如果生成线程需要在队列的两端插入元素,并且消费线程需要从队列的两端移除元素,那么也可以使用它:

​ 线程将生成元素并将它们插入队列的任一端。如果deque当前已满,则插入线程将被阻塞,直到删除线程将元素从双端队列中取出。如果deque当前为空,则将阻止删除线程,直到插入线程将元素插入到双端队列中。

方法

BlockingDeque有4组不同的方法用于插入,移除以及检查双端队列中的元素。如果不能立即执行所请求的操作,则每组方法的行为都不同。这是一个方法表:

抛出异常 返回特殊值 阻塞 超时
Insert addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
Remove removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
Examine getFirst(o) peekFirst(o)
抛出异常 返回特殊值 阻塞 超时
Insert addLast(o) offerLast(o) putLast(o) offerLast(o, timeout, timeunit)
Remove removeLast(o) pollLast(o) takeLast(o) pollLast(timeout, timeunit)
Examine getLast(o) peekLast(o)

这与BlockingQueue类似,只多了一组方法。

继承自 BlockingQueue

BlockingDeque接口扩展BlockingQueue接口。这意味着你可以使用BlockingDeque作为BlockingQueue。如果这样做,各种插入方法会将元素添加到双端队列的末尾,而删除方法将从双端队列的开头删除元素,即BlockingQueue接口的插入和删除方法。

下面是一个表格,对应了BlockingQueueBlockingDeque的方法:

BlockingQueue BlockingDeque
add() addLast()
offer() x 2 offerLast() x 2
put() putLast()
remove() removeFirst()
poll() x 2 pollFirst() x 2
take() takeFirst()
element() getFirst()
peek() peekFirst()

示例

这是一个如何使用BlockingDeque方法的小代码示例:

1
2
3
4
5
6
7
BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

ArrayBlockingQueue

ArrayBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

ArrayBlockingQueue是一个有界的阻塞队列,它将元素存储在数组内部。有界意味着它无法存储无限量的元素,它可以同时存储的元素数量有一个上限。你需要在实例化时设置上限,之后无法更改,所以它和ArrayList有些区别,不要因为它们的名称相似而将它们的功能混杂。

ArrayBlockingQueue内部是以FIFO(先入先出)次序来存储元素的。队列的头部是在队列中存活时间最长的元素,而队列的尾部是在队列中存活时间最短的元素。

示例

以下是实例化和使用ArrayBlockingQueue的例子:

1
2
3
4
5
BlockingQueue queue = new ArrayBlockingQueue(1024);

queue.put("1");

Object object = queue.take();

这是一个使用Java 泛型的BlockingQueue例子:

1
2
3
4
5
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);

queue.put("1");

String string = queue.take();

源码

成员变量

ArrayBlockingQueue中使用了这几个成员变量来保证操作,其实内部使用了一个循环数组,其中takeIndex和putIndex其实相当于队列的头部和尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** 使用数组保存元素 */
final Object[] items;

/** 下一个take,poll,peek或remove方法调用时访问此下标的元素 */
int takeIndex;

/** 下一个put, offer, 或add方法调用时访问此下标的元素 */
int putIndex;

/**队列中的元素数量 */
int count;

/** 保护所有操作的主锁 */
final ReentrantLock lock;

/** 获取元素的等待条件 */
private final Condition notEmpty;

/** 放置元素的等待条件 */
private final Condition notFull;
构造函数

构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 使用一个固定的数值和默认的访问规则创建,默认是使用非公平锁
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* 使用一个固定的数值和指定的访问规则创建
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

/**
*使用一个固定的数值和指定的访问规则创建,并将给定集合中的元素
* 增加到队列中,增加的顺序是指定的集合迭代器的遍历顺序
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
增加操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public boolean add(E e) {
return super.add(e);
}

public boolean add(E e) {
// 内部重用offer方法
if (offer(e))
return true;
// 如果增加失败,抛出异常指示队列已满
else
throw new IllegalStateException("Queue full");
}

-------------------------------------------------------------------------

public boolean offer(E e) {
// 检查是否是否为null,如果是抛出NPE异常
Objects.requireNonNull(e);
// 加锁。 此处使用final的原因是将成员变量赋值为局部变量,
// 然后使用此变量就不需要经过两次访问,即先访问this,再
// 访问lock,轻微提升程序性能,后面此种方法的使用也是一样。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列满了,返回false
if (count == items.length)
return false;
// 否则,加入队列
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

public static <T> T requireNonNull(T obj) {
if (obj == null)
throw new NullPointerException();
return obj;
}

private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;

final Object[] items = this.items;
// 插入元素
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
// 随机通知一个等待的线程
notEmpty.signal();
}

-------------------------------------------------------------------------

// 阻塞方法
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已经,在notFull上阻塞自己等待通知
// 关于等待-通知机制已经说过很多次,此处不再多说
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

-------------------------------------------------------------------------

// 超时方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

Objects.requireNonNull(e);
// 计算超时时间,转换为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列已满,超时等待,如果时间用完,返回false
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
删除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// 删除指定元素
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列中存在元素
if (count > 0) {
final Object[] items = this.items;
// 注意此处精彩的循环使用,因为内部是一个循环数组
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
}
return false;
} finally {
lock.unlock();
}
}

void removeAt(final int removeIndex) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;

final Object[] items = this.items;
// 如果删除的是头元素,只需修改头元素下标即可
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
// 此处是为了保持迭代器与队列的一致性
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove

// slide over all others up through putIndex.
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
// 如果已经移到了最后一个元素,跳出循环
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
// 将元素前移一位
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}

-------------------------------------------------------------------------

public E remove() {
// 重用poll方法,如果队列为空,抛出异常
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

-------------------------------------------------------------------------

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;

final Object[] items = this.items;
// 获取头元素,因为使用Object[]保存,所以要进行类型转换
// 因为只能增加指定类型的元素,所以可以确保类型转换一定
// 会成功,抑制此非受检警告
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}

-------------------------------------------------------------------------

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

-------------------------------------------------------------------------

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

阻塞方法以及超时方法和增加操作一样,此处不多做讲解。

访问操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// element()方法在AbstractQueue<E>类中,ArrayBlockingQueue继承自此类
public E element() {
// 重用peek方法,如果队列为空抛出异常
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

-------------------------------------------------------------------------

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

final E itemAt(int i) {
return (E) items[i];
}
辅助方法

部分方法逻辑简单,有兴趣自己查看即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k;
// 如果队列中存在元素,清空队列
if ((k = count) > 0) {
circularClear(items, takeIndex, putIndex);
takeIndex = putIndex;
count = 0;
// 使迭代器保持一致
if (itrs != null)
itrs.queueIsEmpty();
// 如果有线程等待插入元素,唤醒
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}

// 将存在的元素全部置为null即可,等待 gc回收它们,此时等于清空了队列。
private static void circularClear(Object[] items, int i, int end) {
// assert 0 <= i && i < items.length;
// assert 0 <= end && end < items.length;

for (int to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++) items[i] = null;
if (to == end) break;
}
}

-------------------------------------------------------------------------

public int drainTo(Collection<? super E> c) {
// 重用drainTo(Collection<? super E> c, int maxElements)方法
return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
// 如果指定的集合是自己,抛出异常,符合BlockingQueue接口文档中的定义
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取需要转移的元素数量
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
// 通过直接访问数组,比重复调用poll()方法再增加性能会高很多
while (i < n) {
@SuppressWarnings("unchecked")
E e = (E) items[take];
c.add(e);
items[take] = null;
if (++take == items.length) take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
// 做一些处理工作
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
核心要点
  1. 内部使用了一个循环数组
  2. 是一个有界数组,提供了容量后无法被更改
  3. 可以指定锁的公平性

DelayQueue

DelayQueue类实现BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

DelayQueue内部阻止元素直到某个延迟到期,元素必须实现接口java.util.concurrent.Delayed。以下是java.util.concurrent.Delayed接口:

1
2
3
4
5
public interface Delayed extends Comparable<Delayed< {

public long getDelay(TimeUnit timeUnit);

}

getDelay()方法返回的值应该是在释放此元素之前剩余的延迟。如果返回0或负值,则延迟将被视为已过期,并且在DelayQueue调用下一个take()等操作时释放。

​ 传递给getDelay()方法的TimeUnit实例是一个Enum,它说明了延迟的时间单位。TimeUnit枚举有以下值:

1
2
3
4
5
6
7
DAYS
HOURS
MINUTES
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS

Delayed接口继承了java.lang.Comparable接口,这意味着Delayed对象可以被相互比较。这可能是在DelayQueue内部用于排序队列中的元素,因此它们能够按到期时间排序释放。

示例

以下是使用DelayQueue的示例:

1
2
3
4
5
6
7
8
9
public class DelayQueueExample {

public static void main(String[] args) {
DelayQueue queue = new DelayQueue();
Delayed element1 = new DelayedElement();
queue.put(element1);
Delayed element2 = queue.take();
}
}

DelayedElement是我创建的Delayed接口的实现。它不是java.util.concurrent包的一部分。你必须创建自己的Delayed接口实现才能使用DelayQueue类。

源码

整体介绍

DelayQueue类的泛型定义中可以看出,此类只能储存继承自Delayed接口的元素,内部使用一个优先级队列对元素进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 等待队列的头节点,可以视作一个缓存
// 当一个线程成为leader,它只会等待指定延迟的时间,但
// 其他线程会一直等到。所以leader线程在获取到元素后
// 一定要释放其他线程,除非其他线程临时成为leader
private Thread leader;

/**
* 当队列头部的一个新元素可获得(即超时到期)或者一个新线程成为leader,唤醒此等待条件上的线程
*/
private final Condition available = lock.newCondition();
构造函数

只有两个构造方法,一个是默认构造方法,一个是给定一个集合,并将其中元素增加到等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
public DelayQueue() {}

/**
* Creates a {@code DelayQueue} initially containing the elements of the
* given collection of {@link Delayed} instances.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
增加操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public boolean add(E e) {
// 重用offer方法
return offer(e);
}

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 将元素增加到优先级队列中
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

public void put(E e) {
// 因为是无界队列,所以插入不会被阻塞。超时方法同理
offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}

删除操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

// 提取并删除第一个元素,如果队列为空返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取第一个元素
E first = q.peek();
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
lock.unlock();
}
}

/**
* 提取并删除队列的第一个元素,如果队列为空则等待
* 直到有可获得的元素
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 如果队列为空,阻塞
if (first == null)
available.await();
else {
// 获取头元素的等待延迟
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
first = null; // don't retain ref while waiting
// 如果已经有线程在等待获取头元素,那么阻塞自己
if (leader != null)
available.await();
// 否则,自己就是leader,等待给定延迟
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果成功获取到元素并且队列不为空,唤醒其他线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue,
* or the specified wait time expires.
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 如果队列为空,超时等待
if (first == null) {
if (nanos <= 0L)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
// 如果延迟还未到期,而指定的超时已到期,那么返回null
if (nanos <= 0L)
return null;
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

访问操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 委托给优先级队列获取
return q.peek();
} finally {
lock.unlock();
}
}
其他操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = 0;
for (E first;
n < maxElements
&& (first = q.peek()) != null
&& first.getDelay(NANOSECONDS) <= 0;) {
// 增加到集合中
c.add(first); // In this order, in case add() throws.
// 从队列中删除此元素
q.poll();
++n;
}
return n;
} finally {
lock.unlock();
}
}
迭代器

迭代器使用数组保存队列中的元素,当创建一个迭代器时,使用toArray()方法将当前队列转换为数组,所以此迭代器不一定会和内部的优先级队列保持一致。迭代器除了提供访问操作外,只提供了一个删除操作,这个删除操作保证不会出现不一致的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public Iterator<E> iterator() {
return new Itr(toArray());
}

/**
* Snapshot iterator that works off copy of underlying q array.
*/
private class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such

Itr(Object[] array) {
lastRet = -1;
this.array = array;
}

public boolean hasNext() {
return cursor < array.length;
}

@SuppressWarnings("unchecked")
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
return (E)array[lastRet = cursor++];
}

public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}

void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 获取优先级队列的迭代器,然后执行删除操作
for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
if (o == it.next()) {
it.remove();
break;
}
}
} finally {
lock.unlock();
}
}
核心要点
  1. 使用此队列时,元素必须要实现Delayed接口

  2. 当已经有一个线程等待获取队列头元素时,其他也想要获取元素的线程就会进行等待阻塞状态

  3. 迭代器不和内部的优先级队列保持一致性

  4. 迭代器的remove()方法与内部的优先级队列保持一致性

LinkedBlockingQueue

LinkedBlockingQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

LinkedBlockingQueue在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE作为上限。

LinkedBlockingQueue内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。

示例

以下是如何实例化和使用LinkedBlockingQueue

1
2
3
4
5
6
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

源码

整体介绍

LinkedBlockingQueue内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count字段使用原子变量,避免修改它时需要同时获取两个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static class Node<E> {
E item;

/**
* 下面中的一个:
* - 真实的后继节点
* - 这个节点本身,此时原后继节点现在是head.next,即第一个元素
* - null, 意味没有后继节点,此节点是队列最后一个节点
*/
Node<E> next;

Node(E x) { item = x; }
}

private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
增加操作

注意进行增加操作时,只对putLock加锁,如果还对takeLock也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count使用原子变量,进行CAS更新,防止数据不一致。

​ 为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 注意即使count没有被锁保护,它依然可以被用作等待条件
// 判定。因为此时count只会被减少(putLock已加锁),如果容量
// 改变,会被唤醒。count在其他地方的使用也与此相似。

// 队列已满,阻塞自己
while (count.get() == capacity) {
notFull.await();
}
// 插入队列中
enqueue(node);
// CAS更新count值
c = count.getAndIncrement();
// 如果队列没满,唤醒其他等待插入的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列原来是空队列,唤醒等待提取元素的线程
if (c == 0)
signalNotEmpty();
}

private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 先加锁,才能调用对应Condtion的signal()方法
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 队列已满,返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 等待-超时机制
while (count.get() == capacity) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
删除操作

删除操作与增加操作一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当队列为空,阻塞自己
while (count.get() == 0) {
notEmpty.await();
}
// 将头节点出队
x = dequeue();
c = count.getAndDecrement();
// 如果队列还有元素,唤醒其他等待提取元素的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果原本队列是满的,唤醒增加线程,因为现在元素已经被取出,队列不满
if (c == capacity)
signalNotFull();
return x;
}

private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;

// 头节点为空,其中不存储元素
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

// 删除一个指定元素
public boolean remove(Object o) {
if (o == null) return false;
// 将两个锁全部加锁
fullyLock();
try {
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {
if (o.equals(p.item)) {
// 从队列中移除此节点
unlink(p, pred);
return true;
}
}
return false;
} finally {
// 释放全部两个锁
fullyUnlock();
}
}

void unlink(Node<E> p, Node<E> pred) {
// assert putLock.isHeldByCurrentThread();
// assert takeLock.isHeldByCurrentThread();
// p.next没有被设置为null,为了保证迭代器遍历到p时继续工作,
// 保证弱一致性
p.item = null;
pred.next = p.next;
if (last == p)
last = pred;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
访问操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E peek() {
// 队列为空,返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 返回第一个元素
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}

其他操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
// 使得next指向自己
h.next = h;
// 解除对元素实体的引用
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
// 如果原来队列是满的,唤醒等待的插入线程
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}


public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 获取当前队列中的元素数量
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
// 将n个元素加入到指定集合中
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

迭代器

LinkedBlockingQueue的迭代器与DelayQueue的不同,DelayQueue的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue的迭代器与内部的链表保持了弱一致性。

​ 注意它的next()方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。

​ 而它的forEachRemaining(Consumer<? super E> action)方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
private class Itr implements Iterator<E> {
private Node<E> next; // 持有nextItem的节点
private E nextItem; // 下一个进行处理的元素
private Node<E> lastRet; // 上一个返回的元素,即当前正在使用的
private Node<E> ancestor; // Helps unlink lastRet on remove()

Itr() {
fullyLock();
try {
// 保存第一个元素
if ((next = head.next) != null)
nextItem = next.item;
} finally {
fullyUnlock();
}
}

public boolean hasNext() {
return next != null;
}

public E next() {
Node<E> p;
if ((p = next) == null)
throw new NoSuchElementException();
lastRet = p;
E x = nextItem;
fullyLock();
try {
E e = null;
// 注意此处,遇到空节点会跳过去访问下一个节点
for (p = p.next; p != null && (e = p.item) == null; )
p = succ(p);
next = p;
nextItem = e;
} finally {
fullyUnlock();
}
return x;
}

Node<E> succ(Node<E> p) {
// 正常出队的元素next字段会指向自己
if (p == (p = p.next))
p = head.next;
return p;
}

public void forEachRemaining(Consumer<? super E> action) {
// A variant of forEachFrom
Objects.requireNonNull(action);
Node<E> p;
if ((p = next) == null) return;
lastRet = p;
next = null;
final int batchSize = 64;
Object[] es = null;
int n, len = 1;
do {
fullyLock();
try {
if (es == null) {
p = p.next;
// 获取真正存在的元素的数量,如果多于64,分批进行,一批为64个
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == batchSize)
break;
es = new Object[len];
es[0] = nextItem;
nextItem = null;
n = 1;
} else
n = 0;
// n为1的使用只因为p=p.next,经过此步后p已经不是首元素,
// 而是第二个元素。而后面批次的插入直接从0开始即可
// 将元素放入数组中
for (; p != null && n < len; p = succ(p))
if ((es[n] = p.item) != null) {
lastRet = p;
n++;
}
} finally {
fullyUnlock();
}
// 分别调用accept方法
for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") E e = (E) es[i];
action.accept(e);
}
} while (n > 0 && p != null);
}

public void remove() {
// 获取当前元素
Node<E> p = lastRet;
if (p == null)
throw new IllegalStateException();
lastRet = null;
fullyLock();
try {
if (p.item != null) {
if (ancestor == null)
ancestor = head;
// 获取p的前驱结点
ancestor = findPred(p, ancestor);
// 从链表中删除结点p
unlink(p, ancestor);
}
} finally {
fullyUnlock();
}
}
}

核心要点
  1. 内部使用一个单向链表,以FIFO顺序存储

  2. 可以在链表两头同时进行操作,所以使用两个锁分别保护

  3. 插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。

  4. 迭代器与单向链表保持弱一致性,调用remove(T)方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。

  5. 迭代器的forEachRemaining(Consumer<? super E> action)以64个元素为一批进行操作

评论