热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RxJava源码分析(一)

RxJava官网快速入门,推荐下面大神整理的教程:给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)…..给初学者的RxJava2.0教程(九)Andro

RxJava官网

快速入门,推荐下面大神整理的教程:
给初学者的RxJava2.0教程(一)
给初学者的RxJava2.0教程(二)
…..
给初学者的RxJava2.0教程(九)

AndroidStudio配置:

    implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

implementation 'com.squareup.retrofit2:retrofit:2.3.0'
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
implementation 'com.squareup.okhttp3:okhttp:3.9.1'
implementation 'com.squareup.okhttp3:logging-interceptor:3.9.1'

具体版本信息可以查询:search.maven.org

接下来聊一些源码:
首先看一个最简单的rxjava例子

    //创建被观察者
private Observable createObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
}
    //创建观察者
private Observer createObserver() {
return new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
disposable = d;
}

@Override
public void onNext(Integer i) {
Log.i(TAG, "onNext: " + i);
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
};
}
    public void demo1(){
//观察者
Observer observer = createObserver();

//被观察者
Observable observable = createObservable();

//建立连接
observable.subscribe(observer);

}

运行结果:

    /**
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onSubscribe:
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: subscribe:
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 1
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 2
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onNext: 3
12-06 14:40:40.207 18982-18982/com.study.rxjavademo I/SimpleDemos: onComplete:
*/

上面的代码可能不太直观,上面的代码可以改成:

    private void demo(){
Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
disposable = d;
}

@Override
public void onNext(Integer i) {
Log.i(TAG, "onNext: " + i);
}

@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.i(TAG, "onComplete: ");
}
});
}

上面的例子可以看到,被观察者订阅了观察者,观察者通过ObservableOnSubscribe的subscribe(ObservableEmitter e),执行OnNext、onComplete方法

@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}

上面的流程,在源码中是怎么实现的?
下面看一张时序图
这里写图片描述

首先Observable通过create方法传入ObservableOnSubscribe

    @CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static Observable create(ObservableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}

然后将ObservableOnSubscribe作为参数,创建ObservableCreate,并返回

public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe source) {
this.source = source;
}
.....

由上面的代码可以看出
1.传入的OnservableOnSubscribe保存为source
2.ObservableCreate继承自Observable

所以

        Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})

返回得到的Observable是ObservableCreate对象。

接下来分析被观察者订阅观察者的过程
被观察者通过subscribe订阅了被观察者
下面分析subscribe源码:

    @SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observersuper T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}

通过上面可以看出:
1. Observable(真正是ObservableCreate)通过subscribe,传入observer
2. subscribe代码中最后调用了subscribeActual(observer),即ObservableCreate对象的subscribeActual(observer)方法。

那么接下来的重点就放在ObservableCreate的subscribeActual(observer)方法

    @Override
protected void subscribeActual(Observersuper T> observer) {
CreateEmitter parent = new @Override
protected void subscribeActual(Observersuper T> observer) {
CreateEmitter parent = new CreateEmitter(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

通过代码可以看出
1.首先创建了一个CreateEmitter对象,将观察者传入了CreateEmitter
2.接下来source.subscribe(parent);
通过上面的分析可以知道source就是最开始创建的ObservableOnSubscribe对象
这句代码对应的是:

        Observable
.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//这个方法进行通知观察者,进行一系列操作
Log.i(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})

3.然后通过在ObservableOnSubscribe的subscribe(ObservableEmitter e)方法中调用

                        e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();

onNext做了什么呢?
那就看一下ObservableEmitter的源码

    implements ObservableEmitter<T>, Disposable {

final ObserverT> observer;

CreateEmitter(ObserverT> observer) {
this.observer = observer;
}

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
...

发现e.onNext,最终就是obser.onNext的方法,从而实现了整个rxjava的流程,即:
这里写图片描述


推荐阅读
author-avatar
mobiledu2502912017
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有