亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何知道何時停止并行 foreach,其中消費者也是 C# 中的生產者

如何知道何時停止并行 foreach,其中消費者也是 C# 中的生產者

C#
夢里花落0921 2022-11-21 20:32:42
我正在嘗試使用 Parallel.ForEach() 并行處理 BlockingCollection 中的一些項目。當處理一個項目時,它可以生成 0-2 個以上的項目來處理。要處理的項目數最終總是會達到 0。我的問題是,由于消費者也是生產者(處理項目可以生成更多要處理的項目),當 BlockingCollection 為空時,我無法調用 BlockingCollection 的 CompleteAdding(),因為當前可能有其他線程正在處理將生成更多項目的項目項目。因此我不知道如何讓 BlockingCollection/Parallel.ForEach 知道它可以退出。這是情況的示例(為簡單起見進行了修改)using System;using System.Collections.Concurrent;using System.Threading.Tasks;namespace Example{    class Example    {        static void Main(string[] args)        {            var process = new BlockingCollection<int>() { 30 };            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>            {                if (item > 20)                {                    // Some add 2 items                    process.Add(item - 1);                    process.Add(item - 1);                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");                }                else if (item > 10)                {                    // Some add 1 item                    process.Add(item-1);                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");                }                else                {                    // Some add 0 items                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");                }            });            // Parallel.ForEach never exits            Console.WriteLine("Completed Processing");            Console.ReadKey();        }    }}我嘗試將 Parallel.ForEach 期間的 MaxDegreeOfParallelism 修改為要處理的項目數和 Environment.ProcessorCount 中的最小值,但這在 Parallel.ForEach 期間沒有任何作用。我還嘗試過存儲未處理項目的數量,并在每個線程上更新此數字時執行鎖定。當未處理的項目為 0 時,我將調用 AddingCompleted 方法。這也不管用。
查看完整描述

1 回答

?
一只甜甜圈

TA貢獻1836條經驗 獲得超5個贊

你走在正確的軌道上:


我還嘗試過存儲未處理項目的數量,并在每個線程上更新此數字時執行鎖定。當未處理的項目為 0 時,我將調用 AddingCompleted 方法。


問題是你實際上是在計算活躍工人的數量,而不是未處理項目的數量。也就是說,當您開始處理某事時,您只會增加計數器,因此隊列中可能有許多其他項目未由該計數器表示。要執行后者,您需要做的是每次向隊列中添加內容時遞增一個計數器,然后每次完成處理隊列中的內容時遞減計數器。


現在,如果您嘗試過,您可能會遇到一個不同的問題:默認情況下,該Parallel.ForEach()方法會嘗試從源中批量處理項目。這不適用于像BlockingCollection<T>在枚舉期間可能阻塞的源,等待額外的數據。在您的示例中,這會導致死鎖,在Parallel.ForEach()等待更多項目之前它將對最近的批次進行排隊,同時BlockingCollection<T>等待更多項目被處理,從而導致更多項目排隊。


方法等待集合,ForEach()集合等待ForEach()方法,就會出現死鎖。


不過有一個解決方法:您可以提供ForEach()一個分區程序,該分區程序專門配置為不緩沖數據,而是在檢索工作項時立即將其排隊。


將這兩種策略放在一起,你會得到一個看起來像這樣的代碼版本(我為診斷目的添加了一些小的輸出更改):


static void Main(string[] args)

{

    const int firstValue = 30;

    const int secondValues = 20;

    const int thirdValues = 10;


    var process = new BlockingCollection<int>() { firstValue };


    var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

    int totalItemCount = process.Count;


    OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);


    Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>

    {

        string message;


        if (item > secondValues)

        {

            // Some add 2 items

            Interlocked.Add(ref totalItemCount, 2);

            process.Add(item - 1);

            process.Add(item - 1);

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";

        }

        else if (item > thirdValues)

        {

            // Some add 1 item

            Interlocked.Increment(ref totalItemCount);

            process.Add(item - 1);

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";

        }

        else

        {

            // Some add 0 items

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";

        }


        int newCount = Interlocked.Decrement(ref totalItemCount);


        if (newCount == 0)

        {

            process.CompleteAdding();

        }


        Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");

    });


    // Parallel.ForEach will exit

    Console.WriteLine("Completed Processing");    

    Console.ReadKey();

}



查看完整回答
反對 回復 2022-11-21
  • 1 回答
  • 0 關注
  • 131 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號