Java阻塞队列理解

这篇内容都是基于自己的一些理解和总结,由于个人能力有限,可能会存在偏差之处,如有发现,欢迎指正!

1.概述

阻塞队列是一种队列,主要还是在多线程情况下使用,主要的应用场景是生产者消费者模型、线程池和消息中间件中,因为今天刚学习了线程池的知识,所以对阻塞队列的应用很有感触,在创建线程池的构造方法的七大参数里面,其中一项就是阻塞队列:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

当提交的任务不能得到立即执行时,线程池就会将提交的任务放到阻塞队列中暂存,比如下面的代码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool使用可LinkedBlockingQueue这种阻塞队列

当然线程池的知识这里只是简单提一下,后面会写一篇新的文章专门总结线程池,这篇文章主要还是以阻塞队列为主


使用阻塞队列时,主要还是在多线程情况使用,线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

  1. 当阻塞队列是空时,从队列中获取元素的操作将被阻塞。
  2. 当阻塞队列是满时,往队列里添加元素的操作将被阻塞

2.BlockingQueue接口

阻塞队列在Java中具体对应的api就是BlockingQueue接口,我们来看一下它的结构:

这里做一个笔记:使用ctrl + alt + u查看类结构真香

BlockingQueue接口是Queue接口的一个子接口,而ArrayBlockingQueue是它的一个实现类,我们来看一下它的全部实现类:(使用F4查看)

那么这个接口中定义了哪些方法呢:

img

这一看定义的方法还真不少,对于向阻塞队列中增加或删除的操作,足足定义了四套方法,我们来看一下:

操作类型Throws ExceptionSpecial ValueBlockedTimed out
插入add(o)offer(o)put(o)offer(o, timeout, unit)
取出(删除)remove(o)poll()take()poll(timeout, unit)
  • Throws Exception 类型的插入和取出在不能立即被执行的时候就会抛出异常。

  • Special Value 类型的插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false)

  • Blocked 类型的插入和取出操作在不能被立即执行的时候会阻塞线程直到可以操作的时候会被其他线程唤醒

  • Timed out 类型的插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值

3.为什么要使用阻塞队列

在使用一个技术之前,了解为什么会产生这个技术是一个好的习惯,那么为什么会产生阻塞队列呢?

这需要在具体的应用场景中来理解,例如在生产者消费者模式中,上面已经提到,如果消费者速度远大于生产者速度,没有可以消费的东西了,那么消费者就要进入阻塞状态来等待生产者生产出新的东西,反之亦然。这里面就需要使用到阻塞队列,这个在concurrent包中的数据结构可以自动控制线程状态的转换,可以将线程由运行状转换为阻塞状态,也可以将阻塞的线程进行唤醒,如果没有这个数据结构,那么其中的细节都需要程序员自己造轮子,那么我们就不仅要考虑线程安全问题,还要考虑效率问题

