首先声明,本文是伪源码分析。主要是基于状态机自己实现一个简化的并发队列,有助于读者掌握并发程序设计的核心——状态机;最后对源码实现略有提及。
ConcurrentLinkedQueue不支持阻塞,没有BlockingQueue那么易用;但在中等规模的并发场景下,其性能却比BlockingQueue高不少,而且相当稳定。同时,ConcurrentLinkedQueue是学习CAS的经典案例。根据github的code results排名,ConcurrentLinkedQueue(164k)也十分流行,比我想象中的使用量大多了。非常值得一讲。
对于状态机和并发程序设计的基本理解,可以参考源码|并发一枝花之BlockingQueue,建议第一次接触状态机的同学速读参考文章之后,再来阅读此文章。
JDK版本:oracle java 1.8.0_102
准备知识:CAS
读者可以跳过这部分,后面讲到offer()方法的实现时再回顾。
悲观锁与乐观锁
悲观锁:假定并发环境是悲观的,如果发生并发冲突,就会破坏一致性,所以要通过独占锁彻底禁止冲突发生。有一个经典比喻,“如果你不锁门,那么捣蛋鬼就回闯入并搞得一团糟”,所以“你只能一次打开门放进一个人,才能时刻盯紧他”。
乐观锁:假定并发环境是乐观的,即,虽然会有并发冲突,但冲突可发现且不会造成损害,所以,可以不加任何保护,等发现并发冲突后再决定放弃操作还是重试。可类比的比喻为,“如果你不锁门,那么虽然捣蛋鬼会闯入,但他们一旦打算破坏你就能知道”,所以“你大可以放进所有人,等发现他们想破坏的时候再做决定”。
通常认为乐观锁的性能比悲观所更高,特别是在某些复杂的场景。这主要由于悲观锁在加锁的同时,也会把某些不会造成破坏的操作保护起来;而乐观锁的竞争则只发生在最小的并发冲突处,如果用悲观锁来理解,就是“锁的粒度最小”。但乐观锁的设计往往比较复杂,因此,复杂场景下还是多用悲观锁。
首先保证正确性,有必要的话,再去追求性能。
CAS
乐观锁的实现往往需要硬件的支持,多数处理器都都实现了一个CAS指令,实现“Compare And Swap”的语义(这里的swap是“换入”,也就是set),构成了基本的乐观锁。
CAS包含3个操作数:
需要读写的内存位置V
进行比较的值A
拟写入的新值B
当且仅当位置V的值等于A时,CAS才会通过原子方式用新值B来更新位置V的值;否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。
一个有意思的事实是,“使用CAS控制并发”与“使用乐观锁”并不等价。CAS只是一种手段,既可以实现乐观锁,也可以实现悲观锁。乐观、悲观只是一种并发控制的策略。下文将分别用CAS实现悲观锁和乐观锁?
我们先不讲JDK提供的实现,用状态机模型来分析一下,看我们能不能自己实现一版。
队列的状态机模型
状态机模型与是否需要并发无关,一个类不管是否是线程安全的,其状态机模型从类被实现(此时,所有类行为都是确定的)开始就是确定的。接口是类行为的一个子集,我们从接口出发,逐渐构建出简化版ConcurrentLinkedQueue的状态机模型。
队列接口
ConcurrentLinkedQueue实现了Queue接口:
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); E remove(); E poll(); E element(); E peek(); }
需要关注的是一对方法:
offer():入队,成功返回true,失败返回false。JDK中ConcurrentLinkedQueue实现为无界队列,这里我们也只讨论无界的情况——因此,offer()方法必返回true。
poll():出队,有元素返回元素,没有就返回null。
同时,理想的线程安全队列中,入队和出队之间不应该存在竞争,这样入队的状态机模型和出队的状态机模型可以完全解耦,互不影响。
对我们的状态机作出两个假设:
假设1:只支持这入队、出队两种行为。
假设2:入队、出队之间不存在竞争,即入队模型与出队模型是对偶、独立的两个状态机。
从而,可以先分析入队,再参照分析出队;然后可尝试去掉假设2,看如何完善我们的实现来保证假设2成立;最后看看真·神Doug Lea如何实现,学习一波。
状态机定义
现在基于假设1和假设2,尝试定义入队模型的状态机。
我们构造一个简化的场景:存在2个生产者P1、P2,同时触发入队操作。
状态集
如果是单线程环境,入队操作将是这样的:
// 准备newNode.next = null; curTail = tail;// 入队前assert tail == curTail && tail.next == null; // 状态S1// 开始入队tail.next = newNode; // 事件E1// 入队中assert tail == curTail && tail.next == newNode; // 状态S2tail = tail.next; // 事件E2// 结束入队// 入队后assert tail == newNode && tail.next == null; // 状态S3,合并到状态S1
该过程涉及对两个域的修改:tail.next、tail。则随着操作的进行,队列会经历2种状态:
状态S1:事件E1执行前,tail指向实际的尾节点curTail,tail.next==null。如生产者P1、P2都还没有触发入队时,队列处于状态S1;生产者P1完成入队P2还没触发入队时,队列处于状态S1。
状态S2:事件E1执行后、E2执行前,tail指向旧的尾节点curTail,tail.next==newNode。
状态S3:事件E2执行后,tail指向新的尾节点newNode,tail.next==null。同状态S1,合并。
状态转换集
两个事件分别对应两个状态转换:
状态转换T1:S1->S2,即tail.next = newNode。
状态转换T2:S2->S1,即tail = tail.next。
是不是很熟悉?因为ConcurrentLinkedQueue也是队列,必然同BlockingQueue相似甚至相同。区别在于如何维护这些状态和状态转换。
自撸ConcurrentLinkedQueue
依赖CAS,两个状态转换T1、T2都可以实现为原子操作。留给我们的问题是,如何维护合法的状态转换。
入队方法offer()
入队过程需要经过两个状态转换,且这两个状态转换必须连续发生。
不严谨。“连续”并不是必要的,最后分析源码的时候会看到。不过,我们暂时使用强一致性的模型。
思路1:让同一个生产者P1连续完成两个状态转换T1、T2,保证P2不会插入进来
LinkedBlockingQueue的思路即是如此。这是一种悲观策略——一次开门只放进来一个生产者,似乎只能像LinkedBlockingQueue那样,用传统的锁putLock实现,实际上,依靠CAS也能实现:
public class ConcurrentLinkedQueue1<E> { private volatile Node<E> tail; public ConcurrentLinkedQueue1() { throw new UnsupportedOperationException("Not implement"); } public boolean offer(E e) { Node<E> newNode = new Node<E>(e, new AtomicReference<>(null)); while (true) { Node<E> curTail = tail; AtomicReference<Node<E>> curNext = curTail.next; // 尝试T1:CAS设置tail.next if (curNext.compareAndSet(null, newNode)) { // 成功者视为获得独占锁,完成了T1。直接执行T2:设置tail tail = curNext.get(); return true; } // 失败者自旋等待 } } private static class Node<E> { private volatile E item; private AtomicReference<Node<E>> next; public Node(E item, AtomicReference<Node<E>> next) { this.item = item; this.next = next; } } }
思路2:生产者P1完成状态转换T1后,P2代劳完成状态转换T2
再来分析下T1、T2两个状态转换:
T1涉及newNode,只能由封闭持有newNode的生产者P1完成
T2只涉及队列中的信息,任何持有队列的生产者都有能力完成。P1可以,P2也可以
思路1是悲观的,认为T1、T2必须都由P1完成,如果P2插入就会“搞破坏”。而思路2则打开大门,欢迎任何“有能力”的生产者完成T2,是典型的乐观策略。
public class ConcurrentLinkedQueue2<E> { private AtomicReference<Node<E>> tail; public ConcurrentLinkedQueue2() { throw new UnsupportedOperationException("Not implement"); } public boolean offer(E e) { Node<E> newNode = new Node<E>(e, new AtomicReference<>(null)); while (true) { Node<E> curTail = tail.get(); AtomicReference<Node<E>> curNext = curTail.next; // 尝试T1:CAS设置tail.next if (curNext.compareAndSet(null, newNode)) { // 成功者完成了T1,队列处于S2,继续尝试T2:CAS设置tail tail.compareAndSet(curTail, curNext.get()); // 成功表示该生产者P1完成连续完成了T1、T2,队列处于S1 // 失败表示T2已经由生产者P2完成,队列处于S1 return true; } // 失败者得知队列处于S2,则尝试T2:CAS设置tail tail.compareAndSet(curTail, curNext.get()); // 如果成功,队列转换到S1;如果失败,队列表示T2已经由生产者P1完成,队列已经处于S1 // 然后循环,重新尝试T1 } } private static class Node<E> { private volatile E item; private AtomicReference<Node<E>> next; public Node(E item, AtomicReference<Node<E>> next) { this.item = item; this.next = next; } } }
减少无效的竞争
我们涉及的状态比较少(只有2个状态),继续看看能否减少无效的竞争,比如:
共同學習,寫下你的評論
評論加載中...
作者其他優質文章