tripleCC的技术博客

ʕ•̫͡•ʔ-̫͡-ʕ•͓͡•ʔ-̫͡-ʕ•̫͡•ʔ-̫͡-ʕ•͓͡•ʔ-̫͡-ʔ

RxSwift实现初探

和ReactiveCocoa的实现类似,RxSwift也是通过不停地订阅上游的Observable来实现数据的流动。
Rx操作大体分为两种:

  • 创建: create、just、of、from等
  • 处理: map、flatMap、do等

接下来通过下面的操作来简单分析下代码执行过程:

1
2
3
4
5
let _ = Observable.just(1)
    .map { $0 }
    .subscribe(onNext: {
    print($0)
})

以下是执行过程中创建实例的过程:

  • just操作创建Just实例(Just类是一个Observable)
  • map操作创建Map实例,Map实例保存了上游的Observable,这里是Just(Map类是一个Observable)
  • Map实例的subscribe操作创建了AnonymousObserver实例(AnonymousObserver是一个Observer)

以下是执行Map实例执行subscribe后,代码的执行过程:

  • AnonymousObserver通过Map实例的subscribeSafe方法订阅了Map实例
  • Map实例通过subscribe方法间接调用了自身的run方法
  • run方法创建了MapSink实例,MapSink保存了下游的Observer,即AnonymousObserver(MapSink是一个Observer);同时run方法让MapSink订阅Map实例保存的上游Observable,即Just。
  • Just执行subscribe方法,在其中直接调用observer.on(.next(_element))向下游的Observer,即MapSink发送消息
  • MapSink接收到消息进行处理,然后向下游的Observer发送消息,即AnonymousObserver
  • AnonymousObserver执行最终处理

上面就是Rx操作执行过程的全部内容,可以总结两点:

  • 创建操作的subscribe方法会直接向下游Observer发送消息
  • 处理操作一般会创建两个实例,一个是Observable,一个是Observer。Observable用来保存上游Observable并且让下游Observer可以进行订阅,而Observer则用来保存下游的Observer以及订阅上游的Observable

接下来结合代码分析下实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public func subscribe(file: String = #file, line: UInt = #line, function: String = #function, onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {

    let disposable: Disposable

    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    }
    else {
        disposable = Disposables.create()
    }
  // 创建匿名Observer
    let observer = AnonymousObserver<E> { e in
        switch e {
        // 调用对应的回调函数
        case .next(let value):
            onNext?(value)
        case .error(let e):
            if let onError = onError {
                onError(e)
            }
            else {
                print("Received unhandled error: \(file):\(line):\(function) -> \(e)")
            }
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    return Disposables.create(
        self.subscribeSafe(observer),
        disposable
    )
}

func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
  // 让observer订阅自身
    return self.asObservable().subscribe(observer)
}

Observable可以调用subscribe方法来设置对应的回调。其内部实现是创建一个匿名的Observer,然后 让这个匿名Observer订阅Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
      // 执行子类的run方法,在run方法中,一般会保存传入的Observer
        if !CurrentThreadScheduler.isScheduleRequired {
            return run(observer)
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                return self.run(observer)
            }
        }
    }

    func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        abstractMethod()
    }
}

Producer是Just和Map的父类,同时也是一个Observable。通过调用subscribe方法来设置对应的Observer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Selector = (SourceType) throws -> ResultType
  // 上游的Obervable
    private let _source: Observable<SourceType>

    private let _selector: Selector

    init(source: Observable<SourceType>, selector: @escaping Selector) {
        _source = source
        _selector = selector

#if TRACE_RESOURCES
        let _ = AtomicIncrement(&numberOfMapOperators)
#endif
    }

    override func composeMap<R>(_ selector: @escaping (ResultType) throws -> R) -> Observable<R> {
        let originalSelector = _selector
        return Map<SourceType, R>(source: _source, selector: { (s: SourceType) throws -> R in
            let r: ResultType = try originalSelector(s)
            return try selector(r)
        })
    }

    override func run<O: ObserverType>(_ observer: O) -> Disposable where O.E == ResultType {
        let sink = MapSink(selector: _selector, observer: observer)
        sink.disposable = _source.subscribe(sink)
        return sink
    }

    #if TRACE_RESOURCES
    deinit {
        let _ = AtomicDecrement(&numberOfMapOperators)
    }
    #endif
}

其中的_source表示上游的Observable。run方法的实现如下:

1
2
3
4
5
6
7
override func run<O: ObserverType>(_ observer: O) -> Disposable where O.E == ResultType {
  // 创建Observer,并且保存下游的Observer
    let sink = MapSink(selector: _selector, observer: observer)
    // 订阅上游的Observable
    sink.disposable = _source.subscribe(sink)
    return sink
}

先是创建了MapSink,并保存了下游的Observer,然后让sink去订阅上游的Observable。

MapSink的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType {
    typealias Selector = (SourceType) throws -> ResultType

    typealias ResultType = O.E
    typealias Element = SourceType

    private let _selector: Selector

    init(selector: @escaping Selector, observer: O) {
        _selector = selector
        super.init(observer: observer)
    }
  
  // 这里由上游进行调用,Observable或者Observer都可以
  // 不过在流的源头还是需要Observable手动调用_observer.on
    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try _selector(element)
                forwardOn(.next(mappedElement))
            }
            catch let e {
                forwardOn(.error(e))
                dispose()
            }
        case .error(let error):
            forwardOn(.error(error))
            dispose()
        case .completed:
            forwardOn(.completed)
            dispose()
        }
    }
}

MapSink继承自Sink,Sink的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Sink<O : ObserverType> : SingleAssignmentDisposable {
  // 保存的下游Observer
    fileprivate let _observer: O

    init(observer: O) {
#if TRACE_RESOURCES
        let _ = AtomicIncrement(&resourceCount)
#endif
        _observer = observer
    }

    final func forwardOn(_ event: Event<O.E>) {
        if isDisposed {
            return
        }
        // 调用子类的on方法时,会调用Sink的forwardOn方法,从而把事件传递到下游的Observer
        _observer.on(event)
    }

    final func forwarder() -> SinkForward<O> {
        return SinkForward(forward: self)
    }

    deinit {
#if TRACE_RESOURCES
       let _ =  AtomicDecrement(&resourceCount)
#endif
    }
}

一旦上游调用了Observer的on方法,Observer会调用保存的下游Observer的on方法,从而触发一个链式调用。

Just的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Just<Element> : Producer<Element> {
    private let _element: Element

    init(element: Element) {
        _element = element
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(_element))
        observer.on(.completed)
        return Disposables.create()
    }
}

很明显,Just是流的源头,所以它直接重载了subscribe方法,通过主动调用Observer的on方法,让数据能向下游流动。


总结

从just到subscribe,方法的调用方向大致如下:

首先是通过.的方法调用,期间创建了各类Observable。
直到外部调用了subscribe,即订阅了Observable,在(1)中创建的Observable开始依次调用subscribe,期间创建了各类Observer。
最后subscribe到达源头,源头调用Observer的on方法,在(2)中创建的Observer开始依次调用on,最终把结果输出到subscribe回调中。


参考

RACSignal的Subscription深入分析

Comments