tripleCC的技术博客

ʕ•̫͡•ʔ-̫͡-ʕ•͓͡•ʔ-̫͡-ʕ•̫͡•ʔ-̫͡-ʕ•͓͡•ʔ-̫͡-ʔ

ReactiveX中Using操作的应用

create a disposable resource that has the same lifespan as the Observable,即创建一个和Observable具有相同生命周期的disposable资源。
这是ReactiveX对于Using的描述。

可以看出,当一个ObserverA订阅Using返回的Observable时,Using会使用调用者传入的Resource工厂方法[resourceFactory]创建对应的资源,并且使用Observable工厂方法[observableFactory]创建ObserverA实际上想要订阅的Observable。当ObserverA终止时,对应的Resource也会被释放[dispose]。

下面是一个简单的例子(以下的代码都基于RxSwift):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print($0)
    })

代码段对应的输出:

1
2
3
4
5
6
0
1
2
3
4
dispose

可以看到,当AnonymousObserver[匿名观察者]订阅using返回的Observable时,using内部创建了定期输出Int值的ObservableA,以及资源MyDisposables。在发送5个消息之后,ObservableA被终止,与此同时,MyDisposables资源被using释放。

理解起来还是比较简单的,但是在什么场景中会使用到这个操作呢?


监听Obervable

先看下RxSwift官方Demo中的一段关于GitHub登陆的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
let signingIn = ActivityIndicator()
self.signingIn = signingIn.asObservable()

let usernameAndPassword = Observable.combineLatest(input.username, input.password) { ($0, $1) }

signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
    .flatMapLatest { (username, password) in
        return API.signup(username, password: password)
            .observeOn(MainScheduler.instance)
            .catchErrorJustReturn(false)
            .trackActivity(signingIn)
    }
    .flatMapLatest { loggedIn -> Observable<Bool> in
        let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
        return wireframe.promptFor(message, cancelAction: "OK", actions: [])
            // propagate original value
            .map { _ in
                loggedIn
            }
    }
    .shareReplay(1)

signingIn是当前是否正在登陆Observable;signedIn是当前登陆动作Observable。
signedIn体现的事件流如下:

  • 按下登陆按钮
  • 使用当前用户名及密码进行登陆
  • 展示登陆结果

其中涉及到的Rx相关操作(详细图示):

  • combineLatest: 合并最后的username和password,形成一个新的Observable
  • withLatestFrom: 形成一个以loginTaps发送事件时间为采样时间点,发送usernameAndPassword内容的Observable

困惑点

接下来当时比较困扰我的一个点:这段代码是如何做到监听当前是否正在登陆的?
其中涉及到记录开始登陆的操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
......

API.signup(username, password: password)
.observeOn(MainScheduler.instance)
.catchErrorJustReturn(false)
.trackActivity(signingIn)

......

public extension ObservableConvertibleType {
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<E> {
        return activityIndicator.trackActivityOfObservable(self)
    }
}

重点关注.trackActivity(signingIn)这个调用。当时我的困惑是这样的:

  • .trackActivity(signingIn)是在signup(username, password: password)后调用的,也就是说登陆事件已经结束了,程序才开始监听登陆动作?(这个理解是错误的)

上面的假设当然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?
最直白的想法应该就是下面三步:

  • 设置当前状态为正在执行登陆
  • 执行登陆操作
  • 设置当前状态为没有执行登陆

那么问题来了。首先,signup(username, password: password)生成了登陆动作Observable,当有Observer订阅这个Observable时,Observable就会执行登陆操作,并发送对应的结果。这就造成了.trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样做的话,刚好切合了上面的那个假设。所以.trackActivity(signingIn)应该做到以下几件事情:

  • A1、保留登陆动作ObservableA,返回自定义的一个ObservableB
  • A2、当外部Observer订阅ObservableB时,设置当前状态为正在执行登陆
  • A3、设置当前状态为正在执行登陆,然后让外部的Observer重新订阅到ObservableA
  • A4、登陆操作执行完毕后,设置当前状态为没有执行登陆

解惑

下面时signingIn所属类ActivityIndicator的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ActivityIndicator : DriverConvertibleType {
    public typealias E = Bool

    private let _lock = NSRecursiveLock()
    private let _variable = Variable(0)
    private let _loading: Driver<Bool>

    public init() {
        _loading = _variable.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()
    }

    fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> {
        return Observable.using({ () -> ActivityToken<O.E> in
            self.increment()
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        }) { t in
            return t.asObservable()
        }
    }

    private func increment() {
        _lock.lock()
        _variable.value = _variable.value + 1
        _lock.unlock()
    }

    private func decrement() {
        _lock.lock()
        _variable.value = _variable.value - 1
        _lock.unlock()
    }

    public func asDriver() -> Driver<E> {
        return _loading
    }
}

先看下_variable对应的Variable类型。
Variable实际上是BehaviorSubject的一层包装,不同的是它只暴露数据,不会被终止或者失败。
BehaviorSubject会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收BehaviorSubject随后发送的所有数据。
下面是一个Variable的例子:

