create a disposable resource that has the same lifespan as the Observable
,即创建一个和Observable具有相同生命周期的disposable资源。
这是ReactiveX对于Using的描述。
可以看出,当一个ObserverA订阅Using返回的Observable时,Using会使用调用者传入的Resource工厂方法[resourceFactory]创建对应的资源,并且使用Observable工厂方法[observableFactory]创建ObserverA实际上想要订阅的Observable。当ObserverA终止时,对应的Resource也会被释放[dispose]。
下面是一个简单的例子(以下的代码都基于RxSwift):
1 | class MyDisposables: Disposable { |
代码段对应的输出:
1 | 0 |
可以看到,当AnonymousObserver[匿名观察者]订阅using返回的Observable时,using内部创建了定期输出Int值的ObservableA,以及资源MyDisposables。在发送5个消息之后,ObservableA被终止,与此同时,MyDisposables资源被using释放。
理解起来还是比较简单的,但是在什么场景中会使用到这个操作呢?
监听Obervable
先看下RxSwift官方Demo中的一段关于GitHub登陆的代码:
1 | let signingIn = ActivityIndicator() |
signingIn是当前是否正在登陆Observable;signedIn是当前登陆动作Observable。
signedIn体现的事件流如下:
- 按下登陆按钮
- 使用当前用户名及密码进行登陆
- 展示登陆结果
其中涉及到的Rx相关操作(详细图示):
- combineLatest: 合并最后的username和password,形成一个新的Observable
- withLatestFrom: 形成一个以loginTaps发送事件时间为采样时间点,发送usernameAndPassword内容的Observable
困惑点
接下来当时比较困扰我的一个点:这段代码是如何做到监听当前是否正在登陆的?
其中涉及到记录开始登陆的操作如下:
1 | ...... |
重点关注.trackActivity(signingIn)
这个调用。当时我的困惑是这样的:
.trackActivity(signingIn)
是在signup(username, password: password)
后调用的,也就是说登陆事件已经结束了,程序才开始监听登陆动作?(这个理解是错误的)
上面的假设当然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?
最直白的想法应该就是下面三步:
- 设置当前状态为正在执行登陆
- 执行登陆操作
- 设置当前状态为没有执行登陆
那么问题来了。首先,signup(username, password: password)
生成了登陆动作Observable,当有Observer订阅这个Observable时,Observable就会执行登陆操作,并发送对应的结果。这就造成了.trackActivity(signingIn)
不能直接返回上游传递过来的事件流,因为这样做的话,刚好切合了上面的那个假设。所以.trackActivity(signingIn)
应该做到以下几件事情:
- A1、保留登陆动作ObservableA,返回自定义的一个ObservableB
- A2、当外部Observer订阅ObservableB时,设置当前状态为正在执行登陆
- A3、设置当前状态为正在执行登陆,然后让外部的Observer重新订阅到ObservableA
- A4、登陆操作执行完毕后,设置当前状态为没有执行登陆
解惑
下面时signingIn所属类ActivityIndicator的实现:
1 | public class ActivityIndicator : DriverConvertibleType { |
先看下_variable
对应的Variable类型。
Variable实际上是BehaviorSubject的一层包装,不同的是它只暴露数据,不会被终止或者失败。
BehaviorSubject会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收BehaviorSubject随后发送的所有数据。
下面是一个Variable的例子:
1 | let v = Variable(0) |
代码段对应的输出:
1 | 0 |
现在回过头来看下_variable
、_loading
这两个属性。_loading
在ActivityIndicator的初始化方法中的赋值如下:
1 | _loading = _variable.asDriver() |
其中_variable
的初始值为0。所以这部分的逻辑很容易理解:_loading
通过_variable
发送的值是否大于0来判断当前是否在执行动作,并且通过increment、decrement方法来设置_variable
发送的值(改变当前正在执行的动作数)。
重点还是在trackActivityOfObservable方法:
1 | fileprivate func trackActivityOfObservable<O: ObservableConvertibleType>(_ source: O) -> Observable<O.E> { |
其中对应的resourceFactory:
1 | { () -> ActivityToken<O.E> in |
observableFactory:
1 | { t in |
ActivityToken的实现如下:
1 | private struct ActivityToken<E> : ObservableConvertibleType, Disposable { |
可以看到,ActivityToken就是一个保存了当前需要监听的Observable的资源。
当外部Observer订阅trackActivityOfObservable返回的ObservableB时,using调用resourceFactory做了以下操作:
- 增加当前正在执行的动作数
- 使用ActivityToken保存需要监听的ObservableA,并且在ActivityToken释放时,恢复当前正在执行的动作数
接下来在调用observableFactory时,using把在resourceFactory中保存的ObservableA重新暴露给Observer。
通过这种方式,就能在ObservableA发送数据之前,执行额外的操作self.increment()
,也就是上面.trackActivity(signingIn)
应该做到的A2。并且因为using会在observableFactory返回的ObservableA终止时释放resourceFactory创建的资源,所以当ObservableA终止时,会执行self.decrement
,也就是A4。
嗯,目前为止,上面的疑惑算是解决了。
总结一下,就是通过using操作hold主需要监听的Observable,然后在执行了想要的额外动作后,重新暴露Observable给外部的Observer。
using内部实现
最后,研究下using的内部实现:
1 | public static func using<R: Disposable>(_ resourceFactory: @escaping () throws -> R, observableFactory: @escaping (R) throws -> Observable<E>) -> Observable<E> { |
using实际上返回的是一个Using类:
1 | class Using<SourceType, ResourceType: Disposable>: Producer<SourceType> { |
Using为Producer的子类,并且重载了run方法。
再看下Producer的实现:
1 | class Producer<Element> : Observable<Element> { |
Producer调用subscribe时,会调用子类的run,并传入当前的Oberver。回到Using的实现,Producer的run方法中创建了UsingSink实例,并调用它的run方法。那么来看下最关键的UsingSink:
1 | class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType> : Sink<O>, ObserverType where O.E == SourceType { |
可以看到,在run方法中,UsingSink先是调用_resourceFactory()
创建了资源resource,然后以resource为参数调用_observableFactory()
来创建想要的Obervable。并且通过Disposables.create(source.subscribe(self),disposable)
让resource的生命周期和Obervable一致。
实际上UsingSink只是在run中做了两件特殊的事情:
- 在让source订阅自身前,创建了resource(一般会在这里做额外的操作)
- 使用的source不是由上游给的,而是通过
_observableFactory
创建的(一般的操作比如map、flatMap等,都是由上游给的)