A brief look at RxJava source code: the observer pattern, creation operators (create, from, just), transformation operators (map, flatMap), and thread scheduling.
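I. Creation operators:
1. The observer pattern:
In the RxJava world there are four roles:
Observable
Observer
Subscriber
Subject
Observable and Subject are the two "producer" entities; Observer and Subscriber are the two "consumer" entities. An Observable and an Observer are linked through the subscribe() method, so the Observable can emit events to notify the Observer whenever it needs to.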
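2. Callback methods:
subscribe() connects an observer to an Observable. Your observer needs to implement the following methods (RxJava 1.x names, as used by the code throughout this post):
onNext(T item)
The Observable calls this method to emit an item; the argument is the emitted item. It may be called many times, depending on your implementation.
onError(Throwable e)
Called when the Observable hits an error or cannot produce the expected data. This call terminates the Observable: neither onNext nor onCompleted is called afterwards. The argument is the Throwable that caused the failure.
onCompleted()
Normal termination: if no error occurred, the Observable calls this method after the last call to onNext.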
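The callback contract above (any number of onNext calls, then exactly one of onError or onCompleted) can be sketched in plain Java. This is only an illustration under stated assumptions: MiniObserver, emit and record are hypothetical names invented for the sketch, not RxJava types, and a null item is used as an artificial way to trigger an error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not RxJava's classes.
final class ObserverContractSketch {

    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Emits each item, then exactly one terminal event. */
    static <T> void emit(Iterable<T> items, MiniObserver<T> observer) {
        try {
            for (T item : items) {
                if (item == null) {
                    // artificial failure used by this sketch
                    throw new NullPointerException("null item");
                }
                observer.onNext(item);
            }
        } catch (Throwable e) {
            observer.onError(e);   // terminal: no onNext/onCompleted after this
            return;
        }
        observer.onCompleted();    // terminal: reached only when nothing failed
    }

    /** Records the callbacks as strings so their order is easy to inspect. */
    static <T> List<String> record(Iterable<T> items) {
        final List<String> log = new ArrayList<>();
        emit(items, new MiniObserver<T>() {
            @Override public void onNext(T item) { log.add("onNext=" + item); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("Hello", "RxJava")));
        System.out.println(record(Arrays.asList("Hello", null, "RxJava")));
    }
}
```

In the second call the sequence stops at the failing item: the observer sees one onNext, then onError, and never onCompleted.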
3. Add the dependency:
4. create():
// needs: rx.Observable, rx.Observer, rx.Subscriber, android.util.Log
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("RxJava");
        subscriber.onCompleted();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "OnNext=" + s);
    }
});
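Subscriber extends the Observer interface with a few additions, but the basic usage is exactly the same. The visible difference is the extra onStart() method, which is called when the subscription starts, before any event is emitted, and can be used for preparation work:
.subscribe(new Subscriber<Object>() {
    @Override
    public void onStart() {
        Log.i(TAG, "onStart");
    }

    @Override
    public void onCompleted() {
        Log.i(TAG, "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "OnNext=" + o);
    }
});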
********** Source analysis: subscribe **********
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // validate and proceed
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
        /*
         * the subscribe function can also be overridden but generally that's not the appropriate approach
         * so I won't mention that in the exception
         */
    }

    // new Subscriber so onStart it
    subscriber.onStart();

    /*
     * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
     * to user code from within an Observer"
     */
    // if not already wrapped
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    // The code below is exactly the same an unsafeSubscribe but not used because it would
    // add a significant depth to already huge call stacks.
    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        // special handling for certain Throwable/Error/Exception types
        Exceptions.throwIfFatal(e);
        // if an unhandled error occurs executing the onSubscribe we will propagate it
        try {
            subscriber.onError(hook.onSubscribeError(e)); // catch the exception and call back onError()
        } catch (Throwable e2) {
            Exceptions.throwIfFatal(e2);
            // if this happens it means the onError itself failed (perhaps an invalid function implementation)
            // so we are unable to propagate the error correctly and will just throw
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            // TODO could the hook be the cause of the error in the on error handling.
            hook.onSubscribeError(r);
            // TODO why aren't we throwing the hook's return value.
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}
********** Source analysis: subscribe **********
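The order of steps in the listing (validate, call onStart(), wrap the subscriber, invoke the OnSubscribe, route a thrown exception to onError()) can be reproduced in a few lines of plain Java. The sketch below is an assumption-laden miniature: MiniSubscriber, MiniOnSubscribe and SafeWrapper are hypothetical names made up here, the hook calls are left out, and SafeWrapper only imitates one aspect of SafeSubscriber (dropping events after a terminal event).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature types for illustration; none of these are RxJava classes.
final class SubscribeFlowSketch {

    interface MiniSubscriber<T> {
        void onStart();
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    /** Plays the role of a safe wrapper: ignores anything after a terminal event. */
    static final class SafeWrapper<T> implements MiniSubscriber<T> {
        private final MiniSubscriber<? super T> actual;
        private boolean done;

        SafeWrapper(MiniSubscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override public void onStart() { actual.onStart(); }
        @Override public void onNext(T item) { if (!done) { actual.onNext(item); } }
        @Override public void onError(Throwable e) { if (!done) { done = true; actual.onError(e); } }
        @Override public void onCompleted() { if (!done) { done = true; actual.onCompleted(); } }
    }

    /** Same order of steps as the listing: validate, onStart, wrap, call, route errors. */
    static <T> void subscribe(MiniOnSubscribe<T> onSubscribe, MiniSubscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        subscriber.onStart();
        MiniSubscriber<T> safe = new SafeWrapper<T>(subscriber);
        try {
            onSubscribe.call(safe);
        } catch (Throwable e) {
            safe.onError(e); // an exception thrown by the source becomes onError
        }
    }

    /** Subscribes a logging subscriber to the given source and returns the log. */
    static List<String> run(MiniOnSubscribe<String> source) {
        final List<String> log = new ArrayList<>();
        subscribe(source, new MiniSubscriber<String>() {
            @Override public void onStart() { log.add("onStart"); }
            @Override public void onNext(String s) { log.add("onNext=" + s); }
            @Override public void onError(Throwable e) { log.add("onError=" + e.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // A source that throws part-way through.
        System.out.println(run(s -> { s.onNext("Hello"); throw new IllegalStateException("boom"); }));
        // A misbehaving source that keeps emitting after completion.
        System.out.println(run(s -> { s.onNext("Hello"); s.onCompleted(); s.onNext("late"); }));
    }
}
```

The second source shows why the wrapping step matters: the "late" item is silently dropped because a terminal event has already been delivered.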
5. from():
String[] arrays = {"Hello", "RxJava"};
Observable.from(arrays)
    .subscribe(new Subscriber<Object>() {
        @Override
        public void onStart() {
            Log.i(TAG, "onStart");
        }

        @Override
        public void onCompleted() {
            Log.i(TAG, "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Object o) {
            Log.i(TAG, "OnNext=" + o);
        }
    });
********** Source analysis: from() **********
public class Observable<T> {
    ...
    // create() needs no further explanation here, ...call()...
    public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
        return create(new OnSubscribeFromIterable<T>(iterable));
    }
    ...
}

public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {

    final Iterable<? extends T> is;

    public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
        if (iterable == null) {
            throw new NullPointerException("iterable must not be null");
        }
        this.is = iterable;
    }

    @Override
    public void call(final Subscriber<? super T> o) {
        final Iterator<? extends T> it = is.iterator();
        if (!it.hasNext() && !o.isUnsubscribed())
            o.onCompleted();
        else
            o.setProducer(new IterableProducer<T>(o, it));
    }

    private static final class IterableProducer<T> extends AtomicLong implements Producer {
        /** */
        private static final long serialVersionUID = -8730475647105475802L;
        private final Subscriber<? super T> o;
        private final Iterator<? extends T> it;

        private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
            this.o = o;
            this.it = it;
        }

        @Override
        public void request(long n) {
            if (get() == Long.MAX_VALUE) {
                // already started with fast-path
                return;
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                fastpath();
            } else if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
                slowpath(n);
            }
        }

        void slowpath(long n) {
            // backpressure is requested
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;
            long r = n;
            while (true) {
                /*
                 * This complicated logic is done to avoid touching the
                 * volatile `requested` value during the loop itself. If
                 * it is touched during the loop the performance is
                 * impacted significantly.
                 */
                long numToEmit = r;
                while (true) {
                    if (o.isUnsubscribed()) {
                        return;
                    } else if (it.hasNext()) {
                        if (--numToEmit >= 0) {
                            o.onNext(it.next());
                        } else
                            break;
                    } else if (!o.isUnsubscribed()) {
                        o.onCompleted();
                        return;
                    } else {
                        // is unsubscribed
                        return;
                    }
                }
                r = addAndGet(-r);
                if (r == 0L) {
                    // we're done emitting the number requested so
                    // return
                    return;
                }
            }
        }

        void fastpath() {
            // fast-path without backpressure
            final Subscriber<? super T> o = this.o;
            final Iterator<? extends T> it = this.it;
            while (true) {
                if (o.isUnsubscribed()) {
                    return;
                } else if (it.hasNext()) {
                    o.onNext(it.next());
                } else if (!o.isUnsubscribed()) {
                    o.onCompleted();
                    return;
                } else {
                    // is unsubscribed
                    return;
                }
            }
        }
    }
}
********** Source analysis: from() **********
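The request(n) bookkeeping in IterableProducer is easier to follow in isolation. The following single-threaded sketch keeps only the counter logic: a request of Long.MAX_VALUE switches to the fast path and emits everything, while a smaller request emits at most n items. MiniProducer and drain are hypothetical names for this sketch; unsubscription checks are omitted, a plain List stands in for the downstream Subscriber, and getAndAdd replaces BackpressureUtils.getAndAddRequest, so overflow capping is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, single-threaded miniature of the request(n) bookkeeping; not RxJava code.
final class RequestAccountingSketch {

    static final class MiniProducer<T> extends AtomicLong {
        private static final long serialVersionUID = 1L;
        private final Iterator<T> it;
        private final List<T> sink;   // stands in for the downstream Subscriber
        boolean completed;            // set where the original would call onCompleted()

        MiniProducer(Iterator<T> it, List<T> sink) {
            this.it = it;
            this.sink = sink;
        }

        void request(long n) {
            if (get() == Long.MAX_VALUE) {
                return; // fast path already ran
            }
            if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
                // fast path: no backpressure, emit everything
                while (it.hasNext()) {
                    sink.add(it.next());
                }
                completed = true;
            } else if (n > 0 && getAndAdd(n) == 0L) {
                // only the caller that moves the counter away from 0 starts emitting
                slowpath(n);
            }
        }

        private void slowpath(long n) {
            long r = n;
            while (true) {
                long numToEmit = r;
                while (true) {
                    if (it.hasNext()) {
                        if (--numToEmit >= 0) {
                            sink.add(it.next());
                        } else {
                            break; // emitted everything requested so far
                        }
                    } else {
                        completed = true;
                        return;
                    }
                }
                r = addAndGet(-r); // subtract what was emitted; non-zero means more was requested meanwhile
                if (r == 0L) {
                    return;
                }
            }
        }
    }

    /** Applies the given requests in order and returns what was emitted. */
    static List<Integer> drain(List<Integer> items, long... requests) {
        List<Integer> sink = new ArrayList<>();
        MiniProducer<Integer> producer = new MiniProducer<>(items.iterator(), sink);
        for (long n : requests) {
            producer.request(n);
        }
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(drain(items, 2));               // slow path, one request
        System.out.println(drain(items, 2, 2));            // slow path, two requests
        System.out.println(drain(items, Long.MAX_VALUE));  // fast path
    }
}
```

The design point carried over from the listing is that the inner loop works on a local copy of the count and touches the shared counter only once per batch.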
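6. just():
********** Source analysis: just() **********
public final static <T> Observable<T> just(T t1, T t2) {
    return from(Arrays.asList(t1, t2));
}
********** Source analysis: just() **********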
II. Transformation operators: (key point: you still have to create the Observable first, of course)
1. map(): (think of it as many inputs, single output; the proxy pattern is a good way to understand how map() executes: put simply, the Observable and its OnSubscribe are replaced by new ones)
********** Source analysis: lift() **********
public class Observable<T> {
    ...
    // Subscribing works exactly as in subscribe(subscriber, observable) shown earlier. The key line there is:
    //     hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
    // It invokes call() on the OnSubscribe, which by this point is the replacement created by lift();
    // think of it as a proxy.

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    // the operator supplies the replacement (proxy) Subscriber
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st); // the original OnSubscribe now emits to the new Subscriber
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }
    ...
}
********** Source analysis: lift() **********
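The proxy idea behind lift() can be shown with a miniature Observable in plain Java. In this sketch, which makes no claim to match RxJava's real classes, map() is built on lift(): the new Observable's OnSubscribe asks the operator for a proxy subscriber and hands that proxy to the original OnSubscribe. All Mini* names are hypothetical, and error handling, hooks and onStart() are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of the lift()/map() relationship; not RxJava code.
final class LiftSketch {

    interface MiniSubscriber<T> {
        void onNext(T item);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    /** Turns the downstream subscriber (of R) into an upstream subscriber (of T). */
    interface MiniOperator<R, T> {
        MiniSubscriber<T> call(MiniSubscriber<? super R> child);
    }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        <R> MiniObservable<R> lift(final MiniOperator<R, T> operator) {
            // A NEW observable with a NEW OnSubscribe replaces the original pair.
            return new MiniObservable<R>(new MiniOnSubscribe<R>() {
                @Override
                public void call(MiniSubscriber<? super R> child) {
                    MiniSubscriber<T> proxy = operator.call(child);
                    onSubscribe.call(proxy); // the original source now feeds the proxy
                }
            });
        }

        <R> MiniObservable<R> map(final Function<? super T, ? extends R> f) {
            return lift(new MiniOperator<R, T>() {
                @Override
                public MiniSubscriber<T> call(final MiniSubscriber<? super R> child) {
                    return new MiniSubscriber<T>() {
                        @Override public void onNext(T item) { child.onNext(f.apply(item)); }
                        @Override public void onCompleted() { child.onCompleted(); }
                    };
                }
            });
        }
    }

    /** Maps 1, 2 to "#1", "#2" and records what the final subscriber receives. */
    static List<String> demo() {
        final List<String> out = new ArrayList<>();
        MiniObservable<Integer> source = new MiniObservable<Integer>(s -> {
            s.onNext(1);
            s.onNext(2);
            s.onCompleted();
        });
        MiniObservable<String> mapped = source.map(i -> "#" + i);
        mapped.subscribe(new MiniSubscriber<String>() {
            @Override public void onNext(String item) { out.add(item); }
            @Override public void onCompleted() { out.add("done"); }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing runs until subscribe() is called on the mapped observable; at that point the call travels backwards through the proxy to the original source, and the items travel forwards through it.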
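2. flatMap():
Much the same as above; it can be seen as a flattening kind of map (a second-level transformation).
Comparing flatMap with map:
Suppose we want to print the names of all the courses each student has to take. (The difference in this requirement is that each student has only one name but several courses.) A first attempt could look like this:
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);
And with flatMap:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);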
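To make the "flattening" concrete, here is a synchronous sketch of the student/course example in plain Java. Source, from and flatMap are hypothetical helpers written for this sketch, and the Student and Course classes are invented sample types. One assumption to keep in mind: because everything here is synchronous, the inner sequences arrive one after another, whereas RxJava's flatMap merges inner Observables and may interleave their items when they emit asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical synchronous miniature of flatMap; not RxJava code.
final class FlatMapSketch {

    /** A source pushes its items to the given callback. */
    interface Source<T> {
        void subscribe(Consumer<? super T> onNext);
    }

    static <T> Source<T> from(final List<T> items) {
        return onNext -> items.forEach(onNext);
    }

    /** Maps every upstream item to an inner source and forwards the inner items downstream. */
    static <T, R> Source<R> flatMap(final Source<T> upstream, final Function<? super T, Source<R>> mapper) {
        return onNext -> upstream.subscribe(item -> mapper.apply(item).subscribe(onNext));
    }

    static final class Course {
        private final String name;
        Course(String name) { this.name = name; }
        String getName() { return name; }
    }

    static final class Student {
        private final List<Course> courses;
        Student(List<Course> courses) { this.courses = courses; }
        List<Course> getCourses() { return courses; }
    }

    /** Collects the course names of two sample students through flatMap. */
    static List<String> demo() {
        List<Student> students = Arrays.asList(
                new Student(Arrays.asList(new Course("Math"), new Course("Physics"))),
                new Student(Arrays.asList(new Course("History"))));
        final List<String> names = new ArrayList<>();
        Source<Student> studentSource = from(students);
        Function<Student, Source<Course>> toCourses = student -> from(student.getCourses());
        Source<Course> courses = flatMap(studentSource, toCourses);
        // The subscriber only ever sees Course objects, never Student objects.
        courses.subscribe(course -> names.add(course.getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

As in the flatMap version above, the loop over courses has moved out of the subscriber and into the operator.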
3. filter():
Observable.just(4, 2, 1, 7, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            return integer > 3;
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.i("wxl", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.i("wxl", "OnNext=" + integer);
        }
    });
********** Source analysis: filter() **********
public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
    return lift(new OperatorFilter<T>(predicate));
}

public final class OperatorFilter<T> implements Operator<T, T> {

    private final Func1<? super T, Boolean> predicate;

    public OperatorFilter(Func1<? super T, Boolean> predicate) {
        this.predicate = predicate;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        return new Subscriber<T>(child) {
            @Override
            public void onCompleted() {
                child.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    if (predicate.call(t)) { // test the predicate
                        child.onNext(t);
                    } else {
                        // TODO consider a more complicated version that batches these
                        request(1);
                    }
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, child, t);
                }
            }
        };
    }
}
********** Source analysis: filter() **********
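The request(1) in the else branch is worth a second look: when an item is rejected, the downstream demand has not been satisfied, so the operator asks upstream for one replacement item. A pull-style sketch of that accounting, assuming a plain Iterator as the upstream and a hypothetical helper named take, could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of why a filter re-requests after dropping an item; not RxJava code.
final class FilterRequestSketch {

    /** Delivers up to `requested` matching items, pulling one extra item for each rejected one. */
    static <T> List<T> take(Iterator<T> upstream, Predicate<? super T> predicate, long requested) {
        List<T> delivered = new ArrayList<>();
        long outstanding = requested; // how many items upstream still owes
        while (outstanding > 0 && upstream.hasNext()) {
            T item = upstream.next();
            outstanding--;
            if (predicate.test(item)) {
                delivered.add(item);
            } else {
                outstanding++; // plays the role of request(1): replace the dropped item
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(4, 2, 1, 7, 5);
        System.out.println(take(items.iterator(), i -> i > 3, 2));
    }
}
```

With the values from the filter example and a demand of two items, the rejected 2 and 1 each trigger one extra pull, so the downstream still receives two matching items.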
4. Thread scheduling:
.subscribeOn(Schedulers.io()):
********** Source analysis: subscribeOn() **********
public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler);
    }
    return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

public final class Schedulers {
    private final Scheduler computationScheduler;
    private final Scheduler ioScheduler;
    private final Scheduler newThreadScheduler;
    private static final Schedulers INSTANCE = new Schedulers();
    ....
}

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {
            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() { // do the work on the chosen scheduler's thread
                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {
                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {
                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {
                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }
                                });
                            }
                        });
                    }
                });
            }
        };
    }
}
********** Source analysis: subscribeOn() **********
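Stripped of backpressure and unsubscription, the core of subscribeOn is that the act of subscribing (and therefore the source's call()) is moved onto a worker thread. The sketch below shows that idea with a java.util.concurrent executor standing in for the Scheduler's Worker. MiniOnSubscribe, subscribeOn and the thread name "sketch-io" are assumptions made for this illustration, not RxJava APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature of the subscribeOn idea; not RxJava code.
final class SubscribeOnSketch {

    interface MiniOnSubscribe<T> {
        void call(Consumer<? super T> onNext);
    }

    /** Wraps a source so that subscribing to it happens on the given executor. */
    static <T> MiniOnSubscribe<T> subscribeOn(final MiniOnSubscribe<T> source, final Executor worker) {
        return onNext -> worker.execute(() -> source.call(onNext));
    }

    /** Returns { thread that ran the source, thread that called subscribe }. */
    static String[] demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        final CountDownLatch done = new CountDownLatch(1);
        final String[] seen = new String[2];
        // The source reports the name of the thread it is running on.
        MiniOnSubscribe<String> source = onNext -> onNext.accept(Thread.currentThread().getName());
        try {
            subscribeOn(source, io).call(name -> {
                seen[0] = name;
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the worker");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        seen[1] = Thread.currentThread().getName();
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("source ran on: " + seen[0] + ", subscribe called from: " + seen[1]);
    }
}
```

Because the item is emitted from inside the scheduled task, the downstream callback also runs on the worker thread unless something later in the chain switches threads again.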
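.observeOn(AndroidSchedulers.mainThread()):
See the source of OperatorObserveOn.
Basic idea: this is easiest to understand with the proxy mindset.
(observed) Observable -> subscribes via subscribe(Observer) -> (observer) Observer
There are no real-world use cases here yet; discussion and corrections are welcome!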