亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

C#:使用 Parallel.ForEach 和異步操作限制最大并發操作

C#:使用 Parallel.ForEach 和異步操作限制最大并發操作

C#
慕碼人8056858 2022-01-16 20:05:06
我正在嘗試使用 asp.net core 2.1 實現自托管 Web 服務,但遇到了實現后臺長時間執行任務的問題。由于每種ProcessSingle方法(在下面的代碼片段中)的高 CPU 負載和時間消耗,我想限制同時執行的任務的數量。但是我可以看到所有任務Parallel.ForEach幾乎立即開始,盡管我設置了MaxDegreeOfParallelism = 3我的代碼是(這是一個簡化版本):public static async Task<int> Work(){    var id = await CreateIdInDB() // async create record in DB    // run background task, don't wait when it finishes    Task.Factory.StartNew(async () => {        Parallel.ForEach(            listOfData,            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },            async x => await ProcessSingle(x));    });    // return created id immediately    return id;}public static async Task ProcessSingle(MyInputData inputData){    var dbData = await GetDataFromDb(); // get data from DB async using Dapper    // some lasting processing (sync)    await SaveDataToDb(); // async save processed data to DB using Dapper}如果我理解正確,問題出async x => await ProcessSingle(x)在 Parallel.ForEach 內部,不是嗎?有人可以描述一下,它應該如何以正確的方式實施?更新由于我的問題存在某種歧義,因此有必要關注主要方面:方法分為三部分ProcessSingle:從數據庫異步獲取數據進行長時間高 CPU 負載的數學計算將結果保存到數據庫異步問題包括兩個獨立的:如何降低 CPU 使用率(例如同時運行不超過三個數學計算)?如何保持ProcessSingle方法的結構 - 由于異步 DB 調用而使它們保持異步。希望現在會更清楚。PS 已經給出了合適的答案,它可以工作(特別感謝@MatrixTai)。編寫此更新是為了進行一般說明。
查看完整描述

2 回答

?
慕森卡

TA貢獻1806條經驗 獲得超8個贊

更新


正如我剛剛注意到您在評論中提到的那樣,問題是由數學計算引起的。


將計算和更新數據庫的部分分開會更好。


對于計算部分,使用Parallel.ForEach()以便優化您的工作,您可以控制線程數。


只有在所有這些任務完成之后。用于async-await將您的數據更新到數據庫,無需SemaphoreSlim我提及。


public static async Task<int> Work()

{

    var id = await CreateIdInDB() // async create record in DB


    // run background task, don't wait when it finishes

    Task.Run(async () => {


        //Calculation Part

        ConcurrentBag<int> data = new ConcurrentBag<int>();

        Parallel.ForEach(

            listOfData,

            new ParallelOptions { CancellationToken = token, MaxDegreeOfParallelism = 3 },

            x => {ConcurrentBag.Add(calculationPart(x))});


        //Update DB part

        int[] data_arr = data.ToArray();

        List<Task> worker = new List<Task>();

        foreach (var i in data_arr)

        {

            worker.Add(DBPart(x));

        }

        await Task.WhenAll(worker);

    });


    // return created id immediately

    return id;

}

當你async-await在Parallel.forEach.


首先,閱讀第一個和第二個答案的這個問題。將這兩者結合起來毫無意義。


實際上async-await會最大限度地利用可用線程,所以簡單地使用它。


public static async Task<int> Work()

{

    var id = await CreateIdInDB() // async create record in DB


    // run background task, don't wait when it finishes

    Task.Run(async () => {

        List<Task> worker = new List<Task>();

        foreach (var i in listOfData)

        {

            worker.Add(ProcessSingle(x));

        }

        await Task.WhenAll(worker);

    });


    // return created id immediately

    return id;

}

但是還有另一個問題,在這種情況下,這些任務仍然一起開始,消耗你的 CPU 使用率。


因此,為避免這種情況,請使用 SemaphoreSlim


public static async Task<int> Work()

{

    var id = await CreateIdInDB() // async create record in DB


    // run background task, don't wait when it finishes

    Task.Run(async () => {

        List<Task> worker = new List<Task>();

        //To limit the number of Task started.

        var throttler = new SemaphoreSlim(initialCount: 20);

        foreach (var i in listOfData)

        {

            await throttler.WaitAsync();

            worker.Add(Task.Run(async () =>

            {

                await ProcessSingle(x);

                throttler.Release();

            }

            ));

        }

        await Task.WhenAll(worker);

    });


    // return created id immediately

    return id;

}


此外,不要Task.Factory.StartNew()在簡單Task.Run()就足以完成您想要的工作時使用,請閱讀 Stephen Cleary 撰寫的這篇出色的文章。


查看完整回答
反對 回復 2022-01-16
?
牛魔王的故事

TA貢獻1830條經驗 獲得超3個贊

如果您更熟悉“傳統”并行處理概念,請像這樣重寫您的 ProcessSingle() 方法:


public static void ProcessSingle(MyInputData inputData)

{

    var dbData = GetDataFromDb(); // get data from DB async using Dapper

    // some lasting processing (sync)

    SaveDataToDb(); // async save processed data to DB using Dapper

}

當然,您最好也以類似的方式更改 Work() 方法。


查看完整回答
反對 回復 2022-01-16
  • 2 回答
  • 0 關注
  • 1152 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號