본문 바로가기
RxSwift

[RxSwift] Operators - 유틸리티(6)

by 고고 2021. 11. 29.

Obserable과 함께 동작하는 유용한 도우미 연산자들 - Delay, Do, Materialize / Dematerialize, ObserveOn, subscribe, SubscribeOn, Timeout, Using

안녕하세요 ◠‿◠ 고고입니다.

 

1. Delay

  • Delay — Observable의 배출을 특정 시간동안 미룬다
  • Next 이벤트가 전달되는 시점을 지연시킨다
  • 구독시점을 지연시키지는 않는다

구현부

public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
            return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }

예시

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(6)
    .debug()
    .delay(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print($0) }
    .disposed(by: bag)


----- RESULT -----
2020-01-08 16:16:00.366: delay.playground:36 (__lldb_expr_9) -> subscribed
2020-01-08 16:16:01.379: delay.playground:36 (__lldb_expr_9) -> Event next(0)
2020-01-08 16:16:02.380: delay.playground:36 (__lldb_expr_9) -> Event next(1)
2020-01-08 16:16:03.379: delay.playground:36 (__lldb_expr_9) -> Event next(2)
2020-01-08 16:16:04.379: delay.playground:36 (__lldb_expr_9) -> Event next(3)
next(0)
2020-01-08 16:16:05.379: delay.playground:36 (__lldb_expr_9) -> Event next(4)
next(1)
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> Event next(5)
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> Event completed
2020-01-08 16:16:06.379: delay.playground:36 (__lldb_expr_9) -> isDisposed
next(2)
next(3)
next(4)
next(5)
completed

 

- delaySubscription

  • 구독이 시작되는 시점을 지연시킨다
  • Next 이벤트는 지연시키지 않고 바로 전달한다

구현부

public func delaySubscription(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
        DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }

예시

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(6)
    .debug()
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print($0) }
    .disposed(by: bag)


----- RESULT -----
2020-01-08 16:18:14.426: delaySubscription.playground:36 (__lldb_expr_11) -> subscribed
2020-01-08 16:18:15.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(0)
next(0)
2020-01-08 16:18:16.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(1)
next(1)
2020-01-08 16:18:17.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(2)
next(2)
2020-01-08 16:18:18.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(3)
next(3)
2020-01-08 16:18:19.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(4)
next(4)
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event next(5)
next(5)
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> Event completed
completed
2020-01-08 16:18:20.429: delaySubscription.playground:36 (__lldb_expr_11) -> isDisposed

 

 

Delay, DelaySubsciption 예시 출처 : https://kyungmosung.github.io/2020/01/09/rxswift-conditional-timebased-operators/

 

2. Do

  • Do — Observable의 생명주기 동안 발생하는 여러 이벤트에서 실행 될 액션을 등록한다

구현부

public func `do`(onNext: ((Element) throws -> Void)? = nil, afterNext: ((Element) throws -> Void)? = nil, onError: ((Swift.Error) throws -> Void)? = nil, afterError: ((Swift.Error) throws -> Void)? = nil, onCompleted: (() throws -> Void)? = nil, afterCompleted: (() throws -> Void)? = nil, onSubscribe: (() -> Void)? = nil, onSubscribed: (() -> Void)? = nil, onDispose: (() -> Void)? = nil)
        -> Observable<Element> {
            return Do(source: self.asObservable(), eventHandler: { e in
                switch e {
                case .next(let element):
                    try onNext?(element)
                case .error(let e):
                    try onError?(e)
                case .completed:
                    try onCompleted?()
                }
            }, afterEventHandler: { e in
                switch e {
                case .next(let element):
                    try afterNext?(element)
                case .error(let e):
                    try afterError?(e)
                case .completed:
                    try afterCompleted?()
                }
            }, onSubscribe: onSubscribe, onSubscribed: onSubscribed, onDispose: onDispose)
    }

예시

 let source = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

source.do(onNext: { event in
	print("doOn next: \(event)")
}).subscribe {
	print($0)
}

// doOn next: 0
// subscribe: next(0)
// doOn next: 1
// subscribe: next(1)
// doOn next: 2
// subscribe: next(2)

 

3. Materialize, Dematerialize

  • Materialize/Dematerialize — 배출된 항목이 어떤 알림을 통해 옵저버에게 전달 됐는지를 표현하며, 그 반대 과정을 수행할 수 있다
  • RxSwift 에서 쓰이는 Event Sequence를 제어할 수 있게 해준다.
  • 보통 Materialize / Dematerialize 는 함께 사용한다.
  • Observable을 분해할 수 있기 때문에 신중하게 사용해야 한다.