下面附上我们自己手动进行同步的生产者消费者模式代码:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ShareData{  //资源类
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        lock.lock();
        try{
            //        1.判断
            while(number !=0){
//            等待,不能生产
                condition.await();
            }
            //2.干活
            number++;
            System.out.println(Thread.currentThread().getName()+"\t"+number);
            //3.通知唤醒
            condition.signalAll();
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    public void decrement() throws Exception{
        lock.lock();
        try{
            //        1.判断
            while(number ==0){
//            等待,不能生产
                condition.await();
            }
            //2.干活
            number--;
            System.out.println(Thread.currentThread().getName()+"\t"+number);
            //3.通知唤醒
            condition.signalAll();
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

}

/*
* 题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
*
* 1.线程操作资源类
* 2.判断 干活 通知
* 3.防止虚假唤醒机制
* */
public class ProdConsumer_TraditionDemo {
    public static void main(String[] args){
        ShareData shareData = new ShareData();

        new Thread(()->{
            for(int i=1;i<=5;i++){
                try {
                    shareData.increment();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        },"AAA").start();

        new Thread(()->{
            for(int i=1;i<=5;i++){
                try {
                    shareData.decrement();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        },"BBB").start();
    }
}

在这个例子中我们可以看到,是手动使用lock锁和await()方法等方式来保证线程状态的转换以及线程安全性的,但是使用阻塞队列就不一样了,一切自动控制,大大解放了开发者的工作量


在另外一个场景中,阻塞队列还应用于线程池中,可以在阻塞队列中保存等待执行的任务,关于具体细节在另外一篇文章中讲述,这里主要就是为了说明阻塞队列的好处

4.ArrayBlockingQueue

接下来我们专门看一下这个类,底层使用一个有界的数组作为存储介质,主要有三个构造方法:


public ArrayBlockingQueue(int capacity): // 初始化数组大小

public ArrayBlockingQueue(int capacity, boolean fair): //初始化数组大小,并且设定是否是fire模式
 
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) //初始化数组大小,设置是否是fair模式,然后使用一个集合初始化阻塞队列

capacity定义了阻塞列表使用的数组长度,而fair定义了阻塞队列的一种策略选择,因为底层使用的是ReentranLock锁,而这个锁是可以在构造方法中通过fair指定是否为公平锁,关于ReentrantLock的细节不在本文的叙述范围,关于java锁的详细内容将在其他的篇章中单独学习分析


下面是ArrayBlockingQueue的一些关键成员变量:


    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

具体关于变量的解释在注释中已经说明,可以看到阻塞队列使用ReentranLock来做同步,使用两个Condition变量来进行插入和取出操作的同步

我们重点来看一下enqueue()和dequeue()这两个方法:

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

首先将putIndex的位置放上元素,然后进行++putIndex操作更新索引,最为重要的是notEmpty.signal();这句话会唤醒一个等待notEmpty变量的线程,因为notEmpty这个变量表示阻塞队列是否为空,插入数据势必会让队列不为空,而在插入数据之前,如果队列为空,可能会有线程已经尝试来获取数据了,那么就会等待在这个条件变量上面,那么当插入数据之后,需要唤醒这些线程,为了减少不必要的麻烦,这个条件变量在插入一个数据之后仅仅唤醒一个等待在这个条件变量上的线程。

还有一点需要注意,成员变量有两个参数,putIndex和takeIndex,配合这两个变量之后数组的使用就像唤醒队列一样,注意,可能有人担心会有一种情况,队列满了之后没有消费线程,再次插入第一个队列的元素会被覆盖吗?这就多虑了,具体我们看下面的代码:


    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

上面的代码展示了put操作的细节,可以很明显的看到,当数组中的元素数量达到设定的容量之后就会在notFull这个条件变量上等待,而不会再次调用enqueue这个方法来插入,所以不会出现上面的那种情况。


下面来看dequeue()方法:


       private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

上面的代码展示了获取一个队列元素的方法细节。依然需要关注的是notFull.signal(),这句话的意思是:唤醒一个等待在notFull这个条件变量上的线程。具体的语义是什么呢?就是,有可能有线程在进行插入操作的时候会发现队列被用完了,那么就会阻塞到notFull这个条件变量上,当某个线程获取了一个元素之后,队列就有空闲的空间可以插入了,那么就可以唤醒一个等待在这个条件变量上的线程了,具体就是唤醒一个等待插入的线程开始活动。


查看源码之后可以发现,ArrayBlockingQueue中的offer()和put()方法底层都是调用了enqueue()方法,poll()和take()都是调用了dequeue()方法,只不过是出现队列满和队列空时的处理情况不一样,下面来分别看一下:

  • put

put方法的内容上文中已经提到过,在此不再罗列,它是一个阻塞方法,在操作不能立刻得到执行的时候会阻塞等待。具体的就是,如果发现队列使用的数组没有可用的容量了,那么就等待在一个条件变量上,而这个条件变量需要在有空闲空间的时候唤醒等待在他上面的线程。

  • offer()

这个方法在插入操作失败时就返回false,插入成功就返回true


  public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
  • take()

take操作会在获取元素失败的时候阻塞直达有线程唤醒它。下面是具体的细节:


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  • peek()

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

peek操作会取得队列头的内容,但是不会将其从队列中删除,下次peek还是同样的内容。

关于ArrayBlockingQueue的内容就分析到这里,这是一个最为简单的阻塞队列实现,对ArrayBlockingQueue的分析也较为细节,下面分析LinkedBlockingQueue的时候有的内容就会一笔带过,因为LinkedBlockingQueue和ArrayBlockingQueue事一样的,只是ArrayBlockingQueue使用数组做队列,而LinkedBlockingQueue使用链表做队列。如何选择哪种阻塞队列和如何选择数组和链表这两种数据结构的思路是一样的,对于频繁进行队列元素获取操作的场景下,首选ArrayBlockingQueue,而在需要频繁进行队列元素删除、添加的场景下首选LinkedBlockingQueue。

5.LinkedBlockingQueue

主要还是提供了三个构造方法:

public LinkedBlockingDeque()
public LinkedBlockingDeque(int capacity)  
public LinkedBlockingDeque(Collection<? extends E> c)    

也是可以通过capacity进行指定容量的


下面展示了LinkedBlockingQueue的关键成员变量:


/** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

需要注意的是,LinkedBlockingQueue的插入操作和获取数据的操作使用了不同的锁,而ArrayBlockingQueue进行插入和操作只使用了一个锁

还有和ArrayBlockingQueue不同的是,有两个可以操作条件变量的方法:


    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

signalNotEmpty方法首先获取takeLock,然后对等待在notEmpty这个条件变量上的某个线程进行唤醒操作。而signalNotFull则首先获取putLock,然后对等待在notFull这个条件变量上的某个线程进行唤醒操作。和ArrayBlockingQueue一样,LinkedBlockingQueue也提供了两个插入队列和从队列获取元素的方法:


    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }


相比于ArrayBlockingQueue更为简单了,只是一些链表操作,新的元素将被放到链表的尾部,而获取元素的操作将从链表首部获取节点。

下面的代码展示了put操作的细节:


    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

如果队列容量已满,那么该线程就会阻塞在notFull这个条件变量上等待唤醒。

在最后的if(c == 0)中,如果队列以前为空,此时插入第一个元素,那么有可能在插入元素之前已经有线程试图获取元素,那么这个试图获取元素的线程就会阻塞住,所以需要告诉它们中的一个,现在有数据可以获取了,可以醒来消费数据了。

下面的代码展示了take方法的细节:


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

和put方法相反,当队列里面没有数据的时候就会阻塞等待,获取元素成功之后会唤醒那些希望插入数据而被阻塞的线程(唤醒一个线程)


算上这两种阻塞队列,一共有七种,因为这两个是最经常使用的,所以我们上面使用了很大的篇幅用来详细介绍,其他不做详细的说明了,如果有需要用到,那么会逐一的了解,这里对其他阻塞队列只是有一个概念性的理解就好了,如图:

image

6.SynchronousQueue

这里还有一个阻塞队列不得不说,在线程池中也是被使用到,而且也非常特殊

  • SynchronousQueue没有容量。

  • 与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。

  • 每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

具体的详细原理,请参考文章:java并发之SynchronousQueue实现原理


参考文章 :https://www.jianshu.com/p/4028efdbfc35

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://hadoo666.top/archives/java阻塞队列理解md