此系列文章放在了我的专栏里, 欢迎查看
https://blog.csdn.net/column/details/24187.html

相关衔接
从源码看Android常用的数据结构 ( SDK23版本 ) ( 一 , 总述 )
从源码看Android常用的数据结构 ( SDK23版本 ) ( 二, List篇 )
从源码看Android常用的数据结构 ( SDK23版本 ) ( 三 , Queue篇)
从源码看Android常用的数据结构 ( SDK23版本 ) ( 四, Set篇 )
从源码看Android常用的数据结构 ( SDK23版本 ) ( 五, Map篇 )
从源码看Android常用的数据结构 ( SDK23版本 ) ( 六, ConcurrentHashMap )
从源码看Android常用的数据结构 ( 七, SDK28下的HashMap )

Github里有一份Android使用到的小技术, 欢迎查看:
https://github.com/YouCii/LearnApp


总览

先翻译下官方注释, 了解其特性和设计目的.

A collection designed for holding elements prior to processing.Besides basic {@link java.util.Collection Collection} operations,queues provide additional insertion, extraction, and inspectionoperations.  Each of these methods exists in two forms: one throwsan exception if the operation fails, the other returns a specialvalue (either <tt>null</tt> or <tt>false</tt>, depending on theoperation).  The latter form of the insert operation is designedspecifically for use with capacity-restricted <tt>Queue</tt>implementations; in most implementations, insert operations cannotfail.Queue是一种用于在处理前持有elements的集合。除了基本的Collection操作之外,Queue提供了额外的插入、提取和检查操作。每个方法都有两种形式:一种是操作失败时抛出异常, 另一种是返回model或者根据操作类型返回null以及false。后一种插入操作形式是专门设计于Queue有容量限制的实现类, 因为在大多数实现类里,插入操作不允许失败。<p>Queues typically, but do not necessarily, order elements in aFIFO (first-in-first-out) manner.  Among the exceptions arepriority queues, which order elements according to a suppliedcomparator, or the elements' natural ordering, and LIFO queues (orstacks) which order the elements LIFO (last-in-first-out).Whatever the ordering used, the <em>head</em> of the queue is thatelement which would be removed by a call to {@link #remove() } or{@link #poll()}.  In a FIFO queue, all new elements are inserted atthe <em> tail</em> of the queue. Other kinds of queues may usedifferent placement rules.  Every <tt>Queue</tt> implementationmust specify its ordering properties.队列通常但不一定都是按照FIFO(先入先出)规则排列元素。比如PriorityQueue,它根据提供的比较器或元素的自然排序来排列元素; 再比如遵循LIFO(后进先出)规则的LIFO队列(或Stack堆栈)。无论使用的排序规则是什么,队列的head就是会被remove()或者poll()移除的那个元素。在FIFO队列中,所有新的元素都会被插入到队列的尾部。其他类型的队列可能会使用不同的放置规则。每个Queue的实现类都必须指定其排序属性。The {@link #offer offer} method inserts an element if possible,otherwise returning <tt>false</tt>.  This differs from the {@linkjava.util.Collection#add Collection.add} method, which can fail toadd an element only by throwing an unchecked exception.  The<tt>offer</tt> method is designed for use when failure is a normal,rather than exceptional occurrence, for example, in fixed-capacity(or &quot;bounded&quot;) queues.offer()会插入一个元素, 插入失败会返回false. 它和Collection接口中的add()不一样,add()只会在抛出UncheckedException时失败.offer()适用于插入失败并不是错误情况的情景, 比如说插入到一个固定大小或者有边界的队列中时.The {@link #remove()} and {@link #poll()} methods remove andreturn the head of the queue.Exactly which element is removed from the queue is afunction of the queue's ordering policy, which differs fromimplementation to implementation. The <tt>remove()</tt> and<tt>poll()</tt> methods differ only in their behavior when thequeue is empty: the <tt>remove()</tt> method throws an exception,while the <tt>poll()</tt> method returns <tt>null</tt>.The {@link #element()} and {@link #peek()} methods return, but donot remove, the head of the queue.remove()poll()方法会移除并返回队列的顶部元素. 确切的说, 到底会移除哪个元素取决于不同实现类的队列排序规则.remove()poll()只有一点不同, 就是在queue是空的情况下, remove()会抛出异常,poll()会返回null.element()peek()也会返回顶部元素, 但是不会移除它.The <tt>Queue</tt> interface does not define the <i>blocking queuemethods</i>, which are common in concurrent programming.  These methods,which wait for elements to appear or for space to become available, aredefined in the {@link java.util.concurrent.BlockingQueue} interface, whichextends this interface.Queue接口没有定义那些并发编程里常用的阻塞队列方法, 等待元素生产或者空间可用的那些相关方法被定义在了继承Queue的BlockingQueue接口里.<tt>Queue</tt> implementations generally do not allow insertionof <tt>null</tt> elements, although some implementations, such as{@link LinkedList}, do not prohibit insertion of <tt>null</tt>.Even in the implementations that permit it, <tt>null</tt> shouldnot be inserted into a <tt>Queue</tt>, as <tt>null</tt> is alsoused as a special return value by the <tt>poll</tt> method toindicate that the queue contains no elements.Queue的实现类一般不允许插入null元素, 尽管某些实现类并没有禁止null的插入, 比如说LinkedList. 不过就算允许插入null的这些实现类, 也不应该插入null, 因为有时候也会通过poll()是否返回null来判断queue是否是空的.<tt>Queue</tt> implementations generally do not defineelement-based versions of methods <tt>equals</tt> and<tt>hashCode</tt> but instead inherit the identity based versionsfrom class <tt>Object</tt>, because element-based equality is notalways well-defined for queues with the same elements but differentordering properties.Queue的实现类一般都不会定义基于元素的实体判断方法equals/hashCode, 而是直接继承Object里的基于对象身份的equals/hashCode方法. 这是因为基于对象的相等判断对于元素相同但是排序规则不同的Queue并不能很好的匹配.

总的来说, Queue队列额外提供了压入/弹出等操作, 这些操作执行的元素顺序由其实现类自定义.

入队出队检索区别
offer()poll()peek()执行失败时return false或null
add()remove()element()执行失败时抛出异常

Queue 接口的主要实现中 BlockingQueue 和 Deque 是接口

  1. BlockingQueue 定义了 并发编程常用的阻塞方法, 而Queue队列使用最多的还是在并发编程情况下, 所以核心在于BlockingQueue的实现类.
  2. Deque 是双端队列, 相当于Queue+Stack, 功能冗余, 一般不怎么用, 简单看下定义的额外方法
    头部入队|尾部入队|头部出队|尾部出队|头部检索|尾部检索|区别
    -|-|-|-|-|-|
    offerFirst(E)|offerLast(E)|pollFirst()|pollLast()|peekFirst()|peekLast()|失败返回false或null
    addFirst(E)|addLast(E)|removeFirst()|removeLast()|getFirst()|getLast()|失败抛出异常

而实现类主要分为三类:
1. 非线程安全: LinkedList, PriorityQueue 等;
2. 线程安全, 非阻塞: ConcurrentLinkedQueue;
3. 线程安全, 阻塞: BlockingQueue实现类: ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue

大体区别在此
从源码看Android常用的数据结构 ( SDK23版本 ) ( 三 , Queue篇)_第1张图片

下面主要看下 PriorityQueue/BlockingQueue/ConcurrentLinkedQueue相关实现类.


PriorityQueue

A PriorityQueue holds elements on a priority heap, which orders the elementsaccording to their natural order or according to the comparator specified atconstruction time. If the queue uses natural ordering, only elements that arecomparable are permitted to be inserted into the queue.The least element of the specified ordering is the first retrieved with{@link #poll()} and the greatest element is the last.A PriorityQueue is not synchronized. If multiple threads will have to accessit concurrently, use the {@link java.util.concurrent.PriorityBlockingQueue}.PriorityQueue使用有优先级的堆(其实还是一个数组)维护元素, 元素的排序规则根据自然顺序或者定义在构造方法里的比较器来确定. 如果queue使用了自然排序, 那么插入此queue的元素必须实现comparable接口.使用poll()弹出元素时, 最小的元素会优先弹出, 最大的元素排在最后.PriorityQueue不是同步的, 如果有多线程会同时访问的话, 请使用PriorityBlockingQueue.

从源码看, PriorityQueue 的优先级特性体现在这几个方法中, 下面在注释中进行说明

public class PriorityQueue<E> extends AbstractQueue<E> implements Serializable {/*** 构造方法中传入要使用的比较器, 这个比较器就是体现优先级的关键, 当然传入null的话就* 使用自然排序了*/public PriorityQueue(int initialCapacity, Comparator<? super E> comparator) {        if (initialCapacity < 1) {            throw new IllegalArgumentException("initialCapacity < 1: " + initialCapacity);        }        elements = newElementArray(initialCapacity);        this.comparator = comparator;    }/*** 压入一个元素, growToSize会实现扩容, 变为传入int值的两倍大小;* 压入成功之后, 把新元素排序.*/public boolean offer(E o) {        if (o == null) {            throw new NullPointerException("o == null");        }        // 扩容        growToSize(size + 1);        elements[size] = o;        // 排序        siftUp(size++);        return true;    }/*** 先把elements[index]替换成倒数第二个元素, 原来的elements[index]就已经被remove掉了;* 然后, 把原来的倒数第二个元素向后移, 把倒数第一个元素置空(因为已经少了一个了);* 如果原倒数第二个元素执行完后移操作之后仍然在原位置, 就往前移.*/private void removeAt(int index) {        size--;        E moved = elements[size];        elements[index] = moved;        siftDown(index);        elements[size] = null;        if (moved == elements[index]) {            siftUp(index);        }    }/*** 排序方法, 如果comparator为空, 使用E的自然排序, 如果实例化此类时没有传入* comparator, 元素类就必须实现comparable接口, 否则强转崩溃*/    private int compare(E o1, E o2) {        if (comparator != null) {            return comparator.compare(o1, o2);        }        return ((Comparable<? super E>) o1).compareTo(o2);    }/*** 使用二分法把新元素按大小排序, 前移* (elements[0]是最小的, 新插入元素本来在elements[size++]位置)*/    private void siftUp(int childIndex) {        E target = elements[childIndex];        int parentIndex;        while (childIndex > 0) {            parentIndex = (childIndex - 1) / 2;            E parent = elements[parentIndex];            if (compare(parent, target) <= 0) {            // 说明parent位置的元素小了                break;            }            // 继续向前查            elements[childIndex] = parent;            childIndex = parentIndex;        }        elements[childIndex] = target;    }/*** 向后移, 直到数组中间*/    private void siftDown(int rootIndex) {        E target = elements[rootIndex];        int childIndex;        while ((childIndex = rootIndex * 2 + 1) < size) {            if (childIndex + 1 < size                    && compare(elements[childIndex + 1], elements[childIndex]) < 0) {                childIndex++;            }            if (compare(target, elements[childIndex]) <= 0) {                break;            }            elements[rootIndex] = elements[childIndex];            rootIndex = childIndex;        }        elements[rootIndex] = target;    }}

BlockingQueue

阻塞队列.
先回答一个问题: 无界阻塞队列有什么问题? 因为阻塞队列一般都是用于生产者/消费者模式, 如果无界时生产者速度远超消费者, 队列会无限扩容, 最终可能会导致OOM.

A {@link java.util.Queue} that additionally supports operationsthat wait for the queue to become non-empty when retrieving anelement, and wait for space to become available in the queue whenstoring an element.BlockingQueue是一种特殊的Queue, 它额外支持了两类操作, 一是检索元素时会等待队列变为非空状态, 二是存储元素时会等待队列空间变为可用状态.{@code BlockingQueue} methods come in four forms, with different waysof handling operations that cannot be satisfied immediately, but may besatisfied at some point in the future:BlockingQueue的方法有四种形式, 提供了不同的可能会延迟生效的处理方法:one throws an exception, the second returns a special value (either{@code null} or {@code false}, depending on the operation), the thirdblocks the current thread indefinitely until the operation can succeed,and the fourth blocks for only a given maximum time limit before givingup.  These methods are summarized in the following table:一类抛出异常, 一类根据操作返回null或者false, 第三类会无限阻塞当前线程直到操作成功, 最后一类在第三类基础上设置了最长等待时间. 这些方法的简介列到下方的表格中(ps, 除了上方的方法外, 还有drainTo方法用于批量获取可用元素):
lockingQueue方法简介
抛出异常返回特殊值无限期阻塞会超时的阻塞
入队 add(e)offer(e)put(e)offer(e, time, unit)
出队remove()poll()take()poll(time, unit)
检索element()peek()not applicablenot applicable
A {@code BlockingQueue} does not accept {@code null} elements.Implementations throw {@code NullPointerException} on attemptsto {@code add}, {@code put} or {@code offer} a {@code null}.  A{@code null} is used as a sentinel value to indicate failure of{@code poll} operations.BlockingQueue不支持null元素, 子类中add()/put()/offer()方法都有可能抛出空指针异常, null只被用在poll()失败时返回的标志值A {@code BlockingQueue} may be capacity bounded. At any giventime it may have a {@code remainingCapacity} beyond which noadditional elements can be {@code put} without blocking.A {@code BlockingQueue} without any intrinsic capacity constraints alwaysreports a remaining capacity of {@code Integer.MAX_VALUE}.BlockingQueue可能是有界的, 它会维护一个remainingCapacity(剩余容量), 超过这个容量的后续元素只有先被阻塞. 没有限制容量的BlockingQueue的剩余容量永远是.Integer.MAX_VALUE.{@code BlockingQueue} implementations are designed to be usedprimarily for producer-consumer queues, but additionally supportthe {@link java.util.Collection} interface.  So, for example, it ispossible to remove an arbitrary element from a queue using{@code remove(x)}. However, such operations are in general<em>not</em> performed very efficiently, and are intended for onlyoccasional use, such as when a queued message is cancelled.BlockingQueue的实现类主要设计用途是生产者-消费者模式, 但也额外的提供了Collection的方法, 所以也有可能会执行类似于remove()之类的collection方法. 但是这些方法效率较差,只适合偶尔使用, 比如说取消queue消息时.{@code BlockingQueue} implementations are thread-safe.  Allqueuing methods achieve their effects atomically using internallocks or other forms of concurrency control. However, the<em>bulk</em> Collection operations {@code addAll},{@code containsAll}, {@code retainAll} and {@code removeAll} are<em>not</em> necessarily performed atomically unless specifiedotherwise in an implementation. So it is possible, for example, for{@code addAll(c)} to fail (throwing an exception) after addingonly some of the elements in {@code c}.BlockingQueue的实现类是线程安全的, 所有的队列方法都会利用内部锁或者其他形式的并发控制实现它们的操作原子化. 然而大多数的Collection方法, 例如addAll()/containsAll()/retainAll()/removeAll()等不一定会原子化执行, 除非有实现类另外规定. 所以有可能在例如addAll方法插入到一半的时候发生错误.A {@code BlockingQueue} does <em>not</em> intrinsically supportany kind of &quot;close&quot; or &quot;shutdown&quot; operation toindicate that no more items will be added.  The needs and usage ofsuch features tend to be implementation-dependent. For example, acommon tactic is for producers to insert special<em>end-of-stream</em> or <em>poison</em> objects, that areinterpreted accordingly when taken by consumers.BlockingQueue没有实质提供任何形式的用于结束元素添加的close或shutdown操作, 这种需求/使用一般都依赖于实现类, 通常的策略是生产者插入一个特殊的终止信号, 然后由消费者获取后特殊处理.Usage example, based on a typical producer-consumer scenario.Note that a {@code BlockingQueue} can safely be used with multipleproducers and multiple consumers.举个标准的生产者-消费者场景的例子, 注意BlockingQueue可以安全的用于多个生产者/多个消费者.class Producer implements Runnable {  private final BlockingQueue queue;  Producer(BlockingQueue q) { queue = q; }  public void run() {    try {      while (true) { queue.put(produce()); }    } catch (InterruptedException ex) { ... handle ...}  }  Object produce() { ... }}class Consumer implements Runnable {  private final BlockingQueue queue;  Consumer(BlockingQueue q) { queue = q; }  public void run() {    try {      while (true) { consume(queue.take()); }    } catch (InterruptedException ex) { ... handle ...}  }  void consume(Object x) { ... }}class Setup {  void main() {    BlockingQueue q = new SomeQueueImplementation();    Producer p = new Producer(q);    Consumer c1 = new Consumer(q);    Consumer c2 = new Consumer(q);    new Thread(p).start();    new Thread(c1).start();    new Thread(c2).start();  }}}<p>Memory consistency effects: As with other concurrentcollections, actions in a thread prior to placing an object into a{@code BlockingQueue}<a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>actions subsequent to the access or removal of that element fromthe {@code BlockingQueue} in another thread.内存一致性: 像其他并发Collctions一样, 在某线程中较先发生的插入操作 happen-before在其他线程较后发生的访问/删除操作(这是内存可见性).

官方注释写了很多, 总结一句话, 这个BlockingQueue是一种线程安全的阻塞队列, 提供了满入队/空出队时阻塞的方法.
它的实现类一般用在多线程下的生产消费场景.
下面看下BlockingQueue的三个主要实现类, ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue.

ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。


ArrayBlockingQueue

类如其名, 利用数组实现的BlockingQueue, 按照惯例, 先看看官方的注释总览一下.

A bounded {@linkplain BlockingQueue blocking queue} backed by anarray.  This queue orders elements FIFO (first-in-first-out).  The<em>head</em> of the queue is that element that has been on thequeue the longest time.  The <em>tail</em> of the queue is thatelement that has been on the queue the shortest time. New elementsare inserted at the tail of the queue, and the queue retrievaloperations obtain elements at the head of the queue.使用数组实现的一种有界阻塞队列. 此类遵循先入先出规则排序, 头部是队列持有时间最长的元素, 尾部是持有时间最短的; 新的元素在尾部插入, 头部元素优先被检索.<p>This is a classic &quot;bounded buffer&quot;, in which afixed-sized array holds elements inserted by producers andextracted by consumers.  Once created, the capacity cannot bechanged.  Attempts to {@code put} an element into a full queuewill result in the operation blocking; attempts to {@code take} anelement from an empty queue will similarly block.这是一个经典的有界缓存, 它利用一个固定大小的数组持有由生产者插入、由消费者提取的元素. 一经创建, 队列大小不能在改变. 向一个满队列中put()会导致阻塞, 从一个空队列中take()也会造成同样的阻塞.<p>This class supports an optional fairness policy for orderingwaiting producer and consumer threads.  By default, this orderingis not guaranteed. However, a queue constructed with fairness setto {@code true} grants threads access in FIFO order. Fairnessgenerally decreases throughput but reduces variability and avoidsstarvation.这个类可以设置 等待的生产者消费者线程是否遵循公平排序策略, 默认不遵循. 在构造方法中, 可以传入true来保证这些等待线程按照FIFO规则来访问. 一般公平策略会降低吞吐量, 但会提高稳定性, 而且可以避免出现饿死情况(某些线程一直执行不到).<p>This class and its iterator implement all of the<em>optional</em> methods of the {@link Collection} and {@linkIterator} interfaces.这个类以及它的迭代器实现了所有Collection的可选方法以及Iterator的接口方法.

下面是类的结构, 红框内都是重点内容.
从源码看Android常用的数据结构 ( SDK23版本 ) ( 三 , Queue篇)_第2张图片

先来看下构造方法

    public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }

这里传入的int capacity是容量, ArrayBlockingQueue是有界队列, 不支持扩容, 与ArrayList中的初始容量是不同的; boolean fair用来设置可重入锁ReentrantLock的公平性, 如果是公平锁, 则阻塞的线程会遵循FIFO规则.
notEmptynotFull 是实现阻塞的重要因素, 详细介绍请看我的另一篇博客 ArrayBlockingQueue 中使用的 ReentrantLock.newCondition()

下面看下核心的BlockingQueue方法的实现

    /**    * 加锁, 数组满时返回false, 不满的话执行enqueue入队    */    public boolean offer(E e) {        if (e == null) throw new NullPointerException();        final ReentrantLock lock = this.lock;        lock.lock();        try {            if (count == items.length)                return false;            else {                enqueue(e);                return true;            }        } finally {            lock.unlock();        }    }    /**    * 数组满时使用 notFull 进入等待, 暂时释放lock锁    * (使用lock.lockInterruptibly()可中断锁, 以便wait过程中退出)    */    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }        /**    * 加锁, 数组为空时返回false, 不为空的话执行dequeue出队    */     public E poll() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return (count == 0) ? null : dequeue();        } finally {            lock.unlock();        }    }    /**    * 数组满时使用 notEmpty进入等待, 暂时释放lock锁    * (使用lock.lockInterruptibly()可中断锁, 以便wait过程中退出)    */ public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0)                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    }

