和ReactiveCocoa的实现类似,RxSwift也是通过不停地订阅上游的Observable来实现数据的流动。
Rx操作大体分为两种:
- 创建: create、just、of、from等
- 处理: map、flatMap、do等
接下来通过下面的操作来简单分析下代码执行过程:
1 | let _ = Observable.just(1) |
初探
以下是执行过程中创建实例的过程:
- just操作创建Just实例(Just类是一个Observable)
- map操作创建Map实例,Map实例保存了上游的Observable,这里是Just(Map类是一个Observable)
- Map实例的subscribe操作创建了AnonymousObserver实例(AnonymousObserver是一个Observer)
以下是执行Map实例执行subscribe后,代码的执行过程:
- AnonymousObserver通过Map实例的subscribeSafe方法订阅了Map实例
- Map实例通过subscribe方法间接调用了自身的run方法
- run方法创建了MapSink实例,MapSink保存了下游的Observer,即AnonymousObserver(MapSink是一个Observer);同时run方法让MapSink订阅Map实例保存的上游Observable,即Just。
- Just执行subscribe方法,在其中直接调用
observer.on(.next(_element))
向下游的Observer,即MapSink发送消息 - MapSink接收到消息进行处理,然后向下游的Observer发送消息,即AnonymousObserver
- AnonymousObserver执行最终处理
上面就是Rx操作执行过程的全部内容,可以总结两点:
- 创建操作的subscribe方法会直接向下游Observer发送消息
- 处理操作一般会创建两个实例,一个是Observable,一个是Observer。Observable用来保存上游Observable并且让下游Observer可以进行订阅,而Observer则用来保存下游的Observer以及订阅上游的Observable
接下来结合代码分析下实现。
1 | public func subscribe(file: String = #file, line: UInt = #line, function: String = #function, onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) |
Observable可以调用subscribe方法来设置对应的回调。其内部实现是创建一个匿名的Observer,然后
让这个匿名Observer订阅Observable。
1 | class Producer<Element> : Observable<Element> { |
Producer是Just和Map的父类,同时也是一个Observable。通过调用subscribe方法来设置对应的Observer。
1 | class Map<SourceType, ResultType>: Producer<ResultType> { |
其中的_source表示上游的Observable。run方法的实现如下:
1 | override func run<O: ObserverType>(_ observer: O) -> Disposable where O.E == ResultType { |
先是创建了MapSink,并保存了下游的Observer,然后让sink去订阅上游的Observable。
MapSink的实现如下:
1 | class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType { |
MapSink继承自Sink,Sink的实现如下:
1 | class Sink<O : ObserverType> : SingleAssignmentDisposable { |
一旦上游调用了Observer的on方法,Observer会调用保存的下游Observer的on方法,从而触发一个链式调用。
Just的实现如下:
1 | class Just<Element> : Producer<Element> { |
很明显,Just是流的源头,所以它直接重载了subscribe方法,通过主动调用Observer的on方法,让数据能向下游流动。
总结
从just到subscribe,方法的调用方向大致如下:
首先是通过.
的方法调用,期间创建了各类Observable。
直到外部调用了subscribe,即订阅了Observable,在(1)中创建的Observable开始依次调用subscribe,期间创建了各类Observer。
最后subscribe到达源头,源头调用Observer的on方法,在(2)中创建的Observer开始依次调用on,最终把结果输出到subscribe回调中。