热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度

篇首语:本文由编程笔记#小编为大家整理,主要介绍了浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度相关的知识,希望对你有一定的参考价

篇首语:本文由编程笔记#小编为大家整理,主要介绍了浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度相关的知识,希望对你有一定的参考价值。


一、创建操作:

1、观察者模式:
RxJava的世界里,我们有四种角色:

    Observable(被观察者)、Observer(观察者)
    Subscriber(订阅者)、Subject
    Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2、回调方法:
Subscribe方法用于将观察者连接到Observable,你的观察者需要实现以下方法:




    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。






    • onError(Exception ex)
      当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。






    • onComplete
      正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。












3、添加依赖:


  compile ‘io.reactivex:rxandroid:1.1.0‘

  compile ‘io.reactivex:rxjava:1.1.0‘

4、create():

Observable.create(new Observable.OnSubscribe() {


@Override
public void call(Subscriber subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("RxJava");
subscriber.onCompleted();
}
}).subscribe(new Observer() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, "OnNext=" + s);
}
});
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的,只是多了onStart()方法,作为异步调用之前的操作:


.subscribe(new Subscriber() {
@Override
public void onStart() {
Log.i(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "OnNext=" + o);
}
});
*****************************************************源码解析订阅**************************************

public final Subscription subscribe(Subscriber subscriber) {
return Observable.subscribe(subscriber, this);
}
private static Subscription subscribe(Subscriber subscriber, Observable observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.OnSubscribe== null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that‘s not the appropriate approach
* so I won‘t mention that in the exception
*/
}

// new Subscriber so onStart it
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));//捕获异常并回调onError()
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren‘t we throwing the hook‘s return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}

*****************************************************源码解析订阅**************************************

5、from():

String[] arrays={"Hello","RxJava"};
Observable.from(arrays)
.subscribe(new Subscriber() {
@Override
public void onStart() {
Log.i(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "OnNext=" + o);
}
});

*****************************************************源码解析订阅**************************************

public final class{
  ...
  //此处create()就不多做解释了,...call()...
  public final static Observable from(Iterable iterable) {        
    return create(new OnSubscribeFromIterable(iterable));    
  }
  ...
}

