关于订阅的一些观察
(对不起,无法抗拒双关语!) IObservable
,几乎每个Rx运算符实现的接口,只有一个重要的方法:
IDisposable Subscribe(IObserver observer);
纯粹通过这种方法并处理它的返回值,观察者(实现IObserver
)可以确定订阅的开始和结束时间。
通常 (直接或间接)对作为链的一部分的Observable进行订阅时,这将导致在链中进一步订阅。 确切地说,如果发生这种情况,那么就是Observable。
在许多情况下,订阅收到的订阅之间的关系不是一对一的。 一个例子是Publish(),它最多只能有一个订阅源,无论它收到多少订阅。 这真的是Publish的重点。
在其他情况下,这种关系具有时间方面。 例如,Concat()不会订阅它的第二个流,直到第一个流具有OnCompleted()
– 这可能永远不会!
值得花一点时间来研究Rx设计指南 ,因为它们有一些非常相关的话要说:
Rx设计指南
4.4。 尽最大努力停止取消订阅的所有优秀工作。 当在可观察订阅上调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作。 这意味着任何尚未启动的排队工作都将无法启动。
任何正在进行的工作仍可能完成,因为中止正在进行的工作并不总是安全的。 此工作的结果不会发送给任何先前订阅的观察者实例。
底线
注意这里的含义; 最重要的是, 当任何上游订阅可能被制作或处置时 ,它完全取决于Observable的实现 。 换句话说, 绝对不能保证处理订阅会导致Observable处置它直接或间接发出的任何或所有订阅。 这适用于运营商或其上游订阅使用的任何其他资源(例如计划的操作)。
您可以期待的最好的是每个上游运营商的作者确实尽最大努力阻止所有出色的工作。
回到问题(最后!)
在没有看到SomeMoreRXFunctions
的内容的SomeMoreRXFunctions
我无法确定,但看起来很可能是您所看到的exception是因为 – 尽管处置了您所知道的订阅 – 通过处置调度程序,您已经从下面撕下了地毯还在运行订阅的脚。 实际上,你是造成这样的:
void Main() { var scheduler = new EventLoopScheduler(); // Decide it's time to stop scheduler.Dispose(); // The next line will throw an ObjectDisposedException scheduler.Schedule(() => {}); }
编写一个完全合理的运算符很容易导致这个问题 – 即使是不直接使用调度程序的运算符! 考虑一下:
public static class ObservableExtensions { public static IObservable ReasonableDelay (this IObservable source, IObservable delay) { return Observable.Create(observer => { var subscription = new SerialDisposable(); subscription.Disposable = delay .IgnoreElements() .Subscribe(_ => {}, () => { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); }); return subscription; }); } }
一旦传递的延迟可观察完成,该运营商将订阅源。 看看它有多合理 – 它使用一个SerialDisposable
来正确地将两个潜在的单独订阅作为一个一次性的观察者呈现给它。
但是,破坏此运算符并使其导致exception是微不足道的:
void Main() { var scheduler = new EventLoopScheduler(); var rx = Observable.Range(0, 10, scheduler) .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1))); var subs = rx.Subscribe(); Thread.Sleep(TimeSpan.FromSeconds(2)); subs.Dispose(); scheduler.Dispose(); }
这里发生了什么事? 我们在EventLoopScheduler上创建一个Range
,但是使用它的默认调度程序将我们的ReasonableDelay
与使用Timer
创建的延迟流相关联。
现在我们订阅,等到我们的延迟流完成,然后我们按照“正确的顺序”处理我们的订阅和EventLoopScheduler。
我使用Thread.Sleep
插入的人工延迟确保了一种可以轻易自然发生的竞争条件 – 延迟已经完成,订阅已经处理但是为了防止Range
操作员访问已处置的EventLoopScheduler为时已晚。
我们甚至可以加强我们的合理努力,以检查观察员是否在延迟部分完成后取消订阅:
// In the ReasonableDelay method .Subscribe(_ => {}, () => { if(!subscription.IsDisposed) // Check for unsubscribe { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); } });
它无济于事。 纯粹在此运算符的上下文中也无法使用锁定语义。
你做错了什么
你没有处理EventLoopScheduler的业务! 一旦将其传递给其他Rx操作员,您就已经承担了责任。 由Rx运营商遵循指南,尽可能及时地清理他们的订阅 – 这意味着直接或间接取消EventLoopScheduler上的任何待处理的计划项目并停止任何进一步的计划,以便它的队列清空可能。
在上面的示例中,您可以将问题归因于多个调度程序的有些人为的使用以及ReasonableDelay中的强制Sleep – 但是对于操作员无法立即清理的真实场景并不难。
本质上,通过配置Rx调度程序,您正在执行Rx等效的线程中止。 就像那个场景一样,你可能有例外处理!
正确的做法是拆开神秘的SomeMoreRXFunctions()
并确保它们尽可能地遵守指南。
部分解决了。 案件比较复杂,然后在这里显示。 链是这样的:
var published = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions()。Publish();
IDisposable disposable1 = published.Connect();
IDisposable disposable2 = published.Subscribe((something)=> something);
如果我同时处理了disposable1和disposable2,则SomeMoreRXFunctions()中的代码不再执行。 另一方面,尝试处理调度程序本身仍然会抛出相同的exception。
不幸的是,我无法用更简单的代码重现这个问题。 这可能表明我还缺少一些东西。
这是一个我们可以忍受的解决方案,但我仍然希望找到更好的方法,一次性关闭调度程序,没有exception的机会。
上述就是C#学习教程:抛出EventLoopScheduler后RX2.0:ObjectDisposedException分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—编程笔记