热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

C#学习教程:抛出EventLoopScheduler后RX2.0:ObjectDisposedException分享

抛出EventLoopScheduler后RX2.0:ObjectDisposedException我们最近将系统从RX1.11111移植到RX2.0并发现了这个问题。我们对Obs

抛出EventLoopScheduler后RX2.0:ObjectDisposedException

我们最近将系统从RX 1.11111移植到RX 2.0并发现了这个问题。 我们对ObserveOn使用EventLoopScheduler,如下所示:

IDisposable subscription = someSubject .ObserveOn(m_eventLoopScheduler) .SomeMoreRXFunctions() .Subscribe((something)=>something) 

调度程序m_eventLoopScheduler.Dispose在应用程序出口( m_eventLoopScheduler.Dispose )上。 在此之前,我们处理observable( subscription.Dispose )的所有subscription.Dispose

尽管如此,我们在EventLoopScheduler.Schedule中得到了一个ObjectDisposedException 。 捕获该exception是不可能的,因为它起源于RX线程。 这几乎就像Dispose没有摆脱某些队列中的所有项目。

我们试图删除对EventLoopScheduler.Dispose的调用,exception消失了。 但是, SomeMoreRXFunctions()的代码执行了大约10次,尽管所有订阅都被处理掉了。

还有其他方法可以正确关闭EventLoopScheduler吗?

关于订阅的一些观察

(对不起,无法抗拒双关语!) IObservable ,几乎每个Rx运算符实现的接口,只有一个重要的方法:

 IDisposable Subscribe(IObserver observer); 

纯粹通过这种方法并处理它的返回值,观察者(实现IObserver )可以确定订阅的开始和结束时间。

通常 (直接或间接)对作为链的一部分的Observable进行订阅时,这将导致在链中进一步订阅。 确切地说,如果发生这种情况,那么就是Observable。

在许多情况下,订阅收到的订阅之间的关系不是一对一的。 一个例子是Publish(),它最多只能有一个订阅源,无论它收到多少订阅。 这真的是Publish的重点。

在其他情况下,这种关系具有时间方面。 例如,Concat()不会订阅它的第二个流,直到第一个流具有OnCompleted() – 这可能永远不会!

值得花一点时间来研究Rx设计指南 ,因为它们有一些非常相关的话要说:

Rx设计指南

4.4。 尽最大努力停止取消订阅的所有优秀工作。 当在可观察订阅上调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作。 这意味着任何尚未启动的排队工作都将无法启动。

任何正在进行的工作仍可能完成,因为中止正在进行的工作并不总是安全的。 此工作的结果不会发送给任何先前订阅的观察者实例。

底线

注意这里的含义; 最重要的是, 当任何上游订阅可能被制作或处置时 ,它完全取决于Observable的实现 。 换句话说, 绝对不能保证处理订阅会导致Observable处置它直接或间接发出的任何或所有订阅。 这适用于运营商或其上游订阅使用的任何其他资源(例如计划的操作)。

您可以期待的最好的是每个上游运营商的作者确实尽最大努力阻止所有出色的工作。

回到问题(最后!)

在没有看到SomeMoreRXFunctions的内容的SomeMoreRXFunctions我无法确定,但看起来很可能是您所看到的exception是因为 – 尽管处置了您所知道的订阅 – 通过处置调度程序,您已经从下面撕下了地毯还在运行订阅的脚。 实际上,你是造成这样的:

 void Main() { var scheduler = new EventLoopScheduler(); // Decide it's time to stop scheduler.Dispose(); // The next line will throw an ObjectDisposedException scheduler.Schedule(() => {}); } 

编写一个完全合理的运算符很容易导致这个问题 – 即使是不直接使用调度程序的运算符! 考虑一下:

 public static class ObservableExtensions { public static IObservable ReasonableDelay (this IObservable source, IObservable delay) { return Observable.Create(observer => { var subscription = new SerialDisposable(); subscription.Disposable = delay .IgnoreElements() .Subscribe(_ => {}, () => { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); }); return subscription; }); } } 

一旦传递的延迟可观察完成,该运营商将订阅源。 看看它有多合理 – 它使用一个SerialDisposable来正确地将两个潜在的单独订阅作为一个一次性的观察者呈现给它。

但是,破坏此运算符并使其导致exception是微不足道的:

 void Main() { var scheduler = new EventLoopScheduler(); var rx = Observable.Range(0, 10, scheduler) .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1))); var subs = rx.Subscribe(); Thread.Sleep(TimeSpan.FromSeconds(2)); subs.Dispose(); scheduler.Dispose(); } 