1
2
3
4
5
6
7
8
let v = Variable(0)
v.asObservable()
    .subscribe(onNext: {
        print($0)
    })

v.value = 1
v.value = 2

代码段对应的输出:

1
2
3
0
1
2

现在回过头来看下_variable_loading这两个属性。
_loading在ActivityIndicator的初始化方法中的赋值如下:

1
2
3
_loading = _variable.asDriver()
  .map { $0 > 0 }
  .distinctUntilChanged()

其中_variable的初始值为0。所以这部分的逻辑很容易理解:_loading通过_variable发送的值是否大于0来判断当前是否在执行动作,并且通过increment、decrement方法来设置_variable发送的值(改变当前正在执行的动作数)。

重点还是在trackActivityOfObservable方法:

1
2
3
4
5
6
7
8
fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> {
    return Observable.using({ () -> ActivityToken<O.E> in
        self.increment()
        return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
    }) { t in
        return t.asObservable()
    }
}

其中对应的resourceFactory:

1
2
3
4
{ () -> ActivityToken<O.E> in
        self.increment()
        return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
    }

observableFactory:

1
2
3
{ t in
        return t.asObservable()
    }

ActivityToken的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> ()) {
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    }

    func dispose() {
        _dispose.dispose()
    }

    func asObservable() -> Observable<E> {
        return _source
    }
}

可以看到,ActivityToken就是一个保存了当前需要监听的Observable的资源。
当外部Observer订阅trackActivityOfObservable返回的ObservableB时,using调用resourceFactory做了以下操作:

  • 增加当前正在执行的动作数
  • 使用ActivityToken保存需要监听的ObservableA,并且在ActivityToken释放时,恢复当前正在执行的动作数

接下来在调用observableFactory时,using把在resourceFactory中保存的ObservableA重新暴露给Observer。
通过这种方式,就能在ObservableA发送数据之前,执行额外的操作self.increment(),也就是上面.trackActivity(signingIn)应该做到的A2。并且因为using会在observableFactory返回的ObservableA终止时释放resourceFactory创建的资源,所以当ObservableA终止时,会执行self.decrement,也就是A4。
嗯,目前为止,上面的疑惑算是解决了。
总结一下,就是通过using操作hold主需要监听的Observable,然后在执行了想要的额外动作后,重新暴露Observable给外部的Observer。


using内部实现

最后,研究下using的内部实现:

1
2
3
public static func using<R: Disposable>(_ resourceFactory: @escaping () throws -> R, observableFactory: @escaping (R) throws -> Observable<E>) -> Observable<E> {
        return Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }

using实际上返回的是一个Using类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Using<SourceType, ResourceType: Disposable>: Producer<SourceType> {

    typealias E = SourceType

    typealias ResourceFactory = () throws -> ResourceType
    typealias ObservableFactory = (ResourceType) throws -> Observable<SourceType>

    fileprivate let _resourceFactory: ResourceFactory
    fileprivate let _observableFactory: ObservableFactory


    init(resourceFactory: @escaping ResourceFactory, observableFactory: @escaping ObservableFactory) {
        _resourceFactory = resourceFactory
        _observableFactory = observableFactory
    }

    override func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        let sink = UsingSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

Using为Producer的子类,并且重载了run方法。
再看下Producer的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            return run(observer)
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                return self.run(observer)
            }
        }
    }

    func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        abstractMethod()
    }
}

Producer调用subscribe时,会调用子类的run,并传入当前的Oberver。回到Using的实现,Producer的run方法中创建了UsingSink实例,并调用它的run方法。那么来看下最关键的UsingSink:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType> : Sink<O>, ObserverType where O.E == SourceType {

    typealias Parent = Using<SourceType, ResourceType>
    typealias E = O.E

    private let _parent: Parent

    init(parent: Parent, observer: O) {
        _parent = parent
        super.init(observer: observer)
    }

    func run() -> Disposable {
        var disposable = Disposables.create()

        do {
            let resource = try _parent._resourceFactory()
            disposable = resource
            let source = try _parent._observableFactory(resource)

            return Disposables.create(
                source.subscribe(self),
                disposable
            )
        } catch let error {
            return Disposables.create(
                Observable.error(error).subscribe(self),
                disposable
            )
        }
    }

    func on(_ event: Event<E>) {
        switch event {
        case let .next(value):
            forwardOn(.next(value))
        case let .error(error):
            forwardOn(.error(error))
            dispose()
        case .completed:
            forwardOn(.completed)
            dispose()
        }
    }
}

可以看到,在run方法中,UsingSink先是调用_resourceFactory()创建了资源resource,然后以resource为参数调用_observableFactory()来创建想要的Obervable。并且通过Disposables.create(source.subscribe(self),disposable)让resource的生命周期和Obervable一致。
实际上UsingSink只是在run中做了两件特殊的事情:

  • 在让source订阅自身前,创建了resource(一般会在这里做额外的操作)
  • 使用的source不是由上游给的,而是通过_observableFactory创建的(一般的操作比如map、flatMap等,都是由上游给的)

参考

.Net中关于Using的例子
Rx操作图示
官方文档中对于using的说明

Comments