亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

有沒有辦法部分控制 Java 并行流的順序?

有沒有辦法部分控制 Java 并行流的順序?

三國紛爭 2022-10-26 16:43:01
我知道嘗試使并行流以特定順序執行每個元素是沒有意義的。由于它并行運行數據,因此排序中顯然會有一些不確定性。但是,我想知道是否有可能讓它按順序執行“排序”,或者至少嘗試保持順序有點類似于如果它是順序的。用例我需要對來自幾個數組的值的每個組合執行一些代碼。我創建了一個包含所有可能索引組合的流,如下所示(為了不泄露專有信息,變量的名稱已被混淆,我保證我通常不會命名我的變量arr1等arr2):public static void doMyComputation(double[] arr1, double[] arr2, double[] arr3) {  DoubleStream.of(arr1).mapToObj(Double::valueOf)    .flatMap(      i1->DoubleStream.of(arr2).mapToObj(Double::valueOf)        .flatMap(          i2->DoubleStream.of(arr3).mapToObj(Double::valueOf)            .flatMap(              i3->new Inputs(i1,i2,i3)             )        )    )    .parallel()    .forEach(input -> doComputationallyIntensiveThing(input.i1, input.i2, input.i3);這很好用(或者至少真實版本可以,我為我在此處發布的代碼片段簡化了一些事情,所以我可能把代碼片段弄亂了)。我希望由于并行性,我不會看到 order 中的值arr1[0], arr2[0], arr3[0],其次是arr1[0], arr2[0], arr3[1]等等。但是,我希望我至少會看到從arr1第一個開始的前幾個值的輸入,然后慢慢工作我走到盡頭的路arr1。我驚訝地發現它甚至沒有接近那個。問題在于,在該doComputationallyIntensiveThing方法中,只有當我們同時看到許多相同的值時,才會有一些緩存表現良好arr1。如果這些值是完全隨機輸入的,那么緩存弊大于利。有什么方法可以提示流以將輸入按中的值組合在一起的順序執行輸入arr1?如果沒有,那么我可能只為每個值創建一個新流,arr1它會正常工作,但我想看看是否有一種方法可以在一個流中完成這一切。
查看完整描述

2 回答

?
智慧大石

TA貢獻1946條經驗 獲得超3個贊

通常,您不應該假設并行流的特定處理順序,而是假設您的算法是正確的,無論實際處理順序如何,您都可以推斷順序和性能之間的關系。

Stream 實現已經被設計為允許從處理連續元素中受益——對于本地處理器。因此,當您有一個包含一百個元素的 Stream 時,例如IntStream.range(0, 100)為了簡化,并使用四個原本空閑的 CPU 內核對其進行處理,實現會將其分為四個范圍 0-25、25-50、50-75 和 75-100,最好是獨立處理。因此,每個處理器將在本地處理連續元素并受益于低級效果,例如一次將多個數組元素提取到其本地緩存中,等等。

因此,您的doComputationallyIntensiveThing方法的問題似乎是緩存(和您的監控)在本地無法正常工作。因此,繼續上面的示例,操作將從同時并行執行0、25、50和和。如果第一個評估的四個元素中的任何一個“獲勝”并確定緩存的數據,則它將僅適用于接下來的四個值中的一個。如果線程的時間發生變化,比率會變得更糟。751265176

一種解決方案是更改doComputationallyIntensiveThing為使用線程本地緩存,以從每個線程中連續元素的處理中受益。然后,您定義 Stream 操作的方式非常適合此操作,該操作受益于重復查看arr1. 不過,您可以簡化代碼并消除大量裝箱開銷:

Arrays.stream(arr1).parallel().forEach(i1 ->
    Arrays.stream(arr2).forEach(i2 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

但是,這帶來了之后清理線程本地緩存的挑戰,因為并行 Stream 使用了您無法控制的線程池。

一種更簡單的解決方法,即今天有效的方法,是更改嵌套:

Arrays.stream(arr2).parallel().forEach(i2 ->
    Arrays.stream(arr1).forEach(i1 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

現在,arr2按照上述方式進行拆分。然后,每個工作線程將對 進行相同的迭代arr1,處理其中的每個元素的次數與 中的元素一樣多arr3。這允許利用線程間緩存行為,但存在由于時間差異導致線程不同步的風險,最終會出現與以前相同的情況。

一個更好的選擇是重新設計doComputationallyIntensiveThing,創建兩種不同的方法,一種為返回包含元素緩存數據的對象的特定元素準備操作arr1,另一種用于使用緩存數據的實際處理:

Arrays.stream(arr1).parallel()
    .mapToObj(i1 -> prepareOperation(i1))
    .forEach(cached ->
        Arrays.stream(arr2).forEach(i2 ->
            Arrays.stream(arr3).forEach(i3 ->
                doComputationallyIntensiveThing(cached, i2, i3))));

在這里,返回的每個實例prepareOperation都與 的特定元素相關聯,arr1并充當與其相關聯的任何數據的本地緩存,但在特定元素的處理結束時會正常進行垃圾收集。所以不需要清理。

原則上,如果只返回一個空的持有者對象,它也可以工作,由特定元素prepareOperation的第一次調用填充。doComputationallyIntensiveThing


查看完整回答
反對 回復 2022-10-26
?
偶然的你

TA貢獻1841條經驗 獲得超3個贊

為了使代碼簡單,下面的代碼是針對一個數組的(您可以擴展它以包含更多數組)。


    class IteratorSpliteratorOfDouble implements Spliterator.OfDouble {


        private long m_estimate;

        private final DoubleSupplier m_supplier;


        /**

         * @param supplier -- returns Double.NaN if no more elements

         */

        private IteratorSpliteratorOfDouble(final long size,

                final DoubleSupplier supplier) {

            m_estimate = size;

            m_supplier = supplier;

        }


        public IteratorSpliteratorOfDouble(final double[] array) {

            this(array.length, new DoubleSupplier() {


                int m_idx = 0;


                @Override

                public synchronized double getAsDouble() {

                    if (m_idx >= array.length) {

                        return Double.NaN;

                    }


                    return array[m_idx++];

                }

            });

        }


        @Override

        public long estimateSize() {

            return m_estimate;

        }


        @Override

        public int characteristics() {

            return 0;

        }


        @Override

        public boolean tryAdvance(final DoubleConsumer action) {               


            final double next = m_supplier.getAsDouble();

            if (Double.isNaN(next)) {

                return false;

            }


            action.accept(next);

            return true;

        }


        @Override

        public Spliterator.OfDouble trySplit() {


            if (m_estimate == 0) {

                return null;

            }

            return new IteratorSpliteratorOfDouble(

                    m_estimate = m_estimate >>> 1, m_supplier);

        }

    }

使用上述的一個例子是:


    final double[] arr = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };

    StreamSupport.doubleStream(new IteratorSpliteratorOfDouble(arr), true)

            .forEach(idx -> doComputationallyIntensiveThing(idx));

該代碼將保持數組的元素順序,同時利用 java 并行。


查看完整回答
反對 回復 2022-10-26
  • 2 回答
  • 0 關注
  • 128 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號