这里发生了什么事? 我们在EventLoopScheduler上创建一个Range ,但是使用它的默认调度程序将我们的ReasonableDelay与使用Timer创建的延迟流相关联。

现在我们订阅,等到我们的延迟流完成,然后我们按照“正确的顺序”处理我们的订阅和EventLoopScheduler。

我使用Thread.Sleep插入的人工延迟确保了一种可以轻易自然发生的竞争条件 – 延迟已经完成,订阅已经处理但是为了防止Range操作员访问已处置的EventLoopScheduler为时已晚。

我们甚至可以加强我们的合理努力,以检查观察员是否在延迟部分完成后取消订阅:

 // In the ReasonableDelay method .Subscribe(_ => {}, () => { if(!subscription.IsDisposed) // Check for unsubscribe { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); } }); 

它无济于事。 纯粹在此运算符的上下文中也无法使用锁定语义。

你做错了什么

你没有处理EventLoopScheduler的业务! 一旦将其传递给其他Rx操作员,您就已经承担了责任。 由Rx运营商遵循指南,尽可能及时地清理他们的订阅 – 这意味着直接或间接取消EventLoopScheduler上的任何待处理的计划项目并停止任何进一步的计划,以便它的队列清空可能。

在上面的示例中,您可以将问题归因于多个调度程序的有些人为的使用以及ReasonableDelay中的强制Sleep – 但是对于操作员无法立即清理的真实场景并不难。

本质上,通过配置Rx调度程序,您正在执行Rx等效的线程中止。 就像那个场景一样,你可能有例外处理!

正确的做法是拆开神秘的SomeMoreRXFunctions()并确保它们尽可能地遵守指南。

刚刚注意到这个问题作为这个问题的链接: 处理后的Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException

应该在这里重新发布我在那里所做的事情 – 我不知道有什么方法可以“刷新”调度程序,但是你可以用这种方式包装/处理不可避免的“对象处置”exception:

 EventLoopScheduler scheduler = new EventLoopScheduler(); var wrappedScheduler = scheduler.Catch((ex) => { Console.WriteLine("Got an exception:" + ex.ToString()); return true; }); for (int i = 0; i <100; ++i) { var handle = Observable.Interval(TimeSpan.FromMilliseconds(1)) .ObserveOn(wrappedScheduler) .Subscribe(Observer.Create((x) => Thread.Sleep(1000))); handles.Add(handle); } 

我们遇到了同样的问题,并最终执行以下操作来处置EventLoopScheduler,没有例外:

scheduler.Schedule(() => scheduler.Dispose());

如果你在执行此操作之前正确处理了所有订阅(你说你做了),那么在调用Dispose之前,Dipose()调用是最后一次调度操作,所有其他挂起操作都可以完成。

为了使其更加健壮/可重用,您可以创建自己的IScheduler实现,包含EventLoopScheduler,它将所有操作委托给它+实现Dispose,如上所示。 最重要的是,您可以在Schedule方法中实现防护,以防止在调用Dispose后调度操作(例如,如果您忘记取消订阅某些观察者)。

部分解决了。 案件比较复杂,然后在这里显示。 链是这样的:

var published = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions()。Publish();

IDisposable disposable1 = published.Connect();

IDisposable disposable2 = published.Subscribe((something)=> something);

如果我同时处理了disposable1和disposable2,则SomeMoreRXFunctions()中的代码不再执行。 另一方面,尝试处理调度程序本身仍然会抛出相同的exception。

不幸的是,我无法用更简单的代码重现这个问题。 这可能表明我还缺少一些东西。

这是一个我们可以忍受的解决方案,但我仍然希望找到更好的方法,一次性关闭调度程序,没有exception的机会。

上述就是C#学习教程:抛出EventLoopScheduler后RX2.0:ObjectDisposedException分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注&#8212;编程笔记


推荐阅读
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 本文介绍了使用cacti监控mssql 2005运行资源情况的操作步骤,包括安装必要的工具和驱动,测试mssql的连接,配置监控脚本等。通过php连接mssql来获取SQL 2005性能计算器的值,实现对mssql的监控。详细的操作步骤和代码请参考附件。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文介绍了如何将CIM_DateTime解析为.Net DateTime,并分享了解析过程中可能遇到的问题和解决方法。通过使用DateTime.ParseExact方法和适当的格式字符串,可以成功解析CIM_DateTime字符串。同时还提供了关于WMI和字符串格式的相关信息。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
author-avatar
mobiledu2502858407
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有