前言
大家越用RxJava,越觉得它好用,所以不知不觉地发现代码里到处都是RxJava的身影。然而,RxJava也不是银弹,其中仍然有很多问题需要解决。这里,我简单地总结一下自己遇到的一些“坑”,内容上可能会比较松散。
一、考虑主线程的切换
RxJava中一个常用的使用方法是——在其他线程中做处理,然后切换到UI线程中去更新页面。其中,线程切换就是使用了observeOn()。后台下载文件,前台显示下载进度就可以使用这种方式完成。然而,实践发现这其中有坑。如果文件比较大,而下载包的粒度又比较小,这将导致很多通知积压下来,最终导致错误。
这种错误其实也是可以理解的,毕竟MainLooper是根据Message来工作的,Message过多必然会导致一些问题。当然,这还是比较想当然的想法,最终还是需要到源码中一探究竟。ObserveOn的原理在前面关于RxJava的文章已经有过分析,这里还是简单列一下代码。其中的重点还是OperatorObserveOn的内部类——ObserveOnSubscriber。其重要代码片段如下:
/** Observe through individual queue per observer. */ static final class ObserveOnSubscriberextends Subscriber implements Action0 { final Subscriber<&#63; super T> child; final Scheduler.Worker recursiveScheduler; final NotificationLite on; final boolean delayError; final Queue queue; /** The emission threshold that should trigger a replenishing request. */ final int limit; // the status of the current stream volatile boolean finished; final AtomicLong requested = new AtomicLong(); final AtomicLong counter = new AtomicLong(); /** * The single exception if not null, should be written before setting finished (release) and read after * reading finished (acquire). */ Throwable error; /** Remembers how many elements have been emitted before the requests run out. */ long emitted; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained public ObserveOnSubscriber(Scheduler scheduler, Subscriber<&#63; super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); int calculatedSize = (bufferSize > 0) &#63; bufferSize : RxRingBuffer.SIZE; // this formula calculates the 75% of the bufferSize, rounded up to the next integer this.limit = calculatedSize - (calculatedSize >> 2); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue(calculatedSize); } else { queue = new SpscAtomicArrayQueue(calculatedSize); } // signal that this is an async operator capable of receiving this many request(calculatedSize); } void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<&#63; super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); } @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { if (isUnsubscribed() || finished) { return; } finished = true; schedule(); } @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { RxJavaHooks.onError(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } }
关键点就在于这个queue成员,这个队列存放了需要进行发送给下行线程的消息。对于主线程来说,符合其实是比较重的,从消息的生产者和消费者的模式讲,过多过快的消息会导致消息阻塞。甚至,都到不了阻塞的情况,因为queue的大小会有上限,在onNext()
方法中的queue.offer()
可能会产生异常,这取决于queue的实现方式。但无论如何都不可能无限大,所以无法保证绝对不出异常。
解决这个问题的方法其实也很简单,可以在生产者降低消息的产生频率。也可以在消息处理的时候先不进行线程切换,而是通过判断,在必要的时候进行线程切换,比如使用runOnUIThread()
。
二、RxJava避免内存泄漏
RxJava的响应式机制本质上还是回调实现的,因此内存泄漏也是会出现的。倘若不对Subscription进行管理,内存泄漏会非常严重。对于Subscription,其实有几个比较广泛使用的方法,比如RxLifecycle,以及简单的CompositeSubscription。至于它们的使用方法,其实都非常简单,这里就不赘述了。
说到内存泄漏,就谈点题外话,动画也可能导致内存泄漏。其原因仍然是一些回调函数,这些回调函数实现的View变化的功能,但是在被撤销以后,回调函数没有取消掉,同时View可能持有Context信息,从而导致内存泄漏。最近才发现,LoadToastView这个开源库一直存在内存泄漏,其原因正如上文所说。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。