좀 더 정확히 제어되는 구독 역학을 가진 전문 Observable들 - Connect, Publish, RefCount, Replay
1. Publish
- Publish — 일반 Observable을 Connectable Observable로 변환한다
구현부
public func publish() -> ConnectableObservable<Element> {
self.multicast { PublishSubject() }
}
2. Connect
- Connect — 구독자가 항목 배출을 시작할 수 있도록 Connectable Observable에게 명령을 내린다
구현부
override func connect() -> Disposable {
return self.lock.performLocked {
if let connection = self.connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self.lock, subscription: singleAssignmentDisposable)
self.connection = connection
let subscription = self.source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
3. RefCount
- RefCount — 일반 Observable처럼 동작하는 Connectable Observable을 만든다
구현부
public func refCount() -> Observable<Element> {
RefCount(source: self)
}
4. Replay
- Replay — 비록 옵저버가 Observable이 항목 배출을 시작한 후에 구독을 했다 하더라도 배출된 모든 항목들을 볼 수 있도록 한다
구현부
public func replay(_ bufferSize: Int)
-> ConnectableObservable<Element> {
self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
}
예제
let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
let replay = timer.replay(2)
_ = replay.connect()
replay.subscribe(onNext: { item in
print("first subscription \(item)")
}).disposed(by: disposeBag)
replay.delaySubscription(3, scheduler: MainScheduler.instance).subscribe(onNext: { item in
print("second subscription \(item)")
}).disposed(by: disposeBag)
// first subscription 0
// first subscription 1
// first subscription 2
// second subscription 1
// second subscription 2
// first subscription 3
버퍼로 지정한 크기 2만큼 저장되어 이벤트가 공유된 것을 확인 할 수 있습니다.
+)
5. Share
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
위 코드의 결과는 두 번의 요청입니다. 왜냐하면 Observable은 subscribe될 때마다 create 클로저를 호출해 Observable을 생성하기 때문입니다.
.share()을 추가하였습니다.
let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
.rx.data(request: URLRequest(url: url))
.share()
requestObservable.subscribe(onNext: {
print($0)
})
requestObservable.subscribe(onNext: {
print($0)
})
결과는 단 한번의 요청입니다.
share()는 subscribe가 처음 호출될 때에만 Subscription을 생성하고, 이후 두번째 새번째 subscribe에서는 새로운 Subscription을 생성하지 않고 이미 만들어진 Subscription를 공유해 사용합니다!
왜 Share가 나오지? 싶겠지만 share() == publish().refCount()이기 때문입니다.
publish() 연산자가 적용되면 Observable은 ConnectableObservable이 됩니다. ConnectableObservable은 요소를 구독할 때가 아니라 connect() 연산자가 호출될 때에만 요소를 생성하기 시작합니다.
refcount()는 ConnectableObservable의 Subscription count를 계속 세고 있다가 Subscription의 개수가 0 -> 1 개가 되는 시점에 connect()를 수행하고 Subscription이 0이 되면 disconnect()를 수행합니다.
connect 예시
let myObservable = Observable.just(1).publish()
myObservable.subscribe(onNext: {
print("first = \($0)")
})
myObservable.subscribe(onNext: {
print("second = \($0)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("Calling connect after 3 seconds")
myObservable.connect()
}
// Calling connect after 3 seconds
// first = 1
// second = 1
- share(replay: Int, scope: SubjectLifetimeScope)
구현부
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
-> Observable<Element> {
switch scope {
case .forever:
switch replay {
case 0: return self.multicast(PublishSubject()).refCount()
default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
}
case .whileConnected:
switch replay {
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
}
}
}
출처 : http://reactivex.io/documentation/ko/operators.html
'RxSwift' 카테고리의 다른 글
[RxSwift] Operator 결정 트리 (0) | 2021.12.14 |
---|---|
[Rxswift] Operators - To (0) | 2021.12.14 |
[RxSwift] Operators - 수학과 집계(8) (0) | 2021.11.29 |
[RxSwift] Operators - 조건과 Boolean(7) (0) | 2021.11.29 |
[RxSwift] Operators - 유틸리티(6) (0) | 2021.11.29 |
댓글