public final class OnSubscribeFromIterable implements OnSubscribe {
final Iterable is;
public OnSubscribeFromIterable(Iterable iterable) {
if (iterable == null) {
throw new NullPointerException("iterable must not be null");
}
this.is = iterable;
}
@Override
public void call(final Subscriber o) {
final Iterator it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer(o, it));//未执行完继续迭代
}
private static final class IterableProducer extends AtomicLong implements Producer {
/** */
private static final long serialVersiOnUID= -8730475647105475802L;
private final Subscriber o;
private final Iterator it;
private IterableProducer(Subscriber o, Iterator it) {
this.o = o;
this.it = it;
}
@Override
public void request(long n) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastpath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
slowpath(n);
}
}
void slowpath(long n) {
// backpressure is requested
final Subscriber o = this.o;
final Iterator it = this.it;
long r = n;
while (true) {
/*
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long numToEmit = r;
while (true) {if (o.isUnsubscribed()) { return;} else if (it.hasNext()) { if (--numToEmit >= 0) { o.onNext(it.next()); } else break;} else if (!o.isUnsubscribed()) { o.onCompleted(); return;} else { // is unsubscribed return;}
}
r = addAndGet(-r);
if (r == 0L) {// we‘re done emitting the number requested so// returnreturn;
}
}
}
void fastpath() {
// fast-path without backpressure
final Subscriber o = this.o;
final Iterator it = this.it;
while (true) {
if (o.isUnsubscribed()) {return;
} else if (it.hasNext()) {o.onNext(it.next());
} else if (!o.isUnsubscribed()) {o.onCompleted();return;
} else {// is unsubscribedreturn;
}
}
}
}
}

*****************************************************源码解析订阅**************************************

6、just():


Observable.just("Hello","RxJava")

  .subscribe(new Observer() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onNext(String s) {

      Log.i("wxl", "OnNext=" + s);

    }

  });

*****************************************************源码解析订阅**************************************

public final static Observable just(T t1, T t2) { //然而源码解析完之后你会觉得 同上from()
return from(Arrays.asList(t1, t2));
}

*****************************************************源码解析订阅**************************************

二、变换操作:(重点:当然还是要先创建被观察者)
1、Map(): (多输入,单输出
的概念,用代理模式去理解map()方法执行过程,简单说就是Observable和OnSubscribe被新的取代了)


Observable.just("Hello", "RxJava")

  .map(new Func1() {

    @Override

    public String call(String s) {

      return s.toUpperCase();

    }

  }).subscribe(new Subscriber() {

    @Override

    public void onCompleted() {

      Log.i("wxl", "onCompleted");

    }

 

    @Override

    public void onError(Throwable e) {

 

    }

 

    @Override

    public void onNext(String s) {

      Log.i("wxl", "OnNext=" + s);

    }

});

*****************************************************源码解析订阅**************************************

public class Observable {
...
  //订阅,跟上面一样
private static Subscription subscribe(Subscriber subscriber, Observable observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.OnSubscribe== null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that‘s not the appropriate approach
* so I won‘t mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();

/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//发射OnSubscrible中的call();注意,此时已替换了,用代理思维去理解。
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren‘t we throwing the hook‘s return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
public final Observable lift(final Operator operator) {
return new Observable(new OnSubscribe() { //重新创建了新的Observable和OnSubscribe
@Override
public void call(Subscriber o) {
try {Subscriber st = hook.onLift(operator).call(o);//回调替换的部分逻辑try { // new Subscriber created and being subscribed with so ‘onStart‘ it st.onStart(); onSubscribe.call(st);//发射新的} catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e);}
} catch (Throwable e) {Exceptions.throwIfFatal(e);// if the lift function failed all we can do is pass the error to the final Subscriber// as we don‘t have the operator available to uso.onError(e);
}
}
});
}
...
}

*****************************************************源码解析订阅**************************************

2、flatMap():


同上差不多,可以看做是扁平化的一种map(二次转换)

 比较flatMap与map:

假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
  @Override
  public void onNext(Student student) {
    List courses = student.getCourses();
    for (int i = 0; i       Course course = courses.get(i);
   Log.d(tag, course.getName());
   }
  }
...
};
Observable.from(students)
  .subscribe(subscriber);
而flatMap:
Student[] students = ...;
Subscriber subscriber = new Subscriber() {
  @Override
  public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
  .flatMap(new Func1>() {
    @Override
    public Observable call(Student student) {
  return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
3、Filter():

Observable.just(4, 2, 1, 7, 5)
.filter(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer > 3;
}
})
.subscribe(new Observer() {
@Override
public void onCompleted() {
Log.i("wxl", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("wxl", "OnNext=" + integer);
}
});

***************************************************源码解析****************************************************************

public final Observable filter(Func1 predicate) {
return lift(new OperatorFilter(predicate));
}

public final class OperatorFilter implements Operator {
private final Func1 predicate;
public OperatorFilter(Func1 predicate) {
this.predicate = predicate;
}
@Override
public Subscriber call(final Subscriber child) {
return new Subscriber(child) {
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
try {if (predicate.call(t)) {//判断 child.onNext(t);} else { // TODO consider a more complicated version that batches these request(1);}
} catch (Throwable e) {Exceptions.throwOrReport(e, child, t);
}
}
};
}
}

***************************************************源码解析****************************************************************

4、线程调度:

.subscribeOn(Schedulers.io()):

***************************************************源码解析****************************************************************

public final Observable subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn(scheduler));
}

 

public final class Schedulers {
  //
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final Schedulers INSTANCE = new Schedulers();
   ....
}

public class OperatorSubscribeOn implements Operator> {
private final Scheduler scheduler;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber> call(final Subscriber subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber>(subscriber) {
@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable o) {
inner.schedule(new Action0() {//在选择的线程里做处理
@Overridepublic void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber(subscriber) {
@Override public void onCompleted() { subscriber.onCompleted(); }
@Override public void onError(Throwable e) { subscriber.onError(e); }
@Override public void onNext(T t) { subscriber.onNext(t); }
@Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() {
@Override public void request(final long n) { if (Thread.currentThread() == t) { // don‘t schedule if we‘re already on the thread (primarily for first setProducer call) // see unit test ‘testSetProducerSynchronousRequest‘ for more context on this producer.request(n); } else { inner.schedule(new Action0() {
@Override public void call() { producer.request(n); } }); } }
}); }
});}
});
}
};
}
}

***************************************************源码解析****************************************************************

.observeOn(AndroidSchedulers.mainThread()):
参考
OperatorObserveOn中源码;
基本思路:最好用代理思维去理解

(被观察者)Observable:(订阅者)OnSubscribe extends Action1 extends Action extends Function

——> 订阅 subscribe(Observer) 回调处理

(观察者)Observer:Subscriber implements Observer

 应用场景还没有,希望多交流多指正!

 








































































































































































































































































































































































































































































































































推荐阅读
  • 本文将从基础概念入手,详细探讨SpringMVC框架中DispatcherServlet如何通过HandlerMapping进行请求分发,以及其背后的源码实现细节。 ... [详细]
  • H5技术实现经典游戏《贪吃蛇》
    本文将分享一个使用HTML5技术实现的经典小游戏——《贪吃蛇》。通过H5技术,我们将探讨如何构建这款游戏的两种主要玩法:积分闯关和无尽模式。 ... [详细]
  • 本文详细介绍了 `org.apache.tinkerpop.gremlin.structure.VertexProperty` 类中的 `key()` 方法,并提供了多个实际应用的代码示例。通过这些示例,读者可以更好地理解该方法在图数据库操作中的具体用途。 ... [详细]
  • 深入解析WebP图片格式及其应用
    随着互联网技术的发展,无论是PC端还是移动端,图片数据流量占据了很大比重。尤其在高分辨率屏幕普及的背景下,如何在保证图片质量的同时减少文件大小,成为了亟待解决的问题。本文将详细介绍Google推出的WebP图片格式,探讨其在实际项目中的应用及优化策略。 ... [详细]
  • 处理Android EditText中数字输入与parseInt方法
    本文探讨了如何在Android应用中从EditText组件安全地获取并解析用户输入的数字,特别是用于设置端口号的情况。通过示例代码和异常处理策略,展示了有效的方法来避免因非法输入导致的应用崩溃。 ... [详细]
  • Docker安全策略与管理
    本文探讨了Docker的安全挑战、核心安全特性及其管理策略,旨在帮助读者深入理解Docker安全机制,并提供实用的安全管理建议。 ... [详细]
  • 在1995年,Simon Plouffe 发现了一种特殊的求和方法来表示某些常数。两年后,Bailey 和 Borwein 在他们的论文中发表了这一发现,这种方法被命名为 Bailey-Borwein-Plouffe (BBP) 公式。该问题要求计算圆周率 π 的第 n 个十六进制数字。 ... [详细]
  • 使用TabActivity实现Android顶部选项卡功能
    本文介绍如何通过继承TabActivity来创建Android应用中的顶部选项卡。通过简单的步骤,您可以轻松地添加多个选项卡,并实现基本的界面切换功能。 ... [详细]
  • 本文介绍了SIP(Session Initiation Protocol,会话发起协议)的基本概念、功能、消息格式及其实现机制。SIP是一种在IP网络上用于建立、管理和终止多媒体通信会话的应用层协议。 ... [详细]
  • Beetl是一款先进的Java模板引擎,以其丰富的功能、直观的语法、卓越的性能和易于维护的特点著称。它不仅适用于高响应需求的大型网站,也适合功能复杂的CMS管理系统,提供了一种全新的模板开发体验。 ... [详细]
  • 问题描述现在,不管开发一个多大的系统(至少我现在的部门是这样的),都会带一个日志功能;在实际开发过程中 ... [详细]
  • 洛谷 P4009 汽车加油行驶问题 解析
    探讨了经典算法题目——汽车加油行驶问题,通过网络流和费用流的视角,深入解析了该问题的解决方案。本文将详细阐述如何利用最短路径算法解决这一问题,并提供详细的代码实现。 ... [详细]
  • 本文详细探讨了在Java中如何将图像对象转换为文件和字节数组(Byte[])的技术。虽然网络上存在大量相关资料,但实际操作时仍需注意细节。本文通过使用JMSL 4.0库中的图表对象作为示例,提供了一种实用的方法。 ... [详细]
  • 本文将详细介绍如何在二进制和十六进制之间进行准确的转换,并提供实际的代码示例来帮助理解这一过程。 ... [详细]
  • 探索百度WebFE团队打造的强大HTML5上传插件Web Uploader
    本文将详细介绍由百度WebFE团队开发的Web Uploader,这是一款集成了HTML5与Flash技术的上传组件,以其卓越的用户体验和强大的功能著称。 ... [详细]
author-avatar
卟抛棄D
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有