亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在低于正常優先級的 Windows 服務中停止 Parallel.ForEach

在低于正常優先級的 Windows 服務中停止 Parallel.ForEach

C#
jeck貓 2021-11-28 20:04:07
Parallel.ForEach我的 Windows 服務中有一個代碼。如果ParallelOptions.MaxDegreeOfParallelism設置為 -1,我將使用大部分 CPU。但是停止服務會持續半分鐘。一些應該接收服務應該停止的信號的內部控制器線程因處理器時間不足而被餓死。我將進程優先級設置為低于正常值,但這可能與此處無關。即使所有線程都忙,如何縮短停止服務的時間?我想暫時降低線程池中線程的優先級,因為我沒有任何異步代碼,但 Internet 說這是一個壞主意,所以在這里要求一種“正確”的方法。螺紋(包括操作系統和.NET)在所有情況下之間的不同OnStart和OnStop。此外,如果停止時間很長,那么OnStop有時最終會被調用的 OS 線程是一個新線程,不會更早地顯示在日志中。要構建此代碼,請創建新的 Windows 服務項目,從設計器添加 ProjectInstaller 類,將 Account 更改為 LocalService,并使用 InstallUtil 安裝一次。確保 LocalService 可以寫入 C:\Temp。public partial class Service1 : ServiceBase{    private ManualResetEvent stopEvent = new ManualResetEvent(false);    private Task mainTask;    private StreamWriter writer = File.AppendText(@"C:\Temp\Log.txt");    public Service1()    {        InitializeComponent();        writer.AutoFlush = true;    }    protected override void OnStart(string[] args)    {        Log("--------------");        Log("OnStart");        mainTask = Task.Run(new Action(Run));    }    protected override void OnStop()    {        Log("OnStop");        stopEvent.Set();        mainTask.Wait();        Log("--------------");    }    private void Log(string line)    {        writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));    }
查看完整描述

3 回答

?
千萬里不及你

TA貢獻1784條經驗 獲得超9個贊

這段代碼將在一兩秒內停止服務,而已經在計算的線程只有在完成實際工作后才會結束。正如您在服務中看到的那樣,OnStop 方法立即接收信號。但是,TaskManager 顯示與服務關聯的進程只有在消費線程全部完成后才會停止。


這使用了一個單獨的線程正在填充的字符串(路徑)的 BlockingCollection。并且有許多低優先級的線程會消耗字符串。


public partial class Service1 : ServiceBase

{

    private StreamWriter writer = File.AppendText(@"C:\temp\Log.txt");


    const int nbTreads = 30;

    BlockingCollection<string> dataItems;

    bool stopCompute = false;

    List<Thread> threads = new List<Thread>();

    Thread threadProd;

    private object aLock = new object();


    public Service1()

    {

        InitializeComponent();


        dataItems = new BlockingCollection<string>(nbTreads);


        writer.AutoFlush = true;

    }



    protected override void OnStart(string[] args)

    {

        Log("--------------");

        Log("OnStart");

        threadProd = new Thread(new ThreadStart(ProduireNomFichier));

        threadProd.Start();

        Thread.Sleep(1000); // fill the collection a little

        for (int i = 0; i < nbTreads; i++)

        {

            Thread threadRun = new Thread(() => Run());

            threadRun.Priority = ThreadPriority.Lowest;

            threadRun.Start();

            threads.Add(threadRun);

        }

    }


    private void ProduireNomFichier()

    {

        foreach (string nomFichier in Directory.EnumerateFiles(Environment.SystemDirectory))

        {

            dataItems.Add(nomFichier);

        }

    }


    protected override void OnStop()

    {

        lock (aLock)

        {

            stopCompute = true;

        }

        Log("OnStop");

        Log("--------------");

        threadProd.Abort();

    }


    private void Log(string line)

    {

        writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",

            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));

    }


    private void Run()

    {

        try

        {

            using (var sha = SHA256.Create())

            {

                while (dataItems.TryTake(out string fileName))

                {

                    lock (aLock)

                    {

                        if (stopCompute) return;

                    }

                    try

                    {

                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());

                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));

                    }

                    catch (Exception ex)

                    {

                        Log(String.Format("file={0}, exception={1}", fileName, ex.Message));

                    }

                }

            }

        }

        catch (Exception ex)

        {

            Log(String.Format("exception={0}", ex.Message));

        }

    }

}


查看完整回答
反對 回復 2021-11-28
?
函數式編程

TA貢獻1807條經驗 獲得超9個贊

在 Parallel.Foreach 中,您讀取文件的所有字節,然后使用 LINQ 對它們進行排序。這效率不高。嘗試使用 Array.Sort。對于 25 Mb 的文件,速度可以提高 85%。


Array.Sort 2230 ms

OrderBy 14650 ms

而且因為 OnStop 方法會等待任何已經開始的迭代結束,所以它可以更快地停止您的服務。


