我從未使用過 ForkJoinPool,我遇到了這個代碼片段。我有一個Set<Document> docs. 文檔有寫方法。如果我執行以下操作,是否需要使用 get 或 join 來確保集合中的所有文檔都正確完成了它們的寫入方法?ForkJoinPool pool = new ForkJoinPool(concurrencyLevel);pool.submit(() -> docs.parallelStream().forEach( doc -> { doc.write(); }));如果其中一個文檔無法完成它的寫入會發生什么?說它拋出一個異常。給出的代碼是否等待所有文檔完成其寫入操作?
1 回答

呼啦一陣風
TA貢獻1802條經驗 獲得超6個贊
ForkJoinPool.submit(Runnable)
返回一個ForkJoinTask
代表待完成的任務。如果您想等待所有文檔被處理,您需要與該任務進行某種形式的同步,例如調用其 get()
方法(從Future
接口)。
關于異常處理,像往常一樣,流處理期間的任何異常都會停止它。但是,您必須參考以下文檔Stream.forEach(Consumer)
:
此操作的行為明顯是不確定的。對于并行流管道,此操作不保證遵守流的遇到順序,因為這樣做會犧牲并行性的好處。對于任何給定的元素,可以在庫選擇的任何時間和線程中執行該操作。[…]
這意味著如果發生異常,您無法保證將寫入哪個文檔。處理將停止,但您無法控制仍將處理哪個文檔。
如果您想確保處理剩余的文件,我會建議兩種解決方案:
document.write()
用try
/包圍catch
以確保沒有異常傳播,但這使得很難檢查哪個文檔成功或根本沒有失??;要么使用另一種解決方案來管理您的并行處理,例如
CompletableFuture
API。正如評論中所指出的,您當前的解決方案是一種由于實現細節而起作用的黑客,因此最好做一些更清潔的事情。
使用CompletableFuture
,你可以這樣做:
List<CompletableFuture<Void>> futures = docs.stream() .map(doc -> CompletableFuture.runAsync(doc::write, pool)) .collect(Collectors.toList());
這將確保處理所有文檔,并檢查返回列表中的每個未來是否成功。
添加回答
舉報
0/150
提交
取消