Materialize

  • Sequence 를 Event<Element> Sequence로 변환한다.
 

구현부

public func materialize() -> Observable<Event<Element>> {
        Materialize(source: self.asObservable())
    }

Dematerialize

  • Event<Element> Sequence 를 다시 Sequence로 변환한다.

구현부

public func dematerialize() -> Observable<Element.Element> {
        Dematerialize(source: self.asObservable())
    }

예시

let observable = Observable<Int>
    .create { observer -> Disposable in
        observer.onNext(1)
        observer.onNext(2)
        observer.onNext(3)
        observer.onError(NSError(domain: "", code: 100, userInfo: nil))
        observer.onError(NSError(domain: "", code: 200, userInfo: nil))
        return Disposables.create { }
}

observable
    .materialize()
    .map { event -> Event<Int> in
        switch event {
        case .error:
            return .next(999)
        default:
            return event
        }
    }
    .dematerialize()
    .subscribe { print($0) }

// next(1)
// next(2)
// next(3)
// next(999)
// completed

 

4. ObseveOn

  • ObserveOn — 옵저버가 어느 스케줄러 상에서 Observable을 관찰할지 명시한다

구현부

public func observe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
            return ObserveOn(source: self.asObservable(), scheduler: scheduler)
        }

        return ObserveOnSerialDispatchQueue(source: self.asObservable(),
                                            scheduler: serialScheduler)
    }

예시

let array = ["rabbit","fox"]

Observable.from(array).observe(on: MainScheduler.instance)
	.do(onNext: { _ in
		print("doOnNext \(Thread.isMainThread)")
	}).observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
	.subscribe(onNext: { _ in
		print("subscribeNext \(Thread.isMainThread)")
	})
    
// doOnNext true
// doOnNext true
// subscribeNext false
// subscribeNext false

 

5. Subscribe

  • Subscribe — Observable이 배출하는 항목과 알림을 기반으로 동작한다

Observable의 방출 및 알림에 따라 작동

subscribe 연산자는 Observable에 observer를 연결하는 접착제이다. observer가 Observable에서 내보내는 항목을 보거나 Observable에서 오류 또는 완료된 알림을 수신하려면 먼저 이 연산자를 사용하여 해당 Observable을 구독(subscribe)해야 합니다.

 

Subscribe 연산자의 일반적인 구현은 1~3개의 메서드(이후 observer를 구성함)를 허용하거나 다음 세 가지 메서드를 포함하는 인터페이스를 구현 하는 객체(또는 Observer 또는 Subscriber 라고도 함)를 허용할 수 있습니다 .

onNext

Observable은 Observable이 항목을 방출할 때마다 이 메서드를 호출합니다. 이 메소드는 Observable이 내보낸 것을 매개변수로 사용합니다.

onError

Observable은 이 메서드를 호출하여 예상 데이터 생성에 실패했거나 다른 오류가 발생했음을 나타냅니다. 이것은 Observable을 중지하고 더 이상 onNext 또는 onCompleted 호출하지 않습니다.  onError메서드는 오류를 일으킨 원인에 대한 표시(때로는 Exception 또는 Throwable과 같은 객체, 구현에 따라 다른 경우에는 단순 문자열)를 매개변수로 사용합니다.

onCompleted

Observable은 오류가 발생하지 않은 경우 마지막으로 onNext를 호출한 후 이 메서드를 호출합니다.

 

Observable은 관찰자가 구독할 때까지 항목을 방출하기 시작하지 않으면 Cold Observable이라고 합니다. Observable이 언제든지 항목을 방출하기 시작할 수 있고 구독자가 구독 시점까지 이전에 방출된 항목을 놓치면서 시작 후 어느 시점에서 방출된 항목의 순서를 관찰하기 시작할 수 있는 경우 Observable을 Hot Observable이라고 합니다.

더보기

구현부

public func subscribe(
        onNext: ((Element) -> Void)? = nil,
        onError: ((Swift.Error) -> Void)? = nil,
        onCompleted: (() -> Void)? = nil,
        onDisposed: (() -> Void)? = nil
    ) -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

예시

let numbers = [1,2,3,4,5]

