学习数据结构的时候介绍过队列,今天介绍一种队列的其中一种,叫做阻塞队列。这个知识点属于多线程中的一个模块,对于我们理解消息中间件有份非常大的用处,希望对你有帮助。

一、什么是阻塞队列

1、概念理解

队列比较好理解,数据结构中我们都接触过,先进先出的一种数据结构,那什么是阻塞队列呢?从名字可以看出阻塞队列其实也就是队列的一种特殊情况。举个例子来说明一下吧,我们去餐馆吃饭,一个接一个的下单,这时候就是一个普通的队列,万一这家店生意好,餐馆挤满了人,这时候肯定不能把顾客赶出去,于是餐馆就在旁边设置了一个休息等待区。这就是一个阻塞队列了。我们使用一张图来演示一下:

图片


2、特点

从上面这张图我们会发现这样的规律:

(1)当阻塞队列为空时,从队列中获取元素的操作将会被阻塞,就好比餐馆休息区没人了,此时不能接纳新的顾客了。换句话,肚子为空的时候也没东西吃。

(2)当阻塞队列满了,往队列添加元素的操作将会被阻塞,好比餐馆的休息区也挤满了,后来的顾客只能走了。

从上面的概念我们类比到线程中去,我们会发现,在某些时候线程可能不能不阻塞,因为CPU内核就那么几个,阻塞现状更加说明了资源的利用率高,换句话来说,阻塞其实是一个好事。

阻塞队列应用最广泛的是生产者和消费者模式,待会也会给出一个实际案例演示一下。

还有一点,就是我们看这个阻塞队列有点线程池的感觉,其实基本上可以这样理解,阻塞队列在线程池中确实有着很大的应用。我们可以给出俩例子:

public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                   0L, TimeUnit.MILLISECONDS,
                   new LinkedBlockingQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                   60L, TimeUnit.SECONDS,
                   new SynchronousQueue<Runnable>());
}

前面说了这么久,来个标准点的定义吧:

在多线程中,阻塞的意思是,在某些情况下会挂起线程,一旦条件成熟,被阻塞的线程就会被自动唤醒。

也就是说,之前线程的wait和notify我们程序员需要自己控制,但有了这个阻塞队列之后我们程序员就不用担心了,阻塞队列会自动管理。

欧了,我们对概念先认识到这,我们从代码中看看,毕竟面试中X疼的就是代码。

二、常见的BlockQueue方法

BlockQueue接口继承自collection接口。他的主要实现方法比较多。我们分类来看一下:

图片

方法就这些,这些方法一个一个看和演示的话我觉得有点傻,参照网络上别人的一些博客也对其进行了分类:

根据插入和取出两种类型的操作,具体分为下面一些类型:

操作类型抛出异常特殊值阻塞现象时间超时
插入add(o)offer(o)put(o)offer(o, timeout, unit)
取出(删除)remove(o)poll()take()poll(timeout, unit)
  • 抛出异常:这时候插入和取出在不能立即被执行的时候就会抛出异常。

  • 特殊值:插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false)

  • 阻塞:插入和取出操作在不能被立即执行时会阻塞线程,直到条件成熟,被其他线程唤醒

  • 超时:插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值。

单单从操作的维度来看的话,还是会有点乱,因为有些方法是阻塞方法,有些方法不是,我们从阻塞和不阻塞的维度再来一次划分:

非阻塞方法阻塞方法
add(E e)put(E e)
remove()take()
offer(E e)offer(E e,long timeout, TimeUnit unit)
poll()poll(long timeout, TimeUnit unit)
peek()

现在我们再来看,相信会比较清楚一点,不过需要注意一些特殊的情况,比如offer和poll,以是否包含超时时间来区分是否阻塞。

三、常见的阻塞队列

实现了BlockQueue接口的队列有很多,常见的没有几种,我们使用表格的形式给列出来,对比着分析一下:

队列名作用
ArrayBlockingQueue由数组结构组成的有界阻塞队列
LinkedBlockingQueue由链表结构组成的有界阻塞队列(默认为Integer.MAX_VALUE)
PriorityBlockingQueue支持优先级排序的***阻塞队列
DelayQueue使用优先级队列实现的延迟***阻塞队列
SynchronousQueue不存储元素的阻塞队列,也即单个元素的队列
LinkedTransferQueue由链表结构组成的***阻塞队列
LinkedBlockingDeque由链表结构组成的双向阻塞队列

常见的几种已经加粗了。

ArrayBlockingQueue和LinkedBlockingQueue是最为常用的阻塞队列,前者使用一个有边界的数组来作为存储介质,而后者使用了一个没有边界的链表来存储数据。

