1 回答

TA貢獻1836條經驗 獲得超5個贊
你走在正確的軌道上:
我還嘗試過存儲未處理項目的數量,并在每個線程上更新此數字時執行鎖定。當未處理的項目為 0 時,我將調用 AddingCompleted 方法。
問題是你實際上是在計算活躍工人的數量,而不是未處理項目的數量。也就是說,當您開始處理某事時,您只會增加計數器,因此隊列中可能有許多其他項目未由該計數器表示。要執行后者,您需要做的是每次向隊列中添加內容時遞增一個計數器,然后每次完成處理隊列中的內容時遞減計數器。
現在,如果您嘗試過,您可能會遇到一個不同的問題:默認情況下,該Parallel.ForEach()方法會嘗試從源中批量處理項目。這不適用于像BlockingCollection<T>在枚舉期間可能阻塞的源,等待額外的數據。在您的示例中,這會導致死鎖,在Parallel.ForEach()等待更多項目之前它將對最近的批次進行排隊,同時BlockingCollection<T>等待更多項目被處理,從而導致更多項目排隊。
方法等待集合,ForEach()集合等待ForEach()方法,就會出現死鎖。
不過有一個解決方法:您可以提供ForEach()一個分區程序,該分區程序專門配置為不緩沖數據,而是在檢索工作項時立即將其排隊。
將這兩種策略放在一起,你會得到一個看起來像這樣的代碼版本(我為診斷目的添加了一些小的輸出更改):
static void Main(string[] args)
{
const int firstValue = 30;
const int secondValues = 20;
const int thirdValues = 10;
var process = new BlockingCollection<int>() { firstValue };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
int totalItemCount = process.Count;
OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
{
string message;
if (item > secondValues)
{
// Some add 2 items
Interlocked.Add(ref totalItemCount, 2);
process.Add(item - 1);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
}
else if (item > thirdValues)
{
// Some add 1 item
Interlocked.Increment(ref totalItemCount);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
}
else
{
// Some add 0 items
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
}
int newCount = Interlocked.Decrement(ref totalItemCount);
if (newCount == 0)
{
process.CompleteAdding();
}
Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
});
// Parallel.ForEach will exit
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
- 1 回答
- 0 關注
- 131 瀏覽
添加回答
舉報