RxJava官网
快速入门,推荐下面大神整理的教程:
给初学者的RxJava2.0教程(一)
给初学者的RxJava2.0教程(二)
…..
给初学者的RxJava2.0教程(九)
AndroidStudio配置:
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.squareup.retrofit2:retrofit:2.3.0'
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
implementation 'com.squareup.okhttp3:okhttp:3.9.1'
implementation 'com.squareup.okhttp3:logging-interceptor:3.9.1'
具体版本信息可以查询:search.maven.org
接下来聊一些源码:
首先看一个最简单的rxjava例子
//创建被观察者
private Observable createObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
}
//创建观察者
private Observer createObserver() {
return new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
disposable = d;
}
@Override
public void onNext(Integer i) {
Log.i(TAG, "onNext: " + i);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}
public void demo1(){
//观察者
Observer observer = createObserver();
//被观察者
Observable observable = createObservable();
//建立连接
observable.subscribe(observer);
}
运行结果:
/**
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onSubscribe:
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: subscribe:
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 1
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 2
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 3
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onComplete:
*/
上面的代码可能不太直观,上面的代码可以改成:
private void demo(){
Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
disposable = d;
}
@Override
public void onNext(Integer i) {
Log.i(TAG, "onNext: " + i);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
}
上面的例子可以看到,被观察者订阅了观察者,观察者通过ObservableOnSubscribe的subscribe(ObservableEmitter e),执行OnNext、onComplete方法
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
上面的流程,在源码中是怎么实现的?
下面看一张时序图
首先Observable通过create方法传入ObservableOnSubscribe
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable create(ObservableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}
然后将ObservableOnSubscribe作为参数,创建ObservableCreate,并返回
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe source;
public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
.....
}
由上面的代码可以看出
1.传入的OnservableOnSubscribe保存为source
2.ObservableCreate继承自Observable
所以
Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
返回得到的Observable是ObservableCreate对象。
接下来分析被观察者订阅观察者的过程
被观察者通过subscribe订阅了被观察者
下面分析subscribe源码:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}
通过上面可以看出:
1. Observable(真正是ObservableCreate)通过subscribe,传入observer
2. subscribe代码中最后调用了subscribeActual(observer),即ObservableCreate对象的subscribeActual(observer)方法。
那么接下来的重点就放在ObservableCreate的subscribeActual(observer)方法
@Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new @Override
protected void subscribeActual(Observer super T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
通过代码可以看出
1.首先创建了一个CreateEmitter对象,将观察者传入了CreateEmitter
2.接下来source.subscribe(parent);
通过上面的分析可以知道source就是最开始创建的ObservableOnSubscribe对象
这句代码对应的是:
Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
3.然后通过在ObservableOnSubscribe的subscribe(ObservableEmitter e)方法中调用
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
onNext做了什么呢?
那就看一下ObservableEmitter的源码
implements ObservableEmitter<T>, Disposable {
final Observer super T> observer;
CreateEmitter(Observer super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}
发现e.onNext,最终就是obser.onNext的方法,从而实现了整个rxjava的流程,即: