亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

Java 多線程(五)—— 線程池基礎 之 FutureTask源碼解析

標簽:
Java

 

正文

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。
如:

  1. 取消任务执行

  2. 查询任务是否执行完成

  3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
    注意:一旦任务执行完成或取消任务,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)

FutureTask实现了Runnable接口和Future接口,因此FutureTask可以传递到线程对象Thread或Excutor(线程池)来执行。

如果在当前线程中需要执行比较耗时的操作,但又不想阻塞当前线程时,可以把这些作业交给FutureTask,另开一个线程在后台完成,当当前线程将来需要时,就可以通过FutureTask对象获得后台作业的计算结果或者执行状态。

回到顶部

示例

复制代码

 1 public class FutureTaskDemo { 2     public static void main(String[]args)throws InterruptedException { 3         FutureTask < Integer > ft = new FutureTask <  > (new Callable < Integer > () { 4                  @Override 
 5                  public Integer call()throws Exception { 6                     int num = new Random().nextInt(10); 7                     TimeUnit.SECONDS.sleep(num); 8                     return num; 9                 }10             });11         Thread t = new Thread(ft);12         t.start(); 
13         //这里可以做一些其它的事情,跟futureTask任务并行,等需要futureTask的运行结果时,可以调用get方法获取14         try { 
15             //等待任务执行完成,获取返回值16             Integer num = ft.get();17             System.out.println(num);18         } catch (Exception e) {19             e.printStackTrace();20         }21     }22 }

复制代码

回到顶部

FutureTask 源码分析

JDK1.8自己实现了一个同步等待队列,在结果返回之前,所有的线程都被阻塞,存放到等待队列中。

下面我们来分析下JDK1.8的FutureTask 源码

FutureTask 类结构

复制代码

 1 public class FutureTask<V> implements RunnableFuture<V> { 
 2 /** * 当前任务的运行状态。 
 3 * 
 4 * 可能存在的状态转换 
 5 * NEW -> COMPLETING -> NORMAL(有正常结果) 
 6 * NEW -> COMPLETING -> EXCEPTIONAL(结果为异常) 
 7 * NEW -> CANCELLED(无结果) 
 8 * NEW -> INTERRUPTING -> INTERRUPTED(无结果) 
 9 */ 10 private volatile int state; 
11 private static final int NEW = 0; //初始状态 12 private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。 13 private static final int NORMAL = 2; //任务正常完成,结果被set 14 private static final int EXCEPTIONAL = 3; //任务抛出异常 15 private static final int CANCELLED = 4; //任务已被取消 16 private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断 17 private static final int INTERRUPTED = 6; //线程已被中断 
18 19 //将要执行的任务 20 private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常 21 private Object outcome; // non-volatile, protected by state reads/writes //执行callable的线程,调用FutureTask.run()方法通过CAS设置 22 private volatile Thread runner; //栈结构的等待队列,该节点是栈中的最顶层节点。 23 private volatile WaitNode waiters; 
24 ....

复制代码

FutureTask实现的接口信息如下:

RunnableFuture 接口

1 public interface RunnableFuture<V> extends Runnable, Future<V> {2     void run();3 }

RunnableFuture 接口基础了Runnable和Future接口

Future 接口

复制代码

 1 public interface Future<V> { 
 2     //取消任务  3     boolean cancel(boolean mayInterruptIfRunning); 
 4     //判断任务是否已经取消  5     boolean isCancelled(); 
 6     //判断任务是否结束(执行完成或取消)  7     boolean isDone(); 
 8     //阻塞式获取任务执行结果  9     V get() throws InterruptedException, ExecutionException; 
10     //支持超时获取任务执行结果 11     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
12 }

复制代码

run 方法

复制代码

 1 public void run() { 2     //保证callable任务只被运行一次 3     if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 4         return; 5     try { 6         Callable < V > c = callable; 7         if (c != null && state == NEW) { 8             V result; 9             boolean ran;10             try { 
11                 //执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的12                 result = c.call();13                 //上面的call()是同步的,只有上面的result有了结果才会继续执行14                 ran = true;15             } catch (Throwable ex) {16                 result = null;17                 ran = false;18                 setException(ex);19             }20             if (ran)21                 //执行完了,设置result22                 set(result);23         }24     }25     finally {26         runner = null;27         int s = state;28         //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成29         if (s >= INTERRUPTING)30             handlePossibleCancellationInterrupt(s);31     }32 }

复制代码

1.如果state状态不为New或者设置运行线程runner失败则直接返回false,说明线程已经启动过,保证任务在同一时刻只被一个线程执行。
2.调用callable.call()方法,如果调用成功则执行set(result)方法,将state状态设置成NORMAL。如果调用失败抛出异常则执行setException(ex)方法,将state状态设置成EXCEPTIONAL,唤醒所有在get()方法上等待的线程。
3.如果当前状态为INTERRUPTING(步骤2已CAS失败),则一直调用Thread.yield()直至状态不为INTERRUPTING

set方法

复制代码

1 protected void set(V v) {2     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {3         outcome = v;4         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state5         finishCompletion();6     }7 }

复制代码

  1. 首先通过CAS把state的NEW状态修改成COMPLETING状态。

  2. 修改成功则把v值赋给outcome变量。然后再把state状态修改成NORMAL,表示现在可以获取返回值。

  3. 最后调用finishCompletion()方法,唤醒等待队列中的所有节点。

finishCompletion方法

复制代码

 1 private void finishCompletion() { 2     for (WaitNode q; (q = waiters) != null; ) { 
 3         //通过CAS把栈顶的元素置为null,相当于弹出栈顶元素 4         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 5             for (; ; ) { 6                 Thread t = q.thread; 7                 if (t != null) { 8                     q.thread = null; 9                     LockSupport.unpark(t);10                 }11                 WaitNode next = q.next;12                 if (next == null)13                     break;14                 q.next = null; // unlink to help gc15                 q = next;16             }17             break;18         }19     }20     done();21     callable = null; // to reduce footprint22 }

复制代码

把栈中的元素一个一个弹出,并通过 LockSupport.unpark(t)唤醒每一个节点,通知每个线程,该任务执行完成(可能是执行完成,也可能cancel,异常等)

get方法

复制代码

1 public V get()throws InterruptedException, ExecutionException {2     int s = state;3     if (s <= COMPLETING)4         s = awaitDone(false, 0L);5     return report(s);6 }

复制代码

如果state状态小于等于COMPLETING,说明任务还没开始执行或还未执行完成,然后调用awaitDone方法阻塞该调用线程。

如果state的状态大于COMPLETING,则说明任务执行完成,或发生异常、中断、取消状态。直接通过report方法返回执行结果。

awaitDone 方法

复制代码

 1 private int awaitDone(boolean timed, long nanos)throws InterruptedException { 2     final long deadline = timed ? System.nanoTime() + nanos : 0L; 3     WaitNode q = null; 4     boolean queued = false; 5     for (; ; ) { 
 6         //如果该线程执行interrupt()方法,则从队列中移除该节点,并抛出异常 7         if (Thread.interrupted()) { 8             removeWaiter(q); 9             throw new InterruptedException();10         }11         int s = state; 
12         //如果state状态大于COMPLETING 则说明任务执行完成,或取消13         if (s > COMPLETING) {14             if (q != null)15                 q.thread = null;16             return s;17         } 
18         //如果state=COMPLETING,则使用yield,因为此状态的时间特别短,通过yield比挂起响应更快。19         else if (s == COMPLETING) // cannot time out yet20             Thread.yield(); 
21         //构建节点22         else if (q == null)23             q = new WaitNode();24         //把当前节点入栈25         else if (!queued)26             queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);27         //如果需要阻塞指定时间,则使用LockSupport.parkNanos阻塞指定时间28         //如果到指定时间还没执行完,则从队列中移除该节点,并返回当前状态29         else if (timed) {30             nanos = deadline - System.nanoTime();31             if (nanos <= 0L) {32                 removeWaiter(q);33                 return state;34             }35             LockSupport.parkNanos(this, nanos);36         }37         //阻塞当前线程38         else39             LockSupport.park(this);40     }41 }

复制代码

构建栈链表的节点元素,并将该节点入栈,同时阻塞当前线程等待运行主任务的线程唤醒该节点。

report方法

复制代码

1 private V report(int s)throws ExecutionException {2     Object x = outcome;3     if (s == NORMAL)4         return (V)x;5     if (s >= CANCELLED)6         throw new CancellationException();7     throw new ExecutionException((Throwable)x);8 }

复制代码

如果state的状态为NORMAL,说明任务正确执行完成,直接返回计算后的值。
如果state的状态大于等于CANCELLED,说明任务被成功取消执行、或响应中断,直接返回CancellationException异常
否则返回ExecutionException异常。

原文出处:https://www.cnblogs.com/java-chen-hao/p/10243509.html  

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消