亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

BlockingQueue的雙鎖源碼解析

標簽:
Java

关于队列的基本概念和常用方法的含义以及使用实例已经在最初的文章中简单列过了,这次我们来通过BlockingQueue来阅读一下队列的put和take方法的源码来理解其中的奥秘。
我们以BlockingQueue中的最常用的LinkedBlockingQueue入手

1.构造方法
public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

其中capacity是队列的长度,构造方法很简单,初始化node,并设置队列的最大容量capacity。

2.核心属性

/** The capacity bound, or Integer.MAX_VALUE if none */
//队列长度,不指定默认是Integer.MAX_VALUE
private final int capacity;
/** Current number of elements */
//当前队列的元素个数,用AtomicInteger 保证同步
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
//take锁,类型为ReentrantLock
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//take的条件
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//put锁,类型为ReentrantLock
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//put的条件
private final Condition notFull = putLock.newCondition();

3.入队put
public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;
        Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }        if (c == 0)
            signalNotEmpty();
    }

第一步判断入队元素的合法性,第二步新建一个元素Node,然后请求锁并拿到当前队列的元素总数count,这些都比较容易理解,我们重点关注下try里的逻辑

1.先判断count等于队列最大长度capacity,此时用notFull阻塞等待,为什么这里不用if判断而是while呢?因为当阻塞被唤醒后,if会直接执行enqueue(node);操作,而在执行增加操作前可能又被其它线程拿到锁添加满了,所以必须再次判断才可以保证正确性。
2.完成入队后,判断c + 1 < capacity,然后随机唤醒一个notFull,这里为什么是唤醒一个消费者线程而不是唤醒全部呢?原因是有可能在队列满的时候假如共有5个生产线程,那么5个都会阻塞,这时消费者同时消费了多个元素,但是可能只发出了1个唤醒生产者的信号,这时候醒着的put线程就会通过这种方式来唤醒其它的4个put线程,以弥补take线程的信号不足。相比于signalAll()唤醒所有生产者,这种解决方案使得同一时间最多只有一个生产者在清醒的竞争锁,性能提升非常明显。

这里入队逻辑基本完成,出队逻辑是和入队对应的。

4.特殊情况

我们知道LinkedBlockingQueue是通过两把锁一把是put锁,一把是take锁,但是这样有一个特殊情况当队列长度为1时,到底入队和出队之间会存在锁竞争吗?
我们来看它是怎么做的

//初始化public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;
    last = head = new Node<E>(null);
}//入队操作private void enqueue(Node<E> node) {    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}//出队操作private E dequeue() {    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;    return x;
}

1.初始化时,定义了一个dummy节点,这个和lock、countdownlatch实现一样,都有一个哨兵节点,head和tail都指向这个哨兵。
2.在队尾入队时,tail节点后移,并指向这个第一个入队的元素,此时head还是指向dummy。
3.出队时,创建一个Node h指向head也就是dummy,然后first指向head的next节点,然后把first的值赋值x,消除first,返回x。

总的来说就是互换head和head.next的值,最终把x返回.



作者:激情的狼王
链接:https://www.jianshu.com/p/fc2af13fc635


點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消