var fileBinary = File.ReadAllBytes(fileName);

Array.Sort(fileBinary);

var hash = sha.ComputeHash(fileBinary);


查看完整回答
反對 回復 2021-11-28
?
SMILET

TA貢獻1796條經驗 獲得超4個贊

這是一個工作代碼。它立即停止。請注意,主要思想來自:SylF。


但我不能給出一個明確的解釋為什么會發生......更新(在你下面的評論之后):你找到了原因,它很好地解釋了為什么你有這種行為。謝謝!我真的很高興知道。


盡管這項工作是在低優先級線程中完成的,但在 CPU 幾乎沒有工作的機器上,您應該不會注意到任何額外的延遲。


抱歉,我混淆了您的代碼示例以實現一些測試。但主要思想是改變調度程序(似乎不推薦)。但這是我找到的唯一方法。


代碼:


using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Security.Cryptography;

using System.ServiceProcess;

using System.Text;

using System.Threading;

using System.Threading.Tasks;


namespace StackOverflowQuestionWindowsService1

{

    public partial class Service1 : ServiceBase

    {

        private ManualResetEvent stopEvent = new ManualResetEvent(false);

        private Task mainTask;

        private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt");     //TAKE CARE - I do not append anymore  ********

        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        private int count = 0;


        public Service1()

        {

            InitializeComponent();


            writer.AutoFlush = true;

        }


        protected override void OnStart(string[] args)

        {

            Log("--------------");

            Log("OnStart");


            Task.Run(()=>Run());

        }


        protected override void OnStop()

        {

            Log("OnStop with actual thread count: " + Process.GetCurrentProcess().Threads.Count);


            cancellationTokenSource.Cancel();

        }


        private void Log(string line)

        {

            writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",

                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));

        }


        private void Run()

        {

            Stopwatch stopWatchTotal = new Stopwatch();

            stopWatchTotal.Start();


            try

            {

                using (var sha = SHA256.Create())

                {

                    var parallelOptions = new ParallelOptions();

                    parallelOptions.MaxDegreeOfParallelism = -1;

                    parallelOptions.CancellationToken = cancellationTokenSource.Token;

                    parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);


                    Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),

                        parallelOptions, (fileName, parallelLoopState) =>

                        {

                            // Thread.CurrentThread.Priority = ThreadPriority.Lowest;

                            Stopwatch stopWatch = new Stopwatch();

                            stopWatch.Start();


                            Interlocked.Increment(ref count);


                            if (parallelOptions.CancellationToken.IsCancellationRequested)

                            {

                                Log(String.Format($"{count}"));

                                return;

                            }


                            try

                            {

                                var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());

                                stopWatch.Stop();

                                Log(FormatTicks(stopWatch.ElapsedTicks));

                                Log(String.Format($"{count}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}"));

                            }

                            catch (Exception ex)

                            {

                                Log(String.Format($"{count} file={fileName}, exception={ex.Message}"));

                            }

                        });

                }

            }

            catch (Exception ex)

            {

                Log(String.Format("exception={0}", ex.Message));

            }


            stopWatchTotal.Stop();


            Log(FormatTicks(stopWatchTotal.ElapsedTicks));


            writer.Close();

            Process.GetCurrentProcess().Kill();

        }


        private string FormatTicks(long ticks)

        {

            return new TimeSpan(ticks).ToString();

        }

    }

}

優先調度程序:(感謝 Roman Starkov 在:來自微軟Bnaya Eshet 的StackOverflow)


using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading;

using System.Threading.Tasks;


namespace StackOverflowQuestionWindowsService1

{

    public class PriorityScheduler : TaskScheduler

    {

        public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);

        public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);

        public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);


        private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

        private Thread[] _threads;

        private ThreadPriority _priority;

        private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);


        public PriorityScheduler(ThreadPriority priority)

        {

            _priority = priority;

        }


        public override int MaximumConcurrencyLevel

        {

            get { return _maximumConcurrencyLevel; }

        }


        protected override IEnumerable<Task> GetScheduledTasks()

        {

            return _tasks;

        }


        protected override void QueueTask(Task task)

        {

            _tasks.Add(task);


            if (_threads == null)

            {

                _threads = new Thread[_maximumConcurrencyLevel];

                for (int i = 0; i < _threads.Length; i++)

                {

                    int local = i;

                    _threads[i] = new Thread(() =>

                    {

                        foreach (Task t in _tasks.GetConsumingEnumerable())

                            base.TryExecuteTask(t);

                    });

                    _threads[i].Name = string.Format("PriorityScheduler: ", i);

                    _threads[i].Priority = _priority;

                    _threads[i].IsBackground = true;

                    _threads[i].Start();

                }

            }

        }


        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)

        {

            return false; // we might not want to execute task that should schedule as high or low priority inline

        }

    }

}



查看完整回答
反對 回復 2021-11-28
  • 3 回答
  • 0 關注
  • 273 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號