1 Answer

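Take() blocks until there is an item that can be removed from the BlockingCollection<int>, or until you call CompleteAdding() on it.
Given your current implementation, the thread on which you subscribe to ProcessedValues() and run the while loop will never finish.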
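The blocking behavior described above can be reproduced without Rx. The following is a minimal sketch using only the base class library; the class and method names (`TakeBlockingDemo`, `TakeStaysBlockedWhileEmpty`, `CompleteAddingReleasesTake`) are hypothetical and not part of the original question or answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class TakeBlockingDemo
{
    // Returns true if Take() on an empty collection is still blocked after the timeout.
    public static bool TakeStaysBlockedWhileEmpty(TimeSpan timeout)
    {
        using (var collection = new BlockingCollection<int>(1))
        {
            Task<int> consumer = Task.Run(() => collection.Take());
            bool finished = consumer.Wait(timeout);

            // Release the consumer so the collection can be disposed safely.
            collection.Add(0);
            consumer.Wait();
            return !finished;
        }
    }

    // Returns true if a Take() on an empty collection ends with
    // InvalidOperationException once CompleteAdding() has been called.
    public static bool CompleteAddingReleasesTake()
    {
        using (var collection = new BlockingCollection<int>(1))
        {
            Task<int> consumer = Task.Run(() => collection.Take());
            collection.CompleteAdding();
            try
            {
                consumer.Wait();
                return false; // Take() returned a value, which is not expected here
            }
            catch (AggregateException ex)
            {
                return ex.InnerException is InvalidOperationException;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(TakeStaysBlockedWhileEmpty(TimeSpan.FromMilliseconds(200)));
        Console.WriteLine(CompleteAddingReleasesTake());
    }
}
```

If Take() runs on the same thread that subscribes, as in the question, nothing else on that thread can ever call Add() or CompleteAdding(), which is why the loop never finishes.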
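You should consume the BlockingCollection<int> on a separate thread. For example, you can create a consuming Task when ProcessedValues() is called. Consider the following implementation, which also disposes the BlockingCollection<int>: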
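using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class WorkingClass : IDisposable
{
    private readonly BlockingCollection<int> _collection = new BlockingCollection<int>(1);
    private readonly List<Task> _consumerTasks = new List<Task>();

    public WorkingClass(IObservable<int> rawValues)
    {
        // Producer side: every raw value is queued into the bounded collection.
        rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            // Consume on a dedicated task so the subscribing thread is not blocked.
            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
            return Disposable.Empty;
        });
    }

    private void Consume(IObserver<int> observer)
    {
        try
        {
            // The loop ends once CompleteAdding() has been called and the collection is empty.
            foreach (int value in _collection.GetConsumingEnumerable())
            {
                Thread.Sleep(1000); // Simulate long work
                observer.OnNext(value * 2);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    public void Dispose()
    {
        _collection.CompleteAdding();           // no more items will be added
        Task.WaitAll(_consumerTasks.ToArray()); // wait for the consumers to drain the collection
        _collection.Dispose();
    }
}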
You can test it with the following code:
// Requires: using Microsoft.Reactive.Testing; using System.Reactive;
var sourceValuesScheduler = new TestScheduler();
var source = sourceValuesScheduler.CreateHotObservable(
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
var observer = sourceValuesScheduler.CreateObserver<int>();

using (var sut = new WorkingClass(source))
{
    sourceValuesScheduler.AdvanceTo(1000); // add to collection
    sut.ProcessedValues().Subscribe(observer); // consume
} // ...and wait until the loop exits

observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
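The shutdown order used in Dispose() (mark the collection complete, wait for the consumer, then dispose) can also be tried in isolation. This is a rough sketch with no Rx dependency; `ShutdownOrderDemo` and `Run` are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ShutdownOrderDemo
{
    // Hypothetical helper: doubles every queued value on a consumer task.
    public static List<int> Run(IEnumerable<int> inputs)
    {
        var results = new List<int>();
        var collection = new BlockingCollection<int>(1);

        // The consumer loop ends only once the collection is marked complete and is empty.
        Task consumer = Task.Factory.StartNew(() =>
        {
            foreach (int value in collection.GetConsumingEnumerable())
            {
                results.Add(value * 2);
            }
        }, TaskCreationOptions.LongRunning);

        foreach (int input in inputs)
        {
            collection.Add(input); // blocks while the bounded collection is full
        }

        collection.CompleteAdding(); // 1. signal that no more items will arrive
        consumer.Wait();             // 2. let the consumer drain what is left
        collection.Dispose();        // 3. only now release the collection
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(new[] { 1, 2, 3 })));
    }
}
```

Waiting for the consumer before disposing matters because the consumer task may still be reading from the collection when Dispose() is called.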