在 put() 和 take() 中阻塞时会执行 notFull.await(); notEmpty.await(); , 那什么时候会停止 wait 继续执行呢, 就是在 notEmpty / notFull不满足时, 也就是新插入元素或者删除元素时, 具体在这两个方法中:

/** * 入队后说明当前数组不为空了, 会通过 notEmpty.signal() 通知一个 notEmpty.await(), 如果有很多线 * 程在await(), 只能等待下次插入元素时再通知另一个等待线程. * 注意, 只能在持有锁时调用. */private void enqueue(E x) {    final Object[] items = this.items;    items[putIndex] = x;    if (++putIndex == items.length) putIndex = 0;    count++;    notEmpty.signal();}/** * 与enqueue类似, 出队一个元素说明数组肯定不是满的了, 会通过 notFull.signal() 通知一个 notFull.await(). */private E dequeue() {    final Object[] items = this.items;    @SuppressWarnings("unchecked")    E x = (E) items[takeIndex];    items[takeIndex] = null;    if (++takeIndex == items.length) takeIndex = 0;    count--;    if (itrs != null)        itrs.elementDequeued();    notFull.signal();    return x;}

这里面的 count 作为元素数量就不用说了, 注意这里的两个 int 变量: takeIndex 和 putIndex, 它们定义了队列的头尾.
takeIndex 是头部, 下一次 take, poll, peek, remove操作执行的index; putIndex 是尾部, 下一次put, offer, add执行的index.
takeIndex 和 putIndex 设计非常巧妙, enqueue 和dequeue 通过对 takeIndex 和 putIndex 的维护, 避免了数组平移操作, 提高了读写效率.


