1 回答

TA貢獻1777條經驗 獲得超10個贊
一個 observable 代表一個通知流或事件流。當一個可觀察的來源來自一個事件時,它們本質上是無窮無盡的。observable 連接到事件,引用對象,因此支持事件的對象永遠不會超出范圍。.NET/C# 沒有提供一種方法來指示一個事件將不再被調用,因此直接連接到該事件的 observable 是無窮無盡的。
這并不少見;大多數基于事件的可觀察對象從未OnCompleted明確調用過,而是對現實世界進行建模,在這個世界中很難明確地說某些事情永遠不會再發生。
然而,這不是問題:Observables 意味著無限運行,并且不會造成任何損害。未訂閱的 observable 不會占用太多資源。如果你對事件源 observable 不感興趣,取消訂閱所有訂閱就可以了。
一種方法是使用其中一個Take運算符,例如TakeUntil運算符(如下所述)。嘗試以下代碼(使用您的Generator課程):
var g = new Generator<int>();
g.Items
.TakeUntil(i => i > 3)
.Subscribe(
i => Console.WriteLine($"OnNext: {i}"),
e => Console.WriteLine($"OnError: Message: {e.Message}"),
() => Console.WriteLine("OnCompleted")
);
g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);
輸出:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil在有整數大于 3 的消息后取消訂閱Itemsobservable。這就是為什么有 OnCompleted,沒有 5、6 條消息的原因。
另外,正如 Enigmativity 所提到的,您的Generator<T>課程與 基本相同Subject<T>,我建議您使用它。
原答案:
從事件中制作另一個可觀察的,然后使用.TakeUntil:
class Generator<T>
{
event Action<T> onPush;
event Action<Unit> onCompleted;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
.TakeUntil(Completion);
public IObservable<Unit> Completion =>
Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);
public void Push(T item) => onPush?.Invoke(item);
public void Complete() => onCompleted?.Invoke(Unit.Default);
}
- 1 回答
- 0 關注
- 109 瀏覽
添加回答
舉報