我開發一個服務軟件A,A接收從通信軟件B發過來的包含很多個設備信息的數據包,B發送數據包的頻率很快(2s);我現在用一個單線程在服務軟件A中對B發過來的數據包進行解析,先把解析出來的設備信息保存在內存中的一個集合S中(只維持每個設備最新的一條數據),然后再把解析出來的數據發給不同的客戶端C(有多個C1,C2,C3。。。。。)。測試后發現在A中處理包的速度跟不上B發送包的速度,會導致B發過來的很多包得不到及時的處理。我考慮過用多線程,但還沒有什么思路,如果用多線程的話我這個內存的集合S要怎么來管理。另外,整個過程對數據的實時性要求很高(就是B發送包后,在客戶端C中就能很快的接收到該包),望各位大蝦指教!
2 回答

呼如林
TA貢獻1798條經驗 獲得超3個贊
典型的生產、消費模型,建議用 Queue<T> 非線程安全,需要自己同步。
1 public class Message 2 { 3 private string iD; 4 5 public string ID 6 { 7 get { return iD; } 8 set { iD = value; } 9 } 10 11 private int number; 12 13 public int Number 14 { 15 get { return number; } 16 set { number = value; } 17 } 18 }
1 ///<summary> 2 /// 信息處理單元,封裝了線程、狀態和同步對象 3 ///</summary> 4 class Worker 5 { 6 public AutoResetEvent State = new AutoResetEvent(false); 7 8 public Thread Thread = null; 9 10 ///<summary> 11 /// 表示線程是否在運行著 12 ///</summary> 13 public volatile bool IsBusy = false; 14 }
1 public class MessageScheduler 2 { 3 #region Private Fields 4 5 ///<summary> 6 /// 線程對象列表 7 ///</summary> 8 private List<Worker> workerList = new List<Worker>(); 9 10 ///<summary> 11 /// 消息隊列 12 ///</summary> 13 private Queue<Message> messageList = new Queue<Message>(); 14 15 ///<summary> 16 /// 消息隊列同步對象 17 ///</summary> 18 private readonly object syncObject = new object(); 19 20 ///<summary> 21 /// 狀態 22 ///</summary> 23 private volatile bool running; 24 25 #endregion 26 27 28 #region Constructors 29 30 ///<summary> 31 /// 創建調度器(線程數量不要太多,否則線程的切換損耗很大) 32 ///</summary> 33 ///<param name="threadCount">要創建的線程數量</param> 34 public MessageScheduler(int threadCount) 35 { 36 running = false; 37 InitializeWorkerCount(threadCount); 38 } 39 40 #endregion 41 42 #region Private Methods 43 44 ///<summary> 45 /// 創建線程 46 ///</summary> 47 ///<param name="threadCount">要創建線程的數量</param> 48 private void InitializeWorkerCount(int threadCount) 49 { 50 Worker worker; 51 for (int i = 0; i < threadCount; i++) 52 { 53 worker = new Worker(); 54 worker.Thread = new Thread(new ParameterizedThreadStart(this.ProcessMessage)); 55 worker.Thread.IsBackground = true; 56 workerList.Add(worker); 57 } 58 } 59 60 ///<summary> 61 /// 消息處理程序 62 ///</summary> 63 ///<param name="param"></param> 64 private void ProcessMessage(object param) 65 { 66 Worker worker = param as Worker; 67 Message message = null; 68 worker.State.WaitOne(); 69 while (true) 70 { 71 //消息處理程序一定不要放到lock里面,否則多線程的性能可能會比單線程性能還低(線程切換會帶來損耗) 72 lock (this.syncObject) 73 { 74 if (messageList.Count > 0) 75 { 76 worker.IsBusy = true; 77 message = messageList.Dequeue(); 78 } 79 } 80 81 if (message != null) 82 { 83 //進行消息處理,可能比較消耗CPU和時間。 84 //這里僅僅輸出消息的Number 85 Console.WriteLine(message.Number); 86 87 } 88 else 89 { 90 worker.IsBusy = false; 91 worker.State.WaitOne(); 92 } 93 } 94 } 95 96 #endregion 97 98 #region Public Methods 99 100 ///<summary> 101 /// 啟動調度器 102 ///</summary> 103 public void Start() 104 { 105 running = true; 106 foreach (Worker worker in workerList) 107 { 108 worker.Thread.Start(worker); 109 } 110 } 111 112 ///<summary> 113 /// 停止調度器 114 ///</summary> 115 public void Stop() 116 { 117 running = false; 118 foreach (Worker worker in workerList) 119 { 120 worker.Thread.Abort(); 121 } 122 } 123 124 ///<summary> 125 /// 消息調度 126 ///</summary> 127 ///<param name="message"></param> 128 public void DoWork(Message message) 129 { 130 if (!running) 131 { 132 return; 133 } 134 lock (this.syncObject) 135 { 136 this.messageList.Enqueue(message); 137 } 138 foreach (Worker worker in this.workerList) 139 { 140 //如果某線程處于等待,則通知繼續 141 if (!worker.IsBusy) 142 { 143 worker.State.Set(); 144 } 145 } 146 } 147 148 #endregion 149 }
1 class Program 2 { 3 static MessageScheduler scheduler = new MessageScheduler(5); 4 5 static void Main(string[] args) 6 { 7 scheduler.Start(); 8 System.Threading.Thread thread = new System.Threading.Thread(CreateMessage); 9 thread.IsBackground = true; 10 thread.Start(); 11 Console.ReadLine(); 12 } 13 14 static void CreateMessage() 15 { 16 Message message; 17 int i = 0; 18 while (true) 19 { 20 message = new Message(); 21 message.ID = Guid.NewGuid().ToString(); 22 message.Number = i; 23 scheduler.DoWork(message); 24 i++; 25 System.Threading.Thread.Sleep(1); 26 } 27 } 28 }

qq_笑_17
TA貢獻1818條經驗 獲得超7個贊
首先要肯定你的問題出在哪里。
是在解析數據上還是在發送到C哪里。
根據你說的情況,我初步估計你在把數據發給C時用的同步方法。
也就是說你需要等到C接收完數據,你才會處理下一次發送或接收。
你只需發送到C時用異步發送即可
- 2 回答
- 0 關注
- 559 瀏覽
添加回答
舉報
0/150
提交
取消