并发工具类
本系列文章主要讲解Java并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
系列文章列表:
本文主要以实例讲解Semaphore、阻塞队列等内容。
Semaphore
基本概念和用途
Semaphore常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用acquire()方法去或者许可证,然后执行相关任务,任务完成后,调用release()方法释放该许可证,让其他阻塞的线程可以运行。
Semaphore可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改mysql数据库,为了避免数据库并发太大,我们进行相关限制。
常用方法
Semaphore(int permits):构造方法,初始化许可证数量
void acquire():获取许可证
void release():释放许可证
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。
运行示例
虽然在代码中设置了20个线程去运行,但同时设置了许可证的数量为5,因而实际的最大并发数还是5。
package com.aidodoo.java.concurrent;
import java.util.concurrent.*;
/**
* Created by zhangkh on 2018/9/9.
*/
public class SemaphoreDemo {
public static void main(String[] args){
Semaphore semaphore=new Semaphore(5);
ExecutorService executorService = Executors.newFixedThreadPool(20);
Account account=new Account();
for(int i=0;i<20;i++){
SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore);
executorService.submit(spender);
}
executorService.shutdown();
}
}
class SpenderWithSemaphore implements Runnable {
private final Account account;
private final Semaphore semaphore;
public SpenderWithSemaphore(Account account, Semaphore semaphore) {
this.account = account;
this.semaphore = semaphore;
}
@Override
public void run() {
try{
semaphore.acquire();
System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
// System.out.println(String.format("%s release a premit",Thread.currentThread().getName()));
semaphore.release();
}
}
}
获取许可证后,模拟操作mysql,我们让线程睡眠2秒,程序输出如下:
pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql
pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql
pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql
pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql
pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql
可以看到前面5个线程同一时间1536480858获得许可证,然后执行操作,并不是20个线程一起操作,这样能降低对mysql数据库的影响。
如果把上面Semaphore的构造方法中的许可证数量改为20,大家可以看到20个线程的运行时间基本一致。
源码实现
Semaphore实现直接基于AQS,有公平和非公平两种模式。公平模式即按照调用acquire()的顺序依次获得许可证,遵循FIFO(先进先出),非公平模式是抢占式的,谁先抢到先使用。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取许可证
acquire()方法最终调用父类AQS中的acquireSharedInterruptibly方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //(1)
doAcquireSharedInterruptibly(arg); //(2)
}
(1):调用tryAcquireShared,尝试去获取许可证
(2):如果获取失败,则调用doAcquireSharedInterruptibly,将线程加入到等待队列中
tryAcquireShared方法由Semaphore的内部类,同时也是AQS的子类去实现,即NonfairSync和FairSync,下面我们以NonfairSync为例说明其实现。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
而nonfairTryAcquireShared方法如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //(1)
int remaining = available - acquires; //(2)
if (remaining < 0 ||
compareAndSetState(available, remaining)) (3)
return remaining;
}
}
(1):获取state的值,也就是总许可证数量
(2):计算本次申请后,剩余的许可证数量
(3):如果剩余的许可证数量大于0且通过CAS将state的值修改成功后,返回剩余的许可证数量,否则继续循环阻塞。
释放许可证
release()方法的调用最终会调用父类AQS的releaseShared()方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //(1)
doReleaseShared(); //(2)
return true;
}
return false;
}
(1):尝试释放许可证
(2):如果释放许可证成功,则通知阻塞的线程,让其执行
tryReleaseShared方法很简单,基本上是nonfairTryAcquireShared的逆过程,即增加许可证的数量,并通过CAS修改state的值。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
BlockingQueue
基本概念
阻塞队列主要是解决如何高效安全传输数据的问题,此外能降低程序耦合度,让代码逻辑更加清晰。
其继承了Queue,并在其基础上支持了两个附加的操作:
- 当队列为空时,获取元素的线程会阻塞,等待队列变为非空
- 当队列满时,添加元素的线程会阻塞,等待队列可用
比较典型的使用场景是生产者和消费者。
BlockingQueue根据对于不能立即满足但可能在将来某一时刻可以满足的操作,提供了不同的处理方法,进而导致众多的api操作:
| Throws exception | Special value | Blocks | Times out | |
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek()} | not applicable | not applicable |
Throws exception:指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常
Special value:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
Blocks:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
Time out:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
整体架构和类图
Java并发包根据不同的结构和功能提供了不同的阻塞队列,整体类图如下:
其中BlockingQueue有如下子类:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。DelayQueue:一个使用优先级队列实现的无界阻塞队列。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
其中BlockingDeque有一个子类:
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
BlockingDeque作为双端队列,针对头部元素,还提供了如下方法:
| First Element (Head) | ||||
| Throws exception | Special value | Blocks | Times out | |
| Insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
| Remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(time, unit) |
| Examine | getFirst() | peekFirst() | not applicable | not applicable |
针对尾部元素
| Last Element (Tail) | ||||
| Throws exception | Special value | Blocks | Times out | |
| Insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
| Remove | removeLast() | pollLast() | takeLast() | pollLast(time, unit) |
| Examine | getLast() | peekLast() | not applicable | not applicable |
使用示例
一个典型的生产者和消费者实例如下,一个BlockingQueue可以安全地与多个生产者和消费者一起使用,Producer线程调用NumerGenerator.getNextNumber()生成自增整数,不断地写入数字,然后Consumer循环消费。
package com.aidodoo.java.concurrent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by zhangkh on 2018/7/17.
*/
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(1024,true);
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 5; i++) {
executorService.submit(new Producer(queue));
}
for (int i = 0; i < 3; i++) {
executorService.submit(new Consumer(queue));
}
Thread.sleep(30 * 1000L);
executorService.shutdown();
}
}
class Producer implements Runnable {
Logger logger = LoggerFactory.getLogger(Producer.class.getName());
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for(int i=0;i<3;i++){
int num = NumerGenerator.getNextNumber();
queue.put(num);
Thread.sleep(1000);
logger.info("{} producer put {}", Thread.currentThread().getName(), num);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
Logger logger = LoggerFactory.getLogger(Consumer.class.getName());
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
int ele = (int) queue.take();
logger.info("{} Consumer take {}", Thread.currentThread().getName(), ele);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class NumerGenerator{
private static AtomicInteger count = new AtomicInteger();
public static Integer getNextNumber(){
return count.incrementAndGet();
}
}
程序输出如下:
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 1
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 2
18/09/10 14:34:33 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-3 producer put 3
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-2 producer put 2
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-1 producer put 1
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-5 producer put 5
18/09/10 14:34:34 INFO concurrent.Producer: pool-1-thread-4 producer put 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 4
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 5
18/09/10 14:34:34 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-3 producer put 6
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-1 producer put 8
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-2 producer put 7
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-5 producer put 9
18/09/10 14:34:35 INFO concurrent.Producer: pool-1-thread-4 producer put 10
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 7
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 8
18/09/10 14:34:35 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 9
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-1 producer put 12
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-3 producer put 11
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-5 producer put 14
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-4 producer put 15
18/09/10 14:34:36 INFO concurrent.Producer: pool-1-thread-2 producer put 13
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 10
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 11
18/09/10 14:34:36 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 12
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-6 Consumer take 13
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-7 Consumer take 14
18/09/10 14:34:37 INFO concurrent.Consumer: pool-1-thread-8 Consumer take 15
其他BlockingQueue子类的使用可参考对应的Java Api。
源码分析
由于BlockingQueue相关的子类众多,我们仅以ArrayBlockingQueue从源码角度分析相关实现。
构造方法
ArrayBlockingQueue中定义的成员变量如下:
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null
各变量的解释如下,以便了解后续的代码:
items用于存储具体的元素takeIndex元素索引,用于记录下次获取元素的位置putIndex元素索引,用于记录下次插入元素的位置count用于记录当前队列中元素的个数notEmpty条件变量,此处为获取元素的条件,即队列不能为空,否则线程阻塞notFull条件变量,此处为插入元素的条件,即队列不能已满,否则线程阻塞itrs用于维护迭代器相关内容
内部结构如下:
构造方法如下:
public ArrayBlockingQueue(int capacity) {
this(capacity, false); //(1)
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity]; //(2)
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); //(3)
notFull = lock.newCondition(); //(4)
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) { //(5)
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
(1):默认情况下,非公平模式,即抢占式
(2):数组初始化
(3)/(4):条件变量初始化
(5):如果构造方法中,含有初始化集合的话,则将对应元素添加到内部数组,并更改count和putIndex的值。
插入数据
插入数据,我们主要看put()方法的实现,重点看生产者和消费者插入和获取数据时,线程何时阻塞,同时又何时唤醒。
public void put(E e) throws InterruptedException {
checkNotNull(e); //(1)
final ReentrantLock lock = this.lock; //(2)
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //(3)
enqueue(e);
} finally {
lock.unlock(); //(4)
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x; //(5)
if (++putIndex == items.length) //(6)
putIndex = 0;
count++; //(7)
notEmpty.signal(); //(8)
}
(1):非空检查,插入的元素不能为null,否则抛出NullPointerException
(2):获取互斥锁
(3):如果当前队列的元素个数等于队列总长度,即队列已满,则通过条件变量,释放和notFull相关的锁,当前线程阻塞。当前线程唤醒的条件如下:
- 其他某个线程调用此
Condition的signal()方法,并且碰巧将当前线程选为被唤醒的线程; - 或者其他某个线程调用此
Condition的signalAll()方法; - 或者其他某个线程中断当前线程,且支持中断线程的挂起;
- 或者发生“虚假唤醒”
(5):如果队列未满,则将元素添加的putIndex索引的位置
(6):putIndex增加1后和队列长度相等,即已到达队列尾部,则putIndex置0
(7):队列已有元素数量加1
(8):通知notEmpty条件变量,唤醒等待获取元素的线程
(4):释放互斥锁
可以看到ArrayBlockingQueue每次插入元素后,都会去唤醒等待获取元素的线程。
获取数据
take()方法源码如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //(1)
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //(2)
return dequeue();
} finally {
lock.unlock(); //(9)
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //(3)
items[takeIndex] = null; //(4)
if (++takeIndex == items.length)
takeIndex = 0; //(5)
count--; //(6)
if (itrs != null)
itrs.elementDequeued(); //(7)
notFull.signal(); //(8)
return x;
}
(1):获取互斥锁
(2):如果count为0,即队列为空,则释放互斥锁,然后挂起当前线程
(3):根据takeIndex索引到数组中获取具体的值,并赋值给x
(4):赋值完成后,takeIndex索引位置数据置null,便于回收
(5):takeIndex加1,然后和队列长度比较,如果相等,即已经读取到队列尾部,takeIndex置0
(6):获取后,将队列元素个数count减1
(7):维护和queue相关的迭代器
(8):唤醒等待插入元素的线程
(9):释放互斥锁
可以看到ArrayBlockingQueue每次获取元素后,都会唤醒等待插入元素的线程。
迭代器
在分析源码前,我们先看在一个迭代器的示例
package com.aidodoo.java.concurrent;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Created by zhangkh on 2018/9/10.
*/
public class ArrayBlockingQueueIterDemo {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<String> queue=new ArrayBlockingQueue(5);
queue.put("hadoop");
queue.put("spark");
queue.put("storm");
queue.put("flink");
Iterator<String> iterator1 = queue.iterator();
System.out.println( queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println();
while(iterator1.hasNext()) {
System.out.println(iterator1.next());
}
System.out.println();
Iterator<String> iterator2 = queue.iterator();
while(iterator2.hasNext()) {
System.out.println(iterator2.next());
}
}
}
程序输出如下:
hadoop
spark
storm
hadoop
flink
flink
我们结合这个示例来具体分析数据插入和获取时,内部成员变量的值
当分别插入hadoop、spark、storm、flink四个元素后,内部变量的值如下:
此时,ArrayBlockingQueue的成员变量的值itrs为null。
调用iterator()方法后,源码如下:
public Iterator<E> iterator() {
return new Itr(); //(1)
}
Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock(); //(2)
try {
if (count == 0) { //(3)
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex); //(4)
cursor = incCursor(takeIndex); //(5)
if (itrs == null) {
itrs = new Itrs(this); //(6)
} else {
itrs.register(this); //(7)
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock(); //(8)
}
}
(1):调用内部类Itr的构造方法
(2):获取外部类即ArrayBlockingQueue的锁
(3):没有没有元素,初始化变量值。内部类Itr的成员变量如下:
/** Index to look for new nextItem; NONE at end */
private int cursor;
/** Element to be returned by next call to next(); null if none */
private E nextItem;
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;
/** Last element returned; null if none or not detached. */
private E lastItem;
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;
/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;
/** Previous value of iters.cycles */
private int prevCycles;
(4):将外部类的takeIndex赋值给内部类nextIndex,并获取数组具体的值赋值给nextItem
(5):计算游标cursor的下个值,其中incCursor方法如下:
private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
(6):注册,主要是维护链表
(7):清理itrs
(8):释放外部类的互斥锁
在上面的示例中,调用iterator()方法后,Itr的内部变量值如下:
由于后面三次调用了queue.take(),依次输出hadoop、spark、storm后,相关成员变量的值见图片标识,重点关注takeIndex=3。
当调用next()方法时,代码如下:
public E next() {
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached()) //(1)
incorporateDequeues();
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
其中(1)处的isDetached方法如下
boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}
由于我们示例中初始化Itr的时候的prevTakeIndex为0,故isDetached返回为false,程序将调用incorporateDequeues方法,根据注释我们也知道,该方法主要是调整和迭代器相关的内部索引。
/**
* Adjusts indices to incorporate all dequeues since the last
* operation on this iterator. Call only from iterating thread.
*/
private void incorporateDequeues() {
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// Check indices for invalidation
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
注意cursor = takeIndex这句代码,将外部内的takeIndex赋值给cursor,这样子将队列和迭代器数据读取进行了同步。
对于iterator1,第一次调用next()方法时,cursor被赋值为3首先将nextItem的值保持在x变量中,即hadoop字符串。
然后设置nextItem和cursor的值
nextItem = itemAt(nextIndex = cursor);
this.cursor = incCursor(cursor);
设置完成后,nextItem为flink,cursor为-1。
最后返回保存在x变量中的值,即返回hadoop字符串。
第二次调用next()方法时,输出的值即上次保存的nextItem值,即flink字符串。
迭代器运行过程中,相关变量内容如下:
至于iterator2迭代器,各位可以自己去分析,不再赘述。
本文主要以实例讲解Semaphore、阻塞队列,并分析了相关核心源码实现。
本文参考
关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码
共同學習,寫下你的評論
評論加載中...
作者其他優質文章






