Obserable과 함께 동작하는 유용한 도우미 연산자들 - Delay, Do, Materialize / Dematerialize, ObserveOn, subscribe, SubscribeOn, Timeout, Using
안녕하세요 ◠‿◠ 고고입니다.
1. Delay
- Delay — Observable의 배출을 특정 시간동안 미룬다
- Next 이벤트가 전달되는 시점을 지연시킨다
- 구독시점을 지연시키지는 않는다
구현부
public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
예시
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(6)
.debug()
.delay(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print($0) }
.disposed(by: bag)
----- RESULT -----
2020-01-08 16:16:00.366: delay.playground:36 (__lldb_expr_9) -> subscribed
2020-01-08 16:16:01.379: delay.playground:36 (__lldb_expr_9) -> Event next(0)
2020-01-08 16:16:02.380: delay.playground:36 (__lldb_expr_9) -> Event next(1)
2020-01-08 16:16:03.379: delay.playground:36 (__lldb_expr_9) -> Event next(2)
2020-01-08 16:16:04.379: delay.playground:36 (__lldb_expr_9) -> Event next(3)
next(0)
2020-01-08 16:16:05.379: delay.playground:36 (__lldb_expr_9) -> Event next(4)
next(1)
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> Event next(5)
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> Event completed
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> isDisposed
next(2)
next(3)
next(4)
next(5)
completed
- delaySubscription
- 구독이 시작되는 시점을 지연시킨다
- Next 이벤트는 지연시키지 않고 바로 전달한다
구현부
public func delaySubscription(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
예시
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(6)
.debug()
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print($0) }
.disposed(by: bag)
----- RESULT -----
2020-01-08 16:18:14.426: delaySubscription.playground:36 (__lldb_expr_11) -> subscribed
2020-01-08 16:18:15.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(0)
next(0)
2020-01-08 16:18:16.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(1)
next(1)
2020-01-08 16:18:17.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(2)
next(2)
2020-01-08 16:18:18.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(3)
next(3)
2020-01-08 16:18:19.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(4)
next(4)
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(5)
next(5)
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event completed
completed
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> isDisposed
Delay, DelaySubsciption 예시 출처 : https://kyungmosung.github.io/2020/01/09/rxswift-conditional-timebased-operators/
2. Do
- Do — Observable의 생명주기 동안 발생하는 여러 이벤트에서 실행 될 액션을 등록한다
구현부
public func `do`(onNext: ((Element) throws -> Void)? = nil, afterNext: ((Element) throws -> Void)? = nil, onError: ((Swift.Error) throws -> Void)? = nil, afterError: ((Swift.Error) throws -> Void)? = nil, onCompleted: (() throws -> Void)? = nil, afterCompleted: (() throws -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil)
-> Observable<Element> {
return Do(source: self.asObservable(), eventHandler: { e in
switch e {
case .next(let element):
try onNext?(element)
case .error(let e):
try onError?(e)
case .completed:
try onCompleted?()
}
}, afterEventHandler: { e in
switch e {
case .next(let element):
try afterNext?(element)
case .error(let e):
try afterError?(e)
case .completed:
try afterCompleted?()
}
}, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose)
}
예시
let source = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
source.do(onNext: { event in
print("doOn next: \(event)")
}).subscribe {
print($0)
}
// doOn next: 0
// subscribe: next(0)
// doOn next: 1
// subscribe: next(1)
// doOn next: 2
// subscribe: next(2)
3. Materialize, Dematerialize
- Materialize/Dematerialize — 배출된 항목이 어떤 알림을 통해 옵저버에게 전달 됐는지를 표현하며, 그 반대 과정을 수행할 수 있다
- RxSwift 에서 쓰이는 Event Sequence를 제어할 수 있게 해준다.
- 보통 Materialize / Dematerialize 는 함께 사용한다.
- Observable을 분해할 수 있기 때문에 신중하게 사용해야 한다.
Materialize
- Sequence 를 Event<Element> Sequence로 변환한다.
구현부
public func materialize() -> Observable<Event<Element>> {
Materialize(source: self.asObservable())
}
Dematerialize
- Event<Element> Sequence 를 다시 Sequence로 변환한다.
구현부
public func dematerialize() -> Observable<Element.Element> {
Dematerialize(source: self.asObservable())
}
예시
let observable = Observable<Int>
.create { observer -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onError(NSError(domain: "", code: 100, userInfo: nil))
observer.onError(NSError(domain: "", code: 200, userInfo: nil))
return Disposables.create { }
}
observable
.materialize()
.map { event -> Event<Int> in
switch event {
case .error:
return .next(999)
default:
return event
}
}
.dematerialize()
.subscribe { print($0) }
// next(1)
// next(2)
// next(3)
// next(999)
// completed
4. ObseveOn
- ObserveOn — 옵저버가 어느 스케줄러 상에서 Observable을 관찰할지 명시한다
구현부
public func observe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
return ObserveOnSerialDispatchQueue(source: self.asObservable(),
scheduler: serialScheduler)
}
예시
let array = ["rabbit","fox"]
Observable.from(array).observe(on: MainScheduler.instance)
.do(onNext: { _ in
print("doOnNext \(Thread.isMainThread)")
}).observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { _ in
print("subscribeNext \(Thread.isMainThread)")
})
// doOnNext true
// doOnNext true
// subscribeNext false
// subscribeNext false
5. Subscribe
- Subscribe — Observable이 배출하는 항목과 알림을 기반으로 동작한다
Observable의 방출 및 알림에 따라 작동
subscribe 연산자는 Observable에 observer를 연결하는 접착제이다. observer가 Observable에서 내보내는 항목을 보거나 Observable에서 오류 또는 완료된 알림을 수신하려면 먼저 이 연산자를 사용하여 해당 Observable을 구독(subscribe)해야 합니다.
Subscribe 연산자의 일반적인 구현은 1~3개의 메서드(이후 observer를 구성함)를 허용하거나 다음 세 가지 메서드를 포함하는 인터페이스를 구현 하는 객체(또는 Observer 또는 Subscriber 라고도 함)를 허용할 수 있습니다 .
onNext
Observable은 Observable이 항목을 방출할 때마다 이 메서드를 호출합니다. 이 메소드는 Observable이 내보낸 것을 매개변수로 사용합니다.
onError
Observable은 이 메서드를 호출하여 예상 데이터 생성에 실패했거나 다른 오류가 발생했음을 나타냅니다. 이것은 Observable을 중지하고 더 이상 onNext 또는 onCompleted를 호출하지 않습니다. 이 onError메서드는 오류를 일으킨 원인에 대한 표시(때로는 Exception 또는 Throwable과 같은 객체, 구현에 따라 다른 경우에는 단순 문자열)를 매개변수로 사용합니다.
onCompleted
Observable은 오류가 발생하지 않은 경우 마지막으로 onNext를 호출한 후 이 메서드를 호출합니다.
Observable은 관찰자가 구독할 때까지 항목을 방출하기 시작하지 않으면 Cold Observable이라고 합니다. Observable이 언제든지 항목을 방출하기 시작할 수 있고 구독자가 구독 시점까지 이전에 방출된 항목을 놓치면서 시작 후 어느 시점에서 방출된 항목의 순서를 관찰하기 시작할 수 있는 경우 Observable을 Hot Observable이라고 합니다.
더보기
구현부
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
예시
let numbers = [1,2,3,4,5]
let source = Observable.from(numbers)
source.subscribe {
print($0)
}
// next(1)
// next(2)
// next(3)
// next(4)
// next(5)
// completed
6. SubscribeOn
- SubscribeOn — Observable을 구독할 때 사용할 스케줄러를 명시한다
구현부
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
예시
let array = ["rabbit","fox"]
Observable.from(array)
.do(onNext: { _ in
print("doOnNext \(Thread.isMainThread)")
})
.observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { _ in
print("subscribeNext \(Thread.isMainThread)")
}).subscribe(on: MainScheduler.instance)
// doOnNext true
// doOnNext true
// subscribeNext false
// subscribeNext false
- ObserveOn vs SubscribeOn
- observeOn은 특정 작업의 스케쥴러를 변경할 수 있어 여러번 사용하고, subscribeOn은 Observable이 동작하는 스케쥴러를 바꾸기 때문에 가급적 한번만 사용하는 것이 좋습니다.
예시
let source = Observable<String>.create { observer in
for count in 1...3 {
print("observable \(Thread.isMainThread)")
observer.on(.next("\(count)"))
}
observer.on(.completed)
return Disposables.create {
print("dispose")
}
}
source
.observe(on: MainScheduler.instance)
.map{ (intValue) -> String in
print("map \(Thread.isMainThread)")
return "\(intValue)"
}
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.do(onNext: { event in
print("doOn next \(Thread.isMainThread)")
})
.observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe{ event in
print("subscribe next \(Thread.isMainThread)")
}
// - obsableble 은 subscribeOn에 의해 background thread 에서 동작한다.
// observable false
// observable false
// observable false
// dispose
// - map 은 observeOn 에 의해 main thread 에서 이벤트를 가공한다. -> doOn 에 체인
// map true
// doOn next true
// map true
// doOn next true
// map true
// doOn next true
// - subscribe 는 observeOn에 의해 background thread 에서 이벤트를 수신한다.
// subscribe false next(1)
// subscribe false next(2)
// subscribe false next(3)
// subscribe false completed
7. Timeout
- Timeout — 소스 Obvservable을 그대로 전달하지만, 대신 특정 시간 동안 배출된 항목이 없으면 오류 알림을 보낸다
구현부
public func timeout(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.timeout), scheduler: scheduler)
}
예시
let source = Observable<Int>
.interval(0.5, scheduler: MainScheduler.instance)
.filter { $0 < 2 }
source
.timeout(1, scheduler: MainScheduler.instance)
.do(onNext: { item in
print(item)
}, onError: { _ in
print("error")
}, onCompleted: nil)
.subscribe(onNext: {
print($0)
})
// 0
// 0
// 1
// 1
// error
8. Using
- Using — 소스 Observable과 동일한 생명주기를 갖는 Observable을 생성하는데, 이 Observable은 생명주기가 완료되면 리소스를 종료하고 반환한다
- observableFactory: resouce에 연결된 Observable 을 반환하는 팩토리 함수
- resourceFactory: 리소스 객체를 얻기위한 팩토리 함수
구현부
public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
예시
class ResouceDisposable: Disposable {
func dispose() {
print("dispose!")
}
}
Observable.using({ () -> ResouceDisposable in
return ResouceDisposable()
}) { disposable in
return Observable<Int>.interval(1, scheduler: MainScheduler.instance)
}.debug()
.take(3)
.subscribe()
// (usingTest()) -> subscribed
// (usingTest()) -> Event next(0)
// (usingTest()) -> Event next(1)
// (usingTest()) -> Event next(2)
// (usingTest()) -> isDisposed
// dispose!
예시 참고 : https://brunch.co.kr/@tilltue/11
출처 : http://reactivex.io/documentation/ko/operators.html
'RxSwift' 카테고리의 다른 글
[RxSwift] Operators - 수학과 집계(8) (0) | 2021.11.29 |
---|---|
[RxSwift] Operators - 조건과 Boolean(7) (0) | 2021.11.29 |
[RxSwift] Operators - 오류 처리(5) (0) | 2021.11.20 |
[RxSwift] Operators - 결합(4) (0) | 2021.11.20 |
[RxSwift] Operators - 필터링(3) (0) | 2021.11.19 |
댓글