PriorityBlockingQueue是一个优先阻塞队列。所谓优先队列,就是每次从队队列里面获取到的都是队列中优先级最高的,对于优先级,PriorityBlockingQueue需要你为插入其中的元素类型提供一个Comparator,PriorityBlockingQueue使用这个Comparator来确定元素之间的优先级关系。底层的数据结构是堆,也就是我们数据结构中的那个堆。

DelayQueue是一个延时队列,所谓延时队列就是消费线程将会延时一段时间来消费元素。

SynchronousQueue是最为复杂的阻塞队列。SynchronousQueue和前面分析的阻塞队列都不同,因为SynchronousQueue不存在容量的说法,任何插入操作都需要等待其他线程来消费,否则就会阻塞等待,看到这种队列心里面估计就立马能联想到生产者消费者的这种模式了,没错,就可以使用这个队列来实现。

现在,我们已经把阻塞队列的一些基本知识点介绍了,完全带细节的介绍费时又费力,下面我们针对某个阻塞队列来看一下原理,其实就是看看源码是如何实现的。

四、原理

我们以ArrayBlockingQueue为例,以下源码均来自jdk1.8。还是以变量、构造函数、普通函数的顺序来看:

1、变量

//The queued items:底层以数组来存储元素 
private final E[] items;
//takeIndex和putIndex分别表示队首元素和队尾元素的下标
private int takeIndex;
private int putIndex;
//count表示队列中元素的个数。
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access:可重入锁 */
private final ReentrantLock lock;
//notEmpty和notFull是等待条件
private final Condition notEmpty;
private final Condition notFull;

变量的作用基本上就是这样,我们再来接着看构造函数

2、构造函数

//1、指定队列的容量
public ArrayBlockingQueue(int capacity) {}
//2、不仅指定容量,也指定了是否公平
public ArrayBlockingQueue(int capacity, boolean fair) { }
//3、容量、公平性而且还可以对另外一个集合进行初始化
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c)
 
{}

上面的这些其实都是为了给其他操作做铺垫。

3、put函数

   /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

首先检查是否为空,从这个方法中我们可以看到,首先检查队列是否为空,然后获取锁,判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。当被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。

我们按照这个源码来看,真正实现插入操作的是enqueue,我们跟进去看看:

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

就几行代码,就是一个正常的移动数组插入的过程,不过最后还要再通知一下队列,插入了元素,此时的队列就不为空了。

4、take元素

还是看源码

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

take的这个操作根据put反过来看就可以,真正实现的是dequeue,跟进去看看:

   /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

取出的时候也是一样,数组少一个元素,数量少一,最后通过队列不为空。其他的就不详述了。

最后我们看看使用。我们举一个生产者消费者的例子,毕竟这个是一个面试考点:

五、应用

生产者消费者模式的实现方式超级多,比如volatile、CAS、AtomicInteger等等,这次我们就使用阻塞队列来实现一下:

public class Data {
    //flag表示是否生产,默认生产
    private volatile boolean flag = true;
    //aInteger表示产品 
    private AtomicInteger aInteger = new AtomicInteger();
    BlockingQueue<Object> queue = null;
    public Data(BlockingQueue<Object> queue) {
        this.queue = queue;
    }
    public void produce() throws Exception{
        String data = null;
        boolean retValue;
        while(flag){
            data = aInteger.incrementAndGet()+"";
            retValue = queue.offer(data, 2L, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName()+" 插入的结果是:"+retValue);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+" 休息一会,马上回来");
    }
    public void consumer() throws Exception{
        Object result = null;
        while(flag){
            result = queue.poll(2L, TimeUnit.SECONDS);
            if(result==null || ((String) result).equalsIgnoreCase("")){
                flag = false;
            }

            System.out.println(Thread.currentThread().getName()+" 消费资源成功");
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

验证就比较简单,我们新建几个生产线程和几个消费线程即可。在这里就不贴代码了。


更多相关文章

  1. Java线程之线程的调度-让步
  2. 栈和队列就是这么简单
  3. 阻塞队列 BlockingQueue
  4. java中的几个线程池的使用
  5. (不谈废话,只有干货)解决线程间协作问题的工具类Exchanger详解
  6. 一文看懂 Node.js 中的多线程和多进程[每日前端夜话0x107]
  7. 如何停止一个线程池?
  8. 如何创建、启动 Java 线程?

随机推荐

  1. 一个简单的基于XML的模块集成框架
  2. 从无到有实现一个xml数据库登录验证
  3. Xml_javascript分页
  4. 使用xml作为数据源,配合asp:Menu类自由扩
  5. 创建带有关联的 XML 架构的 XML 文件 &&
  6. 利用XSLT把ADO记录集转换成XML
  7. XPath 11个实例
  8. XML 问题: 超越DOM(轻松使用 DOM 的技巧
  9. 从xml中获取城市,省份名称
  10. 使用 XML 文件记录操作日志