首页 > 技术文章 > 朝花夕拾系列文章 >

线程基础(二十二)-并发容器-ArrayBlockingQueue(下)

更新时间:2018-12-05 | 阅读量(1,351)

本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处 #####概念 ArrayBlockingQueue 是一个有界阻塞的队列。有界原因是它底层维护了一个数组,初始化时,可以直接指定。要注意,一旦创建成功后,数组将无法进行再扩容。而阻塞是因为它对入列出列做了加锁处理,如果队列满了,再入列则需要阻塞等待, 如果队列是空的,出列时也需要阻塞等待。 ArrayBlockingQueue 底层是一个有界数组,遵循FIFO原则,对进入的元素进行排序,先进先出。 ArrayBlockingQueue 使用ReentrantLock锁,再配合两种Condition实现队列的线程安全操作。并发环境下ArrayBlockingQueue 使用频率较高 ArrayBlockingQueue 支持公平与非公平2种操作策略,在创建对象时通过构造函数将fair参数设置为true/false即可,需要注意的是,如果fair设置为false,表示持有公平锁,这种操作会降低系统吞吐量,慎用。 ![外部结构](https://upload-images.jianshu.io/upload_images/11401799-894dd6fa05aeeb84.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 内部结构 ```java public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { final Object[] items; //存放元素数组 final ReentrantLock lock; //互斥锁对象 private final Condition notEmpty; //非空条件变量 private final Condition notFull; //非满条件变量 .... } ``` 从内部结构源码上看,ArrayBlockingQueue 内部维护一个final数组,当队列初始化后将无法再进行拓展,保证队列的有界性。lock 互斥锁,在出队入队中保证线程的安全。而notEmpty 跟 notFull 条件变量保证队列在满队时入队等待, 当队列空列时,出队等待。 ######初始化 ```java //参数1:队列初始长度 //参数2:是否为公平队列 fasle: 是, true 不是 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } ``` ```java public ArrayBlockingQueue(int capacity) { this(capacity, false); } ``` ```java //参数3:队列初始化元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { ..... } ``` ArrayBlockingQueue 有3个构造器,核心是2个参数的构造器, capacity表示队列初始化长度, fair 指定ArrayBlockingQueue是公平队列还是非公平队列。 ######入列 ArrayBlockingQueue 入列方式有大体三种: ```java public class App { public static void main(String[] args) throws InterruptedException { //队列长度为2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2); //方式1:满列抛异常 ///System.out.println(queue.add("add")); //true ///System.out.println(queue.add("add")); //true ///System.out.println(queue.add("add")); //满列异常 //方式2:满列返回false,不阻塞 //System.out.println(queue.offer("offer")); //true //System.out.println(queue.offer("offer")); //true //System.out.println(queue.offer("offer")); //false //方式3:满列阻塞(推荐) queue.put("put"); queue.put("put"); queue.put("put"); //满列阻塞等待 } } ``` 这里我们以put方法为例 ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //取锁: 线程运行中断 try { while (count == items.length) notFull.await(); //队列满队,需要暂停等待 enqueue(e); //入列 } finally { lock.unlock(); //释放锁 } } ``` 在put方法开始前, 先获取可中断lock.lockInterruptibly(), 对put核心逻辑进行加锁,当判断到队列已满,阻塞当前线程。反之, 执行enqueue()实现入列逻辑。 ```java private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //入列 //putIndex 表示下一个入列所以, 如果为队列长度, 下一个轮回 //原因: 队列为数组, 操作所以从0开始 if (++putIndex == items.length) putIndex = 0; count++; //总数+1 notEmpty.signal(); //唤醒等待出列线程 } ``` 进入enqueue之后, 因为该方法已经持有锁,所以无法再进行锁重入,在enqueue方法之后, 执行notEmpty.signal(); 唤醒出列等待线程。 ######出列 ArrayBlockingQueue 出列也对应的有3中方式 ```java public class App { public static void main(String[] args) throws InterruptedException { //队列长度为2 ArrayBlockingQueue queue = new ArrayBlockingQueue(2); queue.put("admin"); queue.put("admin"); //方式1:空列出队时,抛异常 //System.out.println(queue.remove()); //System.out.println(queue.remove()); //System.out.println(queue.remove()); //空列报异常 //方式2:空列出队时,返回null System.out.println(queue.poll()); System.out.println(queue.poll()); System.out.println(queue.poll()); //空列返回null //方式3:空列出队时,阻塞(推荐) System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); //空列阻塞 } } ``` 这里我们以take方法为例 ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); // 队长为0,需要暂停等待 return dequeue(); } finally { lock.unlock(); } } ``` 跟put方法操作一样, 进入方法之后, 先获取锁,再判断队列长度是否为0, 如果为0, 当前线程进入阻塞。反之,进入dequeue 方法执行出列操作。 ```java private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; //出列之后,原先队列设置为null //takeIndex 下一个出列的数据索引, 一个轮回后,设置为0 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); //唤醒等待入列线程 return x; } ``` ######公平/非公平队列 ArrayBlockingQueue 可以实现公平与非公平2种队列, 公平队列表示在并发环境下,如果队列已经满列了,入列线程按照FIFO的顺序阻塞,等待召唤。非公平队列就没有这种规矩,谁先抢到,谁先入列。 来看一下例子: 需求:开启10个线程往边界为3的队列添加数据, 同时开始一个线程不断出列。 ```java public class App { public static void main(String[] args) throws InterruptedException { //队列长度为3 //公平队列 ArrayBlockingQueue queue = new ArrayBlockingQueue(3, true); for (int i= 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10)); //将问题放大 //线程进入 System.out.println("进入-"+ Thread.currentThread().getName()); //阻塞等待入列 queue.put("出列-" + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } },"t_" + i).start(); } new Thread(new Runnable() { @Override public void run() { while(true){ try { //按顺序出列 System.out.println("------" + queue.take()); } catch (Exception e) { e.printStackTrace(); } } } }).start(); } } ``` ```java 进入-t_5 进入-t_1 ------出列-t_5 ------出列-t_1 进入-t_8 ------出列-t_8 进入-t_7 ------出列-t_7 进入-t_2 ------出列-t_2 进入-t_9 ------出列-t_9 进入-t_0 ------出列-t_0 进入-t_3 进入-t_6 ------出列-t_3 ------出列-t_6 进入-t_4 ------出列-t_4 ``` 观察结果,发现进入顺序跟出列顺序一样。公平队列讲究公平, 进入0到9线程启动后,执行run方法,都能执行 “进入” 代码,但是入列的操作是阻塞的,同一时间点只允许一个线程进入。其他线程必须等待,那么谁先打印 “进入” 代码,就表示谁先阻塞,依照公平FIFO原则,就应该谁先出列。 所以当进入顺序与出列一致就把表示公平原则生效。 将参数改为false,我们再看打印结果 ```java ArrayBlockingQueue queue = new ArrayBlockingQueue(3, false); ``` ```java 进入-t_3 进入-t_7 进入-t_6 进入-t_0 进入-t_4 进入-t_8 进入-t_5 进入-t_1 ------出列-t_3 ------出列-t_7 ------出列-t_6 ------出列-t_8 ------出列-t_4 ------出列-t_5 ------出列-t_1 ------出列-t_0 进入-t_9 ------出列-t_9 进入-t_2 ------出列-t_2 ``` 观察, 很明显进入与出列顺序不一致,这就是非公平队列。 注意: 10个线程效果不是太明显,可以适当加大。 到这,本篇结束。
叩丁狼学员采访 叩丁狼学员采访
叩丁狼头条 叩丁狼头条
叩丁狼在线课程 叩丁狼在线课程