3 回答

TA貢獻1801條經驗 獲得超8個贊
Collections.synchronizedList()您在使用時應該使用parallelStream(). 因為ArrayList它不是線程安全的,并且在并發訪問它時會出現意外的行為,就像您使用parallelStream().
我已經修改了您的代碼,現在它可以正常工作:
private static void reduce_parallelStream() {
List<String> vals = Arrays.asList("a", "b");
// Use Synchronized List when with parallelStream()
List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
(l, v) -> {
l.add(v);
return l;
}, (a, b) -> a // don't use addAll() here to multiplicate the output like [a, b, a, b]
);
System.out.println(join);
}
輸出:
有時你會得到這樣的輸出:
[a, b]
有時還有這個:
[b, a]
這樣做的原因是它是一個parallelStream()所以你無法確定執行的順序。

TA貢獻1876條經驗 獲得超6個贊
因為它是并行流,所以要減少的第一個參數new ArrayList() 可能會為每個輸入值 a 和 b 調用兩次。
這就是你錯的地方。第一個參數是單個ArrayList實例,而不是lambda 表達式可以產生多個ArrayList實例。
因此,整個縮減操作在單個ArrayList實例上進行。當多個線程ArrayList并行修改該值時,每次執行的結果可能會發生變化。
您combiner實際上將 a 的所有元素添加List到同一個List.
[a,b]如果 和accumulator函數combiner都會生成新的ArrayList而不是改變其輸入,則可以獲得預期的輸出ArrayList:
List<String> join = vals.parallelStream().reduce(
new ArrayList<String>(),
(List<String> l, String v) -> {
List<String> cl = new ArrayList<>(l);
cl.add(v);
return cl;
}, (a, b) -> {
List<String> ca = new ArrayList<>(a);
ca.addAll(b);
return ca;
}
);
也就是說,您reduce根本不應該使用。collect是執行可變歸約的正確方法:
List<String> join = vals.parallelStream()
.collect(ArrayList::new,ArrayList::add,ArrayList::addAll);
正如您所看到的,這里與 in 不同reduce,您傳遞的第一個參數是 a Supplier<ArrayList<String>>,它可用于生成所需數量的中間ArrayList實例。

TA貢獻1856條經驗 獲得超5個贊
這很簡單,第一個參數是身份,或者我會說從零開始。因為parallelStream usage這個值被重用。這意味著并發問題(添加中的空值)和重復問題。
這可以通過以下方式修補:
final ArrayList<String> zero = new ArrayList<>();
List<String> join = vals.parallelStream().reduce(zero,
(List<String> l, String v) -> {
if (l == zero) {
l = new ArrayList<>();
}
l.add(v);
return l;
}, (a, b) -> {
// See comment of Holger:
if (a == zero) return b;
if (b == zero) return a;
a.addAll(b);
return a;
}
);
安全的。
您可能想知道為什么reduce身份提供函數沒有重載。原因是collect這里應該使用它。
添加回答
舉報