java并发工具类-队列
BlockingQueue
java.util.concurrent
包中的Java BlockingQueue
接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue
。
使用
一个BlockingQueue
通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:
生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。
消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。
方法
BlockingQueue
有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:
抛出异常
返回特殊值
阻塞
超时
插入
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
删除
remove(o)
poll()
take()
poll(timeout, timeunit)
访问
element()
peek()
方法行为
这4种不同的行为意味着:
抛出异常
如果请求的操作现在无法完成,则抛出异常。
特殊值
如果请求的操作现在无法完成,则返回特殊值(一般为 true / false).
阻塞
如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。
超时 如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)
注意 无法插入null
到BlockingQueue
中。如果你尝试插入null
, BlockingQueue
则会抛出一个NullPointerException
异常。
你可以访问BlockingQueue
内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)
这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection
中的这些方法。
示例
这是一个Java BlockingQueue
示例。该示例使用实现BlockingQueue
接口的ArrayBlockingQueue
类。
BlockingQueueExample
首先, BlockingQueueExample
类在不同的线程中启动Producer
和Consumer
。Producer
将一个字符串插入共享的BlockingQueue
,而Consumer
使用它们。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class BlockingQueueExample { public static void main (String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue (1024 ); Producer producer = new Producer (queue); Consumer consumer = new Consumer (queue); new Thread (producer).start(); new Thread (consumer).start(); Thread.sleep(4000 ); } }
Producer
这是Producer
类。注意每次put()
调用之间它都会睡一秒钟。这将导致Consumer
阻塞,为了等待获取队列中的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Producer implements Runnable { protected BlockingQueue queue = null ; public Producer (BlockingQueue queue) { this .queue = queue; } public void run () { try { queue.put("1" ); Thread.sleep(1000 ); queue.put("2" ); Thread.sleep(1000 ); queue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer
这是Consumer
类。它从队列中取出对象,然后将它们打印到System.out
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Consumer implements Runnable { protected BlockingQueue queue = null ; public Consumer (BlockingQueue queue) { this .queue = queue; } public void run () { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试
下面是一个测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import org.junit.Test;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class BlockingExample { @Test public void test () throws Exception { BlockingQueue<String> queue = new ArrayBlockingQueue <>(1024 ); Producer producer = new Producer (queue); Consumer consumer = new Consumer (queue); Thread thread1 = new Thread (producer); Thread thread2 = new Thread (consumer); thread1.start(); thread2.start(); thread1.join(); thread2.join(); } private static class Producer implements Runnable { private BlockingQueue<String> queue; Producer(BlockingQueue<String> queue) { this .queue = queue; } @Override public void run () { try { queue.put("1" ); Thread.sleep(1000 ); queue.put("2" ); Thread.sleep(1000 ); queue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumer implements Runnable { private BlockingQueue<String> queue = null ; Consumer(BlockingQueue<String> queue) { this .queue = queue; } @Override public void run () { try { System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } }
BlockingDeque
java.util.concurrent
中的BlockingDeque
接口表示一个双向队列,它可以被线程安全的放入以及从中获取实例。在本文中,我将向你展示如何使用BlockingDeque
。
BlockingDeque
类是一个Deque
,它会阻塞尝试在deque中插入或删除的线程,以防它能够向队列中插入或删除元素。
deque是“Double Ended Queue”(双端队列)的缩写。因此,deque是一个队列,你可以从它的两端插入和获取元素。
用法
如果线程同时生成和使用同一队列的元素,则可以使用BlockingDeque
。如果生成线程需要在队列的两端插入元素,并且消费线程需要从队列的两端移除元素,那么也可以使用它:
线程将生成元素并将它们插入队列的任一端。如果deque当前已满,则插入线程将被阻塞,直到删除线程将元素从双端队列中取出。如果deque当前为空,则将阻止删除线程,直到插入线程将元素插入到双端队列中。
方法
BlockingDeque
有4组不同的方法用于插入,移除以及检查双端队列中的元素。如果不能立即执行所请求的操作,则每组方法的行为都不同。这是一个方法表:
抛出异常
返回特殊值
阻塞
超时
Insert
addFirst(o)
offerFirst(o)
putFirst(o)
offerFirst(o, timeout, timeunit)
Remove
removeFirst(o)
pollFirst(o)
takeFirst(o)
pollFirst(timeout, timeunit)
Examine
getFirst(o)
peekFirst(o)
抛出异常
返回特殊值
阻塞
超时
Insert
addLast(o)
offerLast(o)
putLast(o)
offerLast(o, timeout, timeunit)
Remove
removeLast(o)
pollLast(o)
takeLast(o)
pollLast(timeout, timeunit)
Examine
getLast(o)
peekLast(o)
这与BlockingQueue
类似,只多了一组方法。
继承自 BlockingQueue BlockingDeque
接口扩展BlockingQueue
接口。这意味着你可以使用BlockingDeque
作为BlockingQueue
。如果这样做,各种插入方法会将元素添加到双端队列的末尾,而删除方法将从双端队列的开头删除元素,即BlockingQueue
接口的插入和删除方法。
下面是一个表格,对应了BlockingQueue
和BlockingDeque
的方法:
BlockingQueue
BlockingDeque
add()
addLast()
offer() x 2
offerLast() x 2
put()
putLast()
remove()
removeFirst()
poll() x 2
pollFirst() x 2
take()
takeFirst()
element()
getFirst()
peek()
peekFirst()
示例
这是一个如何使用BlockingDeque
方法的小代码示例:
1 2 3 4 5 6 7 BlockingDeque<String> deque = new LinkedBlockingDeque <String>(); deque.addFirst("1" ); deque.addLast("2" ); String two = deque.takeLast();String one = deque.takeFirst();
ArrayBlockingQueue
ArrayBlockingQueue
类实现了BlockingQueue
接口。阅读BlockingQueue
文本以获取有关的更多信息。
ArrayBlockingQueue
是一个有界的阻塞队列 ,它将元素存储在数组内部。有界意味着它无法存储无限量的元素,它可以同时存储的元素数量有一个上限。你需要在实例化时设置上限,之后无法更改,所以它和ArrayList
有些区别,不要因为它们的名称相似而将它们的功能混杂。
ArrayBlockingQueue
内部是以FIFO(先入先出)次序来存储元素的。队列的头部是在队列中存活时间最长的元素,而队列的尾部是在队列中存活时间最短的元素。
示例
以下是实例化和使用ArrayBlockingQueue
的例子:
1 2 3 4 5 BlockingQueue queue = new ArrayBlockingQueue (1024 );queue.put("1" ); Object object = queue.take();
这是一个使用Java 泛型的BlockingQueue
例子:
1 2 3 4 5 BlockingQueue<String> queue = new ArrayBlockingQueue <String>(1024 ); queue.put("1" ); String string = queue.take();
源码 成员变量
ArrayBlockingQueue
中使用了这几个成员变量来保证操作,其实内部使用了一个循环数组,其中takeIndex和putIndex其实相当于队列的头部和尾部。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull;
构造函数
构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException (); this .items = new Object [capacity]; lock = new ReentrantLock (fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public ArrayBlockingQueue (int capacity, boolean fair, Collection<? extends E> c) { this (capacity, fair); final ReentrantLock lock = this .lock; lock.lock(); try { final Object[] items = this .items; int i = 0 ; try { for (E e : c) items[i++] = Objects.requireNonNull(e); } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException (); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
增加操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public boolean add (E e) { return super .add(e); } public boolean add (E e) { if (offer(e)) return true ; else throw new IllegalStateException ("Queue full" ); } ------------------------------------------------------------------------- public boolean offer (E e) { Objects.requireNonNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } } public static <T> T requireNonNull (T obj) { if (obj == null ) throw new NullPointerException (); return obj; } private void enqueue (E e) { final Object[] items = this .items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); } ------------------------------------------------------------------------- public void put (E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } ------------------------------------------------------------------------- public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0L ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true ; } finally { lock.unlock(); } }
删除操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 public boolean remove (Object o) { if (o == null ) return false ; final ReentrantLock lock = this .lock; lock.lock(); try { if (count > 0 ) { final Object[] items = this .items; for (int i = takeIndex, end = putIndex, to = (i < end) ? end : items.length; ; i = 0 , to = end) { for (; i < to; i++) if (o.equals(items[i])) { removeAt(i); return true ; } if (to == end) break ; } } return false ; } finally { lock.unlock(); } } void removeAt (final int removeIndex) { final Object[] items = this .items; if (removeIndex == takeIndex) { items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); } else { for (int i = removeIndex, putIndex = this .putIndex;;) { int pred = i; if (++i == items.length) i = 0 ; if (i == putIndex) { items[pred] = null ; this .putIndex = pred; break ; } items[pred] = items[i]; } count--; if (itrs != null ) itrs.removedAt(removeIndex); } notFull.signal(); } ------------------------------------------------------------------------- public E remove () { E x = poll(); if (x != null ) return x; else throw new NoSuchElementException (); } ------------------------------------------------------------------------- public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E e = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return e; } ------------------------------------------------------------------------- public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } ------------------------------------------------------------------------- public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0L ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
阻塞方法以及超时方法和增加操作一样,此处不多做讲解。
访问操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public E element () { E x = peek(); if (x != null ) return x; else throw new NoSuchElementException (); } ------------------------------------------------------------------------- public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt (int i) { return (E) items[i]; }
辅助方法
部分方法逻辑简单,有兴趣自己查看即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public void clear () { final ReentrantLock lock = this .lock; lock.lock(); try { int k; if ((k = count) > 0 ) { circularClear(items, takeIndex, putIndex); takeIndex = putIndex; count = 0 ; if (itrs != null ) itrs.queueIsEmpty(); for (; k > 0 && lock.hasWaiters(notFull); k--) notFull.signal(); } } finally { lock.unlock(); } } private static void circularClear (Object[] items, int i, int end) { for (int to = (i < end) ? end : items.length; ; i = 0 , to = end) { for (; i < to; i++) items[i] = null ; if (to == end) break ; } } ------------------------------------------------------------------------- public int drainTo (Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo (Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this ) throw new IllegalArgumentException (); if (maxElements <= 0 ) return 0 ; final Object[] items = this .items; final ReentrantLock lock = this .lock; lock.lock(); try { int n = Math.min(maxElements, count); int take = takeIndex; int i = 0 ; try { while (i < n) { @SuppressWarnings("unchecked") E e = (E) items[take]; c.add(e); items[take] = null ; if (++take == items.length) take = 0 ; i++; } return n; } finally { if (i > 0 ) { count -= i; takeIndex = take; if (itrs != null ) { if (count == 0 ) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); } } } finally { lock.unlock(); } }
核心要点
内部使用了一个循环数组
是一个有界数组,提供了容量后无法被更改
可以指定锁的公平性
DelayQueue
DelayQueue
类实现BlockingQueue
接口。阅读BlockingQueue
文本以获取有关的更多信息。
DelayQueue
内部阻止元素直到某个延迟到期,元素必须实现接口java.util.concurrent.Delayed
。以下是java.util.concurrent.Delayed
接口:
1 2 3 4 5 public interface Delayed extends Comparable <Delayed< { public long getDelay (TimeUnit timeUnit) ; }
getDelay()
方法返回的值应该是在释放此元素之前剩余的延迟。如果返回0或负值,则延迟将被视为已过期,并且在DelayQueue
调用下一个take()
等操作时释放。
传递给getDelay()
方法的TimeUnit
实例是一个Enum
,它说明了延迟的时间单位。TimeUnit
枚举有以下值:
1 2 3 4 5 6 7 DAYS HOURS MINUTES SECONDS MILLISECONDS MICROSECONDS NANOSECONDS
Delayed
接口继承了java.lang.Comparable
接口,这意味着Delayed
对象可以被相互比较。这可能是在DelayQueue
内部用于排序队列中的元素,因此它们能够按到期时间排序释放。
示例
以下是使用DelayQueue
的示例:
1 2 3 4 5 6 7 8 9 public class DelayQueueExample { public static void main (String[] args) { DelayQueue queue = new DelayQueue (); Delayed element1 = new DelayedElement (); queue.put(element1); Delayed element2 = queue.take(); } }
DelayedElement
是我创建的Delayed
接口的实现。它不是java.util.concurrent
包的一部分。你必须创建自己的Delayed
接口实现才能使用DelayQueue
类。
源码 整体介绍
从DelayQueue
类的泛型定义中可以看出,此类只能储存继承自Delayed
接口的元素,内部使用一个优先级队列对元素进行排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class DelayQueue <E extends Delayed > extends AbstractQueue <E> implements BlockingQueue <E> { private final transient ReentrantLock lock = new ReentrantLock (); private final PriorityQueue<E> q = new PriorityQueue <E>(); private Thread leader; private final Condition available = lock.newCondition();
构造函数
只有两个构造方法,一个是默认构造方法,一个是给定一个集合,并将其中元素增加到等待队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 public DelayQueue () {}public DelayQueue (Collection<? extends E> c) { this .addAll(c); }
增加操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public boolean add (E e) { return offer(e); } public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock.unlock(); } } public void put (E e) { offer(e); } public boolean offer (E e, long timeout, TimeUnit unit) { return offer(e); }
删除操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 public E remove () { E x = poll(); if (x != null ) return x; else throw new NoSuchElementException (); } public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { E first = q.peek(); return (first == null || first.getDelay(NANOSECONDS) > 0 ) ? null : q.poll(); } finally { lock.unlock(); } } public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L ) return q.poll(); first = null ; if (leader != null ) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } } public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) { if (nanos <= 0L ) return null ; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L ) return q.poll(); if (nanos <= 0L ) return null ; first = null ; if (nanos < delay || leader != null ) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock.unlock(); } }
访问操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public E element () { E x = peek(); if (x != null ) return x; else throw new NoSuchElementException (); } public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } }
其他操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public int drainTo (Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo (Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this ) throw new IllegalArgumentException (); if (maxElements <= 0 ) return 0 ; final ReentrantLock lock = this .lock; lock.lock(); try { int n = 0 ; for (E first; n < maxElements && (first = q.peek()) != null && first.getDelay(NANOSECONDS) <= 0 ;) { c.add(first); q.poll(); ++n; } return n; } finally { lock.unlock(); } }
迭代器
迭代器使用数组保存队列中的元素,当创建一个迭代器时,使用toArray()
方法将当前队列转换为数组,所以此迭代器不一定 会和内部的优先级队列保持一致 。迭代器除了提供访问操作外,只提供了一个删除操作,这个删除操作保证不会出现不一致的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public Iterator<E> iterator () { return new Itr (toArray()); } private class Itr implements Iterator <E> { final Object[] array; int cursor; int lastRet; Itr(Object[] array) { lastRet = -1 ; this .array = array; } public boolean hasNext () { return cursor < array.length; } @SuppressWarnings("unchecked") public E next () { if (cursor >= array.length) throw new NoSuchElementException (); return (E)array[lastRet = cursor++]; } public void remove () { if (lastRet < 0 ) throw new IllegalStateException (); removeEQ(array[lastRet]); lastRet = -1 ; } } void removeEQ (Object o) { final ReentrantLock lock = this .lock; lock.lock(); try { for (Iterator<E> it = q.iterator(); it.hasNext(); ) { if (o == it.next()) { it.remove(); break ; } } } finally { lock.unlock(); } }
核心要点
使用此队列时,元素必须要实现Delayed
接口
当已经有一个线程等待获取队列头元素时,其他也想要获取元素的线程就会进行等待阻塞状态
迭代器不和内部的优先级队列保持一致性
迭代器的remove()
方法与内部的优先级队列保持一致性
LinkedBlockingQueue
LinkedBlockingQueue
类实现了BlockingQueue
接口。阅读BlockingQueue
文本以获取有关的更多信息。
LinkedBlockingQueue
在内部将元素存储在链接结构(链接节点)中。如果需要,该链接结构可以具有一个上限。如果未指定上限,则使用Integer.MAX_VALUE
作为上限。
LinkedBlockingQueue
内部将元素以FIFO(先入先出)次序存储。队列的头部是已在队列中的时间最长的元素,队列的尾部是已在队列中的时间最短的元素。
示例
以下是如何实例化和使用LinkedBlockingQueue
:
1 2 3 4 5 6 BlockingQueue<String> unbounded = new LinkedBlockingQueue <String>(); BlockingQueue<String> bounded = new LinkedBlockingQueue <String>(1024 ); bounded.put("Value" ); String value = bounded.take();
源码 整体介绍
LinkedBlockingQueue
内部使用了一个单向链表,同时它提供了两个锁,一个用于获取并删除元素,一个用于增加元素。count
字段使用原子变量,避免修改它时需要同时获取两个锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static class Node <E> { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity;private final AtomicInteger count = new AtomicInteger ();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock ();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock ();private final Condition notFull = putLock.newCondition();
增加操作
注意进行增加操作时,只对putLock
加锁,如果还对takeLock
也进行加锁,那么就会影响性能。同时,为了弥补此方法带来的后果,count
使用原子变量,进行CAS更新,防止数据不一致。
为了提升性能,在增加元素成功后,如果队列还没有满,那么便唤醒其他因队列满而被阻塞的插入线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); } private void enqueue (Node<E> node) { last = last.next = node; } private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } public boolean offer (E e) { if (e == null ) throw new NullPointerException (); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return c >= 0 ; } public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null ) throw new NullPointerException (); long nanos = unit.toNanos(timeout); int c = -1 ; final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0L ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(new Node <E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return true ; }
删除操作
删除操作与增加操作一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; } private void signalNotFull () { final ReentrantLock putLock = this .putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } public E poll (long timeout, TimeUnit unit) throws InterruptedException { E x = null ; int c = -1 ; long nanos = unit.toNanos(timeout); final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { if (nanos <= 0L ) return null ; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } public boolean remove (Object o) { if (o == null ) return false ; fullyLock(); try { for (Node<E> pred = head, p = pred.next; p != null ; pred = p, p = p.next) { if (o.equals(p.item)) { unlink(p, pred); return true ; } } return false ; } finally { fullyUnlock(); } } void unlink (Node<E> p, Node<E> pred) { p.item = null ; pred.next = p.next; if (last == p) last = pred; if (count.getAndDecrement() == capacity) notFull.signal(); }
访问操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public E peek () { if (count.get() == 0 ) return null ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { return (count.get() > 0 ) ? head.next.item : null ; } finally { takeLock.unlock(); } }
其他操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public void clear () { fullyLock(); try { for (Node<E> p, h = head; (p = h.next) != null ; h = p) { h.next = h; p.item = null ; } head = last; if (count.getAndSet(0 ) == capacity) notFull.signal(); } finally { fullyUnlock(); } } public int drainTo (Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo (Collection<? super E> c, int maxElements) { Objects.requireNonNull(c); if (c == this ) throw new IllegalArgumentException (); if (maxElements <= 0 ) return 0 ; boolean signalNotFull = false ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { int n = Math.min(maxElements, count.get()); Node<E> h = head; int i = 0 ; try { while (i < n) { Node<E> p = h.next; c.add(p.item); p.item = null ; h.next = h; h = p; ++i; } return n; } finally { if (i > 0 ) { head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) signalNotFull(); } }
迭代器
LinkedBlockingQueue
的迭代器与DelayQueue
的不同,DelayQueue
的迭代器与原组件没有任何的一致性,而LinkedBlockingQueue
的迭代器与内部的链表保持了弱一致性。
注意它的next()
方法,它会跳过内容为null的节点,回忆前面删除操作中的remove(Object)
方法,他没有修改节点的next字段,如果修改了,迭代器就会无法正常工作,而为了保证一致性,迭代器也需要跳过这个空节点。
而它的forEachRemaining(Consumer<? super E> action)
方法是分批次进行处理的,每批64个元素,如果数量小于64,那就使用此数量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 private class Itr implements Iterator <E> { private Node<E> next; private E nextItem; private Node<E> lastRet; private Node<E> ancestor; Itr() { fullyLock(); try { if ((next = head.next) != null ) nextItem = next.item; } finally { fullyUnlock(); } } public boolean hasNext () { return next != null ; } public E next () { Node<E> p; if ((p = next) == null ) throw new NoSuchElementException (); lastRet = p; E x = nextItem; fullyLock(); try { E e = null ; for (p = p.next; p != null && (e = p.item) == null ; ) p = succ(p); next = p; nextItem = e; } finally { fullyUnlock(); } return x; } Node<E> succ (Node<E> p) { if (p == (p = p.next)) p = head.next; return p; } public void forEachRemaining (Consumer<? super E> action) { Objects.requireNonNull(action); Node<E> p; if ((p = next) == null ) return ; lastRet = p; next = null ; final int batchSize = 64 ; Object[] es = null ; int n, len = 1 ; do { fullyLock(); try { if (es == null ) { p = p.next; for (Node<E> q = p; q != null ; q = succ(q)) if (q.item != null && ++len == batchSize) break ; es = new Object [len]; es[0 ] = nextItem; nextItem = null ; n = 1 ; } else n = 0 ; for (; p != null && n < len; p = succ(p)) if ((es[n] = p.item) != null ) { lastRet = p; n++; } } finally { fullyUnlock(); } for (int i = 0 ; i < n; i++) { @SuppressWarnings("unchecked") E e = (E) es[i]; action.accept(e); } } while (n > 0 && p != null ); } public void remove () { Node<E> p = lastRet; if (p == null ) throw new IllegalStateException (); lastRet = null ; fullyLock(); try { if (p.item != null ) { if (ancestor == null ) ancestor = head; ancestor = findPred(p, ancestor); unlink(p, ancestor); } } finally { fullyUnlock(); } } }
核心要点
内部使用一个单向链表,以FIFO顺序存储
可以在链表两头同时进行操作,所以使用两个锁分别保护
插入线程在执行完操作后如果队列未满会唤醒其他等待插入的线程,同时队列非空还会唤醒等待获取元素的线程;提取线程同理。
迭代器与单向链表保持弱一致性,调用remove(T)
方法删除一个元素后,不会解除其对下一个结点的next引用,否则迭代器将无法工作。
迭代器的forEachRemaining(Consumer<? super E> action)
以64个元素为一批进行操作