2 回答

TA貢獻1946條經驗 獲得超3個贊
通常,您不應該假設并行流的特定處理順序,而是假設您的算法是正確的,無論實際處理順序如何,您都可以推斷順序和性能之間的關系。
Stream 實現已經被設計為允許從處理連續元素中受益——對于本地處理器。因此,當您有一個包含一百個元素的 Stream 時,例如IntStream.range(0, 100)
為了簡化,并使用四個原本空閑的 CPU 內核對其進行處理,實現會將其分為四個范圍 0-25、25-50、50-75 和 75-100,最好是獨立處理。因此,每個處理器將在本地處理連續元素并受益于低級效果,例如一次將多個數組元素提取到其本地緩存中,等等。
因此,您的doComputationallyIntensiveThing
方法的問題似乎是緩存(和您的監控)在本地無法正常工作。因此,繼續上面的示例,操作將從同時并行執行0
、25
、50
和和。如果第一個評估的四個元素中的任何一個“獲勝”并確定緩存的數據,則它將僅適用于接下來的四個值中的一個。如果線程的時間發生變化,比率會變得更糟。75
1
26
51
76
一種解決方案是更改doComputationallyIntensiveThing
為使用線程本地緩存,以從每個線程中連續元素的處理中受益。然后,您定義 Stream 操作的方式非常適合此操作,該操作受益于重復查看arr1
. 不過,您可以簡化代碼并消除大量裝箱開銷:
Arrays.stream(arr1).parallel().forEach(i1 -> Arrays.stream(arr2).forEach(i2 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(i1, i2, i3))));
但是,這帶來了之后清理線程本地緩存的挑戰,因為并行 Stream 使用了您無法控制的線程池。
一種更簡單的解決方法,即今天有效的方法,是更改嵌套:
Arrays.stream(arr2).parallel().forEach(i2 -> Arrays.stream(arr1).forEach(i1 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(i1, i2, i3))));
現在,arr2
按照上述方式進行拆分。然后,每個工作線程將對 進行相同的迭代arr1
,處理其中的每個元素的次數與 中的元素一樣多arr3
。這允許利用線程間緩存行為,但存在由于時間差異導致線程不同步的風險,最終會出現與以前相同的情況。
一個更好的選擇是重新設計doComputationallyIntensiveThing
,創建兩種不同的方法,一種為返回包含元素緩存數據的對象的特定元素準備操作arr1
,另一種用于使用緩存數據的實際處理:
Arrays.stream(arr1).parallel() .mapToObj(i1 -> prepareOperation(i1)) .forEach(cached -> Arrays.stream(arr2).forEach(i2 -> Arrays.stream(arr3).forEach(i3 -> doComputationallyIntensiveThing(cached, i2, i3))));
在這里,返回的每個實例prepareOperation
都與 的特定元素相關聯,arr1
并充當與其相關聯的任何數據的本地緩存,但在特定元素的處理結束時會正常進行垃圾收集。所以不需要清理。
原則上,如果只返回一個空的持有者對象,它也可以工作,由特定元素prepareOperation
的第一次調用填充。doComputationallyIntensiveThing

TA貢獻1841條經驗 獲得超3個贊
為了使代碼簡單,下面的代碼是針對一個數組的(您可以擴展它以包含更多數組)。
class IteratorSpliteratorOfDouble implements Spliterator.OfDouble {
private long m_estimate;
private final DoubleSupplier m_supplier;
/**
* @param supplier -- returns Double.NaN if no more elements
*/
private IteratorSpliteratorOfDouble(final long size,
final DoubleSupplier supplier) {
m_estimate = size;
m_supplier = supplier;
}
public IteratorSpliteratorOfDouble(final double[] array) {
this(array.length, new DoubleSupplier() {
int m_idx = 0;
@Override
public synchronized double getAsDouble() {
if (m_idx >= array.length) {
return Double.NaN;
}
return array[m_idx++];
}
});
}
@Override
public long estimateSize() {
return m_estimate;
}
@Override
public int characteristics() {
return 0;
}
@Override
public boolean tryAdvance(final DoubleConsumer action) {
final double next = m_supplier.getAsDouble();
if (Double.isNaN(next)) {
return false;
}
action.accept(next);
return true;
}
@Override
public Spliterator.OfDouble trySplit() {
if (m_estimate == 0) {
return null;
}
return new IteratorSpliteratorOfDouble(
m_estimate = m_estimate >>> 1, m_supplier);
}
}
使用上述的一個例子是:
final double[] arr = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };
StreamSupport.doubleStream(new IteratorSpliteratorOfDouble(arr), true)
.forEach(idx -> doComputationallyIntensiveThing(idx));
該代碼將保持數組的元素順序,同時利用 java 并行。
添加回答
舉報