亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用 TestSchedulers、Rx 和 BlockingCollection 進行死鎖測試

使用 TestSchedulers、Rx 和 BlockingCollection 進行死鎖測試

C#
莫回無 2023-09-16 15:01:30
我有以下類,它基本上訂閱int observable 并將值乘以 2。出于現實目的,我添加了 Thread.Sleep 來模擬繁重的處理。public class WorkingClass{    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);    public WorkingClass(IObservable<int> rawValues)    {        rawValues.Subscribe(x => _collection.Add(x));    }    public IObservable<int> ProcessedValues()    {        return Observable.Create<int>(observer =>        {            while (true)            {                int value;                try                {                    value = _collection.Take();                }                catch (Exception ex)                {                    observer.OnError(ex);                    break;                }                Thread.Sleep(1000); //Simulate long work                observer.OnNext(value * 2);            }            return Disposable.Empty;        });    }}我在測試它時遇到了麻煩,在下面的測試中我只想斷言如果源流發出值 1,SUT 將發出值 2:[Test]public void SimpleTest(){    var sourceValuesScheduler = new TestScheduler();    var newThreadScheduler = new TestScheduler();    var source = sourceValuesScheduler.CreateHotObservable(         new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));    var sut = new WorkingClass(source);    var observer = sourceValuesScheduler.CreateObserver<int>();    sut.ProcessedValues()        .SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread        .Subscribe(observer);    sourceValuesScheduler.AdvanceTo(1000);    observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));}如果我運行此測試,則斷言會失敗,因為 newThreadScheduler 從未啟動,因此從未創建 ProcessedValues observable。如果我這樣做: sourceValuesScheduler.AdvanceTo(1000); newThreadScheduler.AdvanceTo(1000); 它也不起作用,因為 newThreadScheduler 使用與 sourceValuesScheduler 相同的線程,因此測試將在處理后的值被發出后立即掛起,在以下行:value = _collection.Take();有沒有辦法讓多個 TestScheduler 在不同的線程上運行?否則我怎么能測試這樣的課程呢?
查看完整描述

1 回答

?
料青山看我應如是

TA貢獻1772條經驗 獲得超8個贊

Take()阻塞,直到有一個項目可以從 中刪除BlockingCollection<int>或者您調用CompleteAdding()它。


ProcessedValues()鑒于您當前的實現,您訂閱并執行循環的線程while將永遠不會完成。


您應該BlockingCollection<int>在單獨的線程上使用它。Task例如,您可以在ProcessedValues()調用時創建消耗。考慮以下實現,它也處理BlockingCollection<int>:


public sealed class WorkingClass : IDisposable

{

    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);

    private List<Task> _consumerTasks = new List<Task>();


    public WorkingClass(IObservable<int> rawValues)

    {

        rawValues.Subscribe(x => _collection.Add(x));

    }


    public IObservable<int> ProcessedValues()

    {

        return Observable.Create<int>(observer =>

        {

            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));

            return Disposable.Empty;

        });

    }


    private void Consume(IObserver<int> observer)

    {

        try

        {

            foreach (int value in _collection.GetConsumingEnumerable())

            {

                Thread.Sleep(1000); //Simulate long work

                observer.OnNext(value * 2);

            }

        }

        catch (Exception ex)

        {

            observer.OnError(ex);

        }

    }


    public void Dispose()

    {

        _collection.CompleteAdding();

        Task.WaitAll(_consumerTasks.ToArray());

        _collection.Dispose();

    }

}

可以使用以下代碼進行測試:


var sourceValuesScheduler = new TestScheduler();


var source = sourceValuesScheduler.CreateHotObservable(

    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));


var observer = sourceValuesScheduler.CreateObserver<int>();


using (var sut = new WorkingClass(source))

{

    sourceValuesScheduler.AdvanceTo(1000); //add to collection

    sut.ProcessedValues().Subscribe(observer); //consume

} //...and wait until the loop exists


observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));



查看完整回答
反對 回復 2023-09-16
  • 1 回答
  • 0 關注
  • 113 瀏覽

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號