叩丁狼官方微信公众号
|
新浪微博
|
咨询热线:020-85628002
首页
在线课程
资料下载
就业信息
师资团队
社区论坛
关于我们
首页
>
叩丁狼技术圈
>
朝花夕拾系列文章
技术分类
朝花夕拾系列文章
Python+大数据
微信开发系列文章
超全SSO系统
RESTFul系列文章
Spring生态系列文章
Java系列文章
技术文章精译
H5前端技术文章
线程基础(二十三)-并发容器-LinkedBlockingQueue
更新时间:2019-02-15 17:31:02
|
阅读量(257)
>本文作者:王一飞,叩丁狼高级讲师。原创文章,转载请注明出处。 #####概念 LinkedBlockingQueue按照api解释:一个基于链表而实现的有界阻塞队列。遵循先进先出原则,由队头入列,再从队尾出列。具体操作上跟ArrayBlockingQueue类似,区别在于底层维护数据上,LinkedBlockingQueue底层是一个链接,而ArrayBlockingQueue是一个数组。  内部结构 ```java public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { private final AtomicInteger count = new AtomicInteger(); //队列元素个数 private final int capacity; //队列容器 transient Node
head; //队头 private transient Node
last; //队尾 //出列入列过程中维护现场安全的各类锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); //队列数据节点 static class Node
{ E item; Node
next; Node(E x) { item = x; } } } ``` ######基本操作 ```java public class App { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue
queue = new LinkedBlockingQueue(5); //入列 queue.add("a"); //队列满后抛异常 queue.put("b");//队列满后阻塞 queue.offer("c"); //入列失败返回false System.out.println(queue); queue.put("a"); queue.put("b"); queue.put("c"); queue.put("d"); queue.put("e"); //出列 queue.remove("a"); //删除指定元素 queue.poll(); //出列,如果队列为空返回null queue.take(); //队列为空,阻塞等待 System.out.println(queue); } } ``` 一般推荐使用put入列, take出列 #####源码解析 构造-LinkedBlockingQueue提供了3个构造器,无参, 带有容量,带集合数据 ```java //无参数时,默认容量为int的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //带容量参数【推荐】 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化队头,队尾 last = head = new Node
(null); } //带数据集合 public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); //容量为int最大值 final ReentrantLock putLock = this.putLock; putLock.lock(); //谨慎起见也加锁,需要将传入集合逐一入列 try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node
(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } ``` LinkedBlockingQueue 3个构造器,实际使用中更推荐使用指定容量的队列。 在继续看源码前,先了解一个原子操作类:AtomicInteger ```java //int 类型的原子操作,不指定初始为0 //底层维护了一个volatile 修饰变量 value = 0 AtomicInteger atomicInteger = new AtomicInteger(); //获取:value = 0 System.out.println(atomicInteger.get()); //0 //返回value值0, 然后value 值加一【这里也是原子操作】 System.out.println(atomicInteger.getAndIncrement()); //0 //返回value值1, 然后value 值加一【这里也是原子操作】 System.out.println(atomicInteger.getAndIncrement()); //1 //获取:value=2 System.out.println(atomicInteger.get()); //2 ``` ```java //int 类型的原子操作,不指定初始为0 //底层维护了一个volatile 修饰变量 value = 0 AtomicInteger atomicInteger = new AtomicInteger(); //获取:value = 0 System.out.println(atomicInteger.get()); //0 //返回value值0, 然后value 值减一【这里也是原子操作】 System.out.println(atomicInteger.getAndDecrement()); //0 //返回value值1, 然后value 值减一【这里也是原子操作】 System.out.println(atomicInteger.getAndDecrement()); //-1 //返回value值1 System.out.println(atomicInteger.get()); //-2 ``` getAndIncrement : 返回未操作前value 的值, 然后加1 getAndDecrement : 返回未操作前value 的值, 然后减1 入列-put:将入列数据添加到队尾,如果队列满了,阻塞等待。 ```java public void put(E e) throws InterruptedException { //入列元素不允许为null if (e == null) throw new NullPointerException(); //队列临时容量缓存,作为执行唤醒/阻塞线程操作标记 int c = -1; Node
node = new Node
(e); //入列锁 final ReentrantLock putLock = this.putLock; //队列元素个数 final AtomicInteger count = this.count; putLock.lockInterruptibly(); //入列前加锁,可中断锁 try { //自旋排除硬件加锁延时问题 //如果队列已满,线程阻塞等待 while (count.get() == capacity) { notFull.await(); } //数据入列 enqueue(node); //原子操作,入列后再获取队列元素个数,并+1,确保当前操作队列元素个数最新 c = count.getAndIncrement(); //c + 1 < capacity 表示队列未满,仍可添加,唤醒未持锁而等待的入列线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); //释放锁 } //c == 0 说明队列为空,唤醒入列线程入列 if (c == 0) signalNotEmpty(); } ``` 从上面源码上看put方法其实做5件事: 1>判断元素是否null, 为null 抛异常 2>判断是否满列,满列则等待,此处需要留意while这个操作。 理想请求下,if即可,但是有种情况,如果jvm执行指令传到cpu到程序时间片执行存在一点的时间延时,while 重复执行,可以减少延时影响。 3>数据入列 4>入列后需要唤醒未持锁而等待的入列线程 5>c==0的判断, c的值是入列前容量值,如果为0说明入列前,队列为空,可以存在出列等待线程,所以在c==0的时候,且已经入列成功,可以唤醒出列等待线程,让其顺利出列。 出列-take : 出列,从队头弹出元素, 如果队列个数为0, 阻塞等待。 ```java public E take() throws InterruptedException { E x; //队列临时容量缓存,作为执行唤醒/阻塞线程操作标记 int c = -1; //队列元素个数 final AtomicInteger count = this.count; //出列锁 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//出列前加锁,可中断锁 try { //自旋排除硬件加锁延时问题 //如果队列已空,线程阻塞等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); //数据出列 //原子操作,出列后再获取队列元素个数,并-1,确保当前操作队列元素个数最新 c = count.getAndDecrement(); //c > 1 表示队列未空,仍可出列,唤醒未持锁而等待的出列线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock();//释放锁 } //c == capacity 说明队列已满,唤醒出列线程出列 if (c == capacity) signalNotFull(); return x; } ``` 上面源码看出,take操作跟put原理差不多,执行的是反向操作而已。需要注意的是take方法唤醒线程条件c 变量值判断条件。 1> 出列成功之后, c > 1 表示出列前队列至少有2个元素,所以出列成功后,唤醒未持锁而等待的出列线程 2>c == capacity 满足这个条件, 表示出列前,队列是满的,可能存在入列等待线程,出列成功之后,解除等待,唤醒入列等待线程。 删除-remove : 根据指定元素删除队列中的元素,如果有删除,如果没有返回false ```java public boolean remove(Object o) { if (o == null) return false; //参数为null 直接返回 fullyLock(); //为保证入列出列线程安全,加双锁 try { //循环遍历列表,删除指定元素 for (Node
trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); //元素删除 return true; } } return false; } finally { fullyUnlock(); //是否双锁 } } ``` ```java void fullyLock() { putLock.lock(); takeLock.lock(); } ``` ```java void unlink(Node
p, Node
trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; //删除元素后,队列元素个数-1,唤醒入列等待线程 if (count.getAndDecrement() == capacity) notFull.signal(); } ``` ```java void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } ``` LinkedBlockingQueue 在执行删除操作,需要对takeLock 跟 putLock同时加锁,其目标确保在删除期间,其他线程无法操作队列,进而保证删除操作的线程安全。 ######总结 LinkedBlockingQueue 使用2把锁确保线程安全,入列时使用putLock,出列时使用takeLock,这种锁分离操作机制,在一定层面上提高队列的吞吐量,在高并发的情况下生产者(入列线程)和消费者(出列线程)可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
分享到:
QQ空间
新浪微博
腾讯微博
人人网
微信
上一篇:Elasticsearch(十五)使用Java操作Elasticsearch
下一篇:线程基础(二十四)-并发容器-PriorityBlockingQueue
在线咨询
有位老师想和你聊聊
QQ咨询
微信咨询
电话咨询
020-85628002
返回顶部