본문 바로가기
RxSwift

[Rxswift] Operators - 연결(9)

by 고고 2021. 12. 14.

좀 더 정확히 제어되는 구독 역학을 가진 전문 Observable들 - Connect, Publish, RefCount, Replay

 

1. Publish

  • Publish — 일반 Observable을 Connectable Observable로 변환한다

구현부

 public func publish() -> ConnectableObservable<Element> {
        self.multicast { PublishSubject() }
    }

 

2. Connect

  • Connect — 구독자가 항목 배출을 시작할 수 있도록 Connectable Observable에게 명령을 내린다

구현부

override func connect() -> Disposable {
        return self.lock.performLocked {
            if let connection = self.connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self.lock, subscription: singleAssignmentDisposable)
            self.connection = connection
            let subscription = self.source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

 

3. RefCount

  • RefCount — 일반 Observable처럼 동작하는 Connectable Observable을 만든다

구현부

public func refCount() -> Observable<Element> {
        RefCount(source: self)
    }

 

4. Replay

  • Replay — 비록 옵저버가 Observable이 항목 배출을 시작한 후에 구독을 했다 하더라도 배출된 모든 항목들을 볼 수 있도록 한다

구현부

 public func replay(_ bufferSize: Int)
        -> ConnectableObservable<Element> {
        self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
    }

예제

let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance) 

let replay = timer.replay(2)

_ = replay.connect()

replay.subscribe(onNext: { item in
	print("first subscription \(item)")
}).disposed(by: disposeBag)

replay.delaySubscription(3, scheduler: MainScheduler.instance).subscribe(onNext: { item in
	print("second subscription \(item)")
}).disposed(by: disposeBag)

// first subscription 0
// first subscription 1
// first subscription 2
// second subscription 1
// second subscription 2
// first subscription 3

버퍼로 지정한 크기 2만큼 저장되어 이벤트가 공유된 것을 확인 할 수 있습니다.

 

 

+)

5. Share

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
    
requestObservable.subscribe(onNext: {
    print($0)
})

requestObservable.subscribe(onNext: {
    print($0)
})

위 코드의 결과는 두 번의 요청입니다. 왜냐하면 Observable은 subscribe될 때마다 create 클로저를 호출해 Observable을 생성하기 때문입니다.

 

 

.share()을 추가하였습니다.

let url = URL(string: "https://habrahabr.ru/")!
let requestObservable = URLSession.shared
    .rx.data(request: URLRequest(url: url))
    .share()
requestObservable.subscribe(onNext: {
    print($0)
})
requestObservable.subscribe(onNext: {
    print($0)
})

결과는 단 한번의 요청입니다.

share()는 subscribe가 처음 호출될 때에만 Subscription을 생성하고, 이후 두번째 새번째 subscribe에서는 새로운 Subscription을 생성하지 않고 이미 만들어진 Subscription를 공유해 사용합니다!

 

 

왜 Share가 나오지? 싶겠지만 share() == publish().refCount()이기 때문입니다.

 

publish() 연산자가 적용되면 Observable은 ConnectableObservable이 됩니다. ConnectableObservable은 요소를 구독할 때가 아니라 connect() 연산자가 호출될 때에만 요소를 생성하기 시작합니다.

refcount()는 ConnectableObservable의 Subscription count를 계속 세고 있다가 Subscription의 개수가 0 -> 1 개가 되는 시점에 connect()를 수행하고 Subscription이 0이 되면 disconnect()를 수행합니다.

 

connect 예시

let myObservable = Observable.just(1).publish()

myObservable.subscribe(onNext: {
    print("first = \($0)")
})

myObservable.subscribe(onNext: {
    print("second = \($0)")
})

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    print("Calling connect after 3 seconds")
    myObservable.connect()
}

// Calling connect after 3 seconds
// first = 1
// second = 1

 

 

- share(replay: Int, scope: SubjectLifetimeScope)

구현부

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
        -> Observable<Element> {
        switch scope {
        case .forever:
            switch replay {
            case 0: return self.multicast(PublishSubject()).refCount()
            default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
            }
        case .whileConnected:
            switch replay {
            case 0: return ShareWhileConnected(source: self.asObservable())
            case 1: return ShareReplay1WhileConnected(source: self.asObservable())
            default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
            }
        }
    }

 

 

 

출처 : http://reactivex.io/documentation/ko/operators.html

 

ReactiveX - Operators

연산자 소개 ReactiveX를 지원하는 언어 별 구현체들은 다양한 연산자들을 제공하는데, 이 중에는 공통적으로 제공되는 연산자도 있지만 반대로 특정 구현체에서만 제공하는 연산자들도 존재한다

reactivex.io

 

'RxSwift' 카테고리의 다른 글

[RxSwift] Operator 결정 트리  (0) 2021.12.14
[Rxswift] Operators - To  (0) 2021.12.14
[RxSwift] Operators - 수학과 집계(8)  (0) 2021.11.29
[RxSwift] Operators - 조건과 Boolean(7)  (0) 2021.11.29
[RxSwift] Operators - 유틸리티(6)  (0) 2021.11.29

댓글