LinkedBlockingQueue##

看一下官方注释, 这里省略了大量与 ArrayBlockingQueue 一致的说明.

An optionally-bounded {@linkplain BlockingQueue blocking queue} based onlinked nodes.基于Node链表实现的一种可选边界的阻塞队列.Linked queues typically have higher throughput than array-based queues butless predictable performance in most concurrent applications.链表队列通常比基于数组的队列的吞吐量更高, 但是在大多数高并发场景下可预测性较差.The optional capacity bound constructor argument serves as away to prevent excessive queue expansion. The capacity, if unspecified,is equal to {@link Integer#MAX_VALUE}.  Linked nodes aredynamically created upon each insertion unless this would bring thequeue above capacity.构造方法中的可选容量限制参数用于防止队列过度扩容. 如果不指定容量的话, 默认为Integer.MAX_VALUE. 如果不会超过队列容量的话, 链表的node会在插入元素时自动创建.

LinkedBlockingQueue 与 ArrayBlockingQueue 十分类似, 主要区别有两点:

  1. ArrayBlockingQueue 基于 数组, 而 LinkedBlockingQueue 是基于链表.

    transient Node<E> head;  // head.item == null, 可以理解为ArrayBlockingQueue的takeIndexprivate transient Node<E> last; // last.next == null, 可以理解为ArrayBlockingQueue的putIndexprivate void enqueue(Node<E> node) {    last = last.next = node;}private E dequeue() {    Node<E> h = head;    Node<E> first = h.next;    h.next = h; // help GC    head = first;    E x = first.item;    first.item = null;    return x;}
  2. ArrayBlockingQueue 只使用了一把锁, 两个Condition都是使用一把锁创建.

    lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();

    LinkedBlockingQueue 使用了两把锁, 两个Condition分别创建.

    private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();

    而且 LinkedBlockingQueue 的 count 使用的 AtomicInteger, ArrayBlockingQueue 仅用了 int. 之所以这样设计, 是为了生产者和消费者的读写操作可以并行执行.
    因为 LinkedBlockingQueue 会频繁创建 node, 在高并发的情景下与 ArrayBlockingQueue 相比效率较差, 所以使用了读写锁分开的设计.

    参考: https://blog.csdn.net/chichuduxing/article/details/69298730

下面放上put()和take()的源码, 与 ArrayBlockingQueue 基本类似.

    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;        Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {                notFull.await();            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        } finally {            putLock.unlock();        }        // 个人猜测可能是因为c = count.getAndIncrement()时返回的数据已经是被take()等方法减小了        if (c == 0)            signalNotEmpty();    }    public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();        try {            while (count.get() == 0) {                notEmpty.await();            }            x = dequeue();            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();        } finally {            takeLock.unlock();        }        if (c == capacity)            signalNotFull();        return x;    }

PriorityBlockingQueue##

An unbounded {@linkplain BlockingQueue blocking queue} that usesthe same ordering rules as class {@link PriorityQueue} and suppliesblocking retrieval operations.  While this queue is logicallyunbounded, attempted additions may fail due to resource exhaustion(causing {@code OutOfMemoryError}). This class does not permit{@code null} elements.  A priority queue relying on {@linkplainComparable natural ordering} also does not permit insertion ofnon-comparable objects (doing so results in{@code ClassCastException}).一种使用类似PriorityQueue的排序规则并提供了检索阻塞操作的无界阻塞队列. 虽然此队列逻辑上是无界的, 但尝试插入时也有可能会因为资源耗尽抛出OOM错误导致错误.此类不允许插入null, 同时基于自然Comparable排序的PriorityBlockingQueue也不允许插入非可比较对象, 否则会抛出ClassCastException错误.This class and its iterator implement all of the<em>optional</em> methods of the {@link Collection} and {@linkIterator} interfaces.  The Iterator provided in method {@link#iterator()} is <em>not</em> guaranteed to traverse the elements ofthe PriorityBlockingQueue in any particular order. If you needordered traversal, consider using{@code Arrays.sort(pq.toArray())}.  Also, method {@code drainTo}can be used to <em>remove</em> some or all elements in priorityorder and place them in another collection.此类及其迭代器实现了所有的Collection/Iterator的可选方法. iterator()提供的迭代器在遍历PriorityBlockingQueue的元素时不能保证任何顺序, 如果需要按顺序遍历的话, 考虑使用 Arrays.sort(pq.toArray()) 先进行排序. 另外, drainTo()方法可以用来剪切部分元素到另外一个collection中.<p>Operations on this class make no guarantees about the orderingof elements with equal priority. If you need to enforce anordering, you can define custom classes or comparators that use asecondary key to break ties in primary priority values.  Forexample, here is a class that applies first-in-first-outtie-breaking to comparable elements. To use it, you would insert a{@code new FIFOEntry(anEntry)} instead of a plain entry object.这个类的操作不能保证焗油相同优先级的元素的顺序, 如果需要强制排序的话, 你可以自定义一个类或者比较器, 使用额外的key来打破主要的优先级关系. 比如有支持FIFO的类可以打破比较规则, 使用它的时候需要插入new FIFOEntry(anEntry)来代替普通的元素类.

简单地说, PriorityBlockingQueue 是一种无界、提供读阻塞的PriorityQueue, 它的排序规则与PriorityQueue类似.

private transient Object[] queue;private PriorityQueue<E> q; // 序列化时使用PriorityQueueprivate transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty; // 只有空列表时的读阻塞

因为其无界的特性, PriorityBlockingQueue 只有在队列为空时才会有读取阻塞, 插入元素时不会阻塞.

public boolean add(E e) {        return offer(e);    }    public void put(E e) {        offer(e); // never need to block    }    public boolean offer(E e, long timeout, TimeUnit unit) {        return offer(e); // never need to block    }    public boolean offer(E e) {        if (e == null)            throw new NullPointerException();        final ReentrantLock lock = this.lock;        lock.lock();        int n, cap;        Object[] array;        while ((n = size) >= (cap = (array = queue).length))            tryGrow(array, cap);        try {            Comparator<? super E> cmp = comparator;            if (cmp == null)                siftUpComparable(n, e, array);            else                siftUpUsingComparator(n, e, array, cmp);            size = n + 1;            notEmpty.signal();        } finally {            lock.unlock();        }        return true;    }

读操作阻塞, 与ArrayBlockingQueue基本一致, 这里只看不大一样的dequeue()方法, 出队后需要排序.

    private E dequeue() {        int n = size - 1;        if (n < 0)            return null;        else {            Object[] array = queue;            E result = (E) array[0];            E x = (E) array[n];            array[n] = null;            Comparator<? super E> cmp = comparator;            if (cmp == null)                siftDownComparable(0, x, array, n);            else                siftDownUsingComparator(0, x, array, n, cmp);            size = n;            return result;        }    }

入队/出队时各有两个排序方法. 入队时sifUp, 出队时siftDown.

void siftUpComparable(int k, T x, Object[] array) {...}void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {...}void siftDownComparable(int k, T x, Object[] array, int n) {...}void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp){        if (n > 0) {            int half = n >>> 1;            while (k < half) {                int child = (k << 1) + 1;                Object c = array[child];                int right = child + 1;                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)                    c = array[child = right];                if (cmp.compare(x, (T) c) <= 0)                    break;                array[k] = c;                k = child;            }            array[k] = x;        }    }

ConcurrentLinkedQueue

ConcurrentLinkedQueue 并发链表, 同 LinkedBlockingQueue 一样也是单边链表形式.
ConcurrentLinkedQueue 是无界的, 读写均不会阻塞.
ConcurrentLinkedQueue 与前面阻塞队列的最大不同在于 ConcurrentLinkedQueue 没有使用锁机制, 而是使用 Unsafe 类的 CAS 方法来保证线程安全的, 所以如果没有阻塞需求的话, 使用 ConcurrentLinkedQueue 会有更好的效率.

链表节点类, 内部使用Unsafe来执行读写

    private static class Node<E> {        volatile E item;        volatile Node<E> next;        Node(E item) {            UNSAFE.putObject(this, itemOffset, item);        }        boolean casItem(E cmp, E val) {            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);        }        void lazySetNext(Node<E> val) {            UNSAFE.putOrderedObject(this, nextOffset, val);        }        boolean casNext(Node<E> cmp, Node<E> val) {            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);        }                private static final sun.misc.Unsafe UNSAFE;        private static final long itemOffset;        private static final long nextOffset;        static {            try {                UNSAFE = sun.misc.Unsafe.getUnsafe();                Class<?> k = Node.class;                itemOffset = UNSAFE.objectFieldOffset                    (k.getDeclaredField("item"));                nextOffset = UNSAFE.objectFieldOffset                    (k.getDeclaredField("next"));            } catch (Exception e) {                throw new Error(e);            }        }    }