let source = Observable.from(numbers)

source.subscribe {
    print($0)
}

// next(1)
// next(2)
// next(3)
// next(4)
// next(5)
// completed

 

6. SubscribeOn

  • SubscribeOn — Observable을 구독할 때 사용할 스케줄러를 명시한다

구현부

public func subscribe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        SubscribeOn(source: self, scheduler: scheduler)
    }

예시

let array = ["rabbit","fox"]

Observable.from(array)
	.do(onNext: { _ in
		print("doOnNext \(Thread.isMainThread)")
	})
	.observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
	.subscribe(onNext: { _ in
		print("subscribeNext \(Thread.isMainThread)")
	}).subscribe(on: MainScheduler.instance)
    
// doOnNext true
// doOnNext true
// subscribeNext false
// subscribeNext false

 

- ObserveOn vs SubscribeOn

  • observeOn은 특정 작업의 스케쥴러를 변경할 수 있어 여러번 사용하고, subscribeOn은 Observable이 동작하는 스케쥴러를 바꾸기 때문에 가급적 한번만 사용하는 것이 좋습니다.

예시

let source = Observable<String>.create { observer in
	for count in 1...3 {
		print("observable \(Thread.isMainThread)")
		observer.on(.next("\(count)"))
	}
	
    observer.on(.completed)

	return Disposables.create {
		print("dispose")
	}
}

source
	.observe(on: MainScheduler.instance)
	.map{ (intValue) -> String in
		print("map \(Thread.isMainThread)")
		return "\(intValue)"
	}
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .background))
	.do(onNext: { event in
		print("doOn next \(Thread.isMainThread)")
	})
	.observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
	.subscribe{ event in
		print("subscribe next \(Thread.isMainThread)")
	}

// - obsableble 은 subscribeOn에 의해 background thread 에서 동작한다.
// observable false
// observable false
// observable false
// dispose
// - map 은 observeOn 에 의해 main thread 에서 이벤트를 가공한다. -> doOn 에 체인
// map true
// doOn next true
// map true
// doOn next true
// map true
// doOn next true
// - subscribe 는 observeOn에 의해 background thread 에서 이벤트를 수신한다.
// subscribe false next(1)
// subscribe false next(2)
// subscribe false next(3)
// subscribe false completed

 

7. Timeout

  • Timeout — 소스 Obvservable을 그대로 전달하지만, 대신 특정 시간 동안 배출된 항목이 없으면 오류 알림을 보낸다

구현부

public func timeout(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
            return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.timeout), scheduler: scheduler)
    }

예시

let source = Observable<Int>
	.interval(0.5, scheduler: MainScheduler.instance)
	.filter { $0 < 2 }

source
	.timeout(1, scheduler: MainScheduler.instance)
	.do(onNext: { item in
		print(item)
	}, onError: { _ in
		print("error")
	}, onCompleted: nil)
    .subscribe(onNext: {
		print($0)
	})

// 0
// 0
// 1
// 1
// error

 

8. Using

  • Using — 소스 Observable과 동일한 생명주기를 갖는 Observable을 생성하는데, 이 Observable은 생명주기가 완료되면 리소스를 종료하고 반환한다
  • observableFactory: resouce에 연결된 Observable 을 반환하는 팩토리 함수
  • resourceFactory: 리소스 객체를 얻기위한 팩토리 함수

구현부

public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
        Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }

예시

class ResouceDisposable: Disposable {
	func dispose() {
		print("dispose!")
	}
}        

Observable.using({ () -> ResouceDisposable in
		return ResouceDisposable()
	}) { disposable in
		return Observable<Int>.interval(1, scheduler: MainScheduler.instance)
	}.debug()
	.take(3)
	.subscribe()



// (usingTest()) -> subscribed
// (usingTest()) -> Event next(0)
// (usingTest()) -> Event next(1)
// (usingTest()) -> Event next(2)
// (usingTest()) -> isDisposed
// dispose!

 

 

예시 참고 : https://brunch.co.kr/@tilltue/11

 

 

출처 : http://reactivex.io/documentation/ko/operators.html

 

ReactiveX - Operators

연산자 소개 ReactiveX를 지원하는 언어 별 구현체들은 다양한 연산자들을 제공하는데, 이 중에는 공통적으로 제공되는 연산자도 있지만 반대로 특정 구현체에서만 제공하는 연산자들도 존재한다

reactivex.io

 

댓글