1 回答

TA贡献1777条经验 获得超10个赞
一个 observable 代表一个通知流或事件流。当一个可观察的来源来自一个事件时,它们本质上是无穷无尽的。observable 连接到事件,引用对象,因此支持事件的对象永远不会超出范围。.NET/C# 没有提供一种方法来指示一个事件将不再被调用,因此直接连接到该事件的 observable 是无穷无尽的。
这并不少见;大多数基于事件的可观察对象从未OnCompleted明确调用过,而是对现实世界进行建模,在这个世界中很难明确地说某些事情永远不会再发生。
然而,这不是问题:Observables 意味着无限运行,并且不会造成任何损害。未订阅的 observable 不会占用太多资源。如果你对事件源 observable 不感兴趣,取消订阅所有订阅就可以了。
一种方法是使用其中一个Take运算符,例如TakeUntil运算符(如下所述)。尝试以下代码(使用您的Generator课程):
var g = new Generator<int>();
g.Items
.TakeUntil(i => i > 3)
.Subscribe(
i => Console.WriteLine($"OnNext: {i}"),
e => Console.WriteLine($"OnError: Message: {e.Message}"),
() => Console.WriteLine("OnCompleted")
);
g.Push(1);
g.Push(2);
g.Push(3);
g.Push(4);
g.Push(5);
g.Push(6);
输出:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnCompleted
TakeUntil在有整数大于 3 的消息后取消订阅Itemsobservable。这就是为什么有 OnCompleted,没有 5、6 条消息的原因。
另外,正如 Enigmativity 所提到的,您的Generator<T>课程与 基本相同Subject<T>,我建议您使用它。
原答案:
从事件中制作另一个可观察的,然后使用.TakeUntil:
class Generator<T>
{
event Action<T> onPush;
event Action<Unit> onCompleted;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d)
.TakeUntil(Completion);
public IObservable<Unit> Completion =>
Observable.FromEvent<Unit>(d => onCompleted += d, d => onCompleted -= d);
public void Push(T item) => onPush?.Invoke(item);
public void Complete() => onCompleted?.Invoke(Unit.Default);
}
- 1 回答
- 0 关注
- 119 浏览
添加回答
举报