Queue核心方法

// 入队public boolean offer(E e) {        checkNotNull(e);        final Node<E> newNode = new Node<E>(e);        for (Node<E> t = tail, p = t;;) {            Node<E> q = p.next;            if (q == null) {                // p is last node                if (p.casNext(null, newNode)) {                    if (p != t)                        casTail(t, newNode);                    return true;                }                // Lost CAS race to another thread; re-read next            } else if (p == q)                 p = (t != (t = tail)) ? t : head;            else                 p = (p != t && t != (t = tail)) ? t : q;        }    }// 出队    public E poll() {        restartFromHead:        for (;;) {            for (Node<E> h = head, p = h, q;;) {                E item = p.item;                if (item != null && p.casItem(item, null)) {                    if (p != h) // hop two nodes at a time                        updateHead(h, ((q = p.next) != null) ? q : p);                    return item;                } else if ((q = p.next) == null) {                    updateHead(h, p);                    return null;                } else if (p == q)                    continue restartFromHead;                else                    p = q;            }        }    }    // 检索    public E peek() {        restartFromHead:        for (;;) {            for (Node<E> h = head, p = h, q;;) {                E item = p.item;                if (item != null || (q = p.next) == null) {                    updateHead(h, p);                    return item;                } else if (p == q)                    continue restartFromHead;                else                    p = q;            }        }    }

更多相关文章

  1. ImageView之android:tint=" "属性方法作用详解
  2. C#/IOS/Android通用加密解密方法
  3. WebView的使用之Android与JS通过WebView互调方法
  4. Android APK文件在电脑上面运行方法
  5. 获取Android SDK 源代码并在Eclipse中关联查看的方法--转
  6. android中遍历arrayList的四种方法
  7. Android获取状态栏高度的方法

随机推荐

  1. Android(安卓)FFMpeg应用实例(一):利用CM
  2. 《Android》Android(安卓)Studio打开项目
  3. Android入门第十六篇之Style与Theme
  4. 深度定制的linux,用户体验不输android(视频
  5. Android++:为Android(安卓)App开发而生的V
  6. Android(安卓)--- 使用SAX读取xml文件
  7. 创建android文件系统
  8. android系统学习笔记三
  9. Android广播管理二--广播注册(registerRec
  10. 友盟2011第三季度Android数据报告