반응형프로그래밍

RxSwift(5)-TimeBasedOperators

2료일 2025. 5. 4. 17:43

시간에 기반한 오퍼레이터-> 시간 흐름을 지연시켜주거나 제어

Buffering Operators

Buffering 연산자들은 Observable 시퀀스에서 방출된 이벤트들을 일정 기간 동안 모았다가 한 번에 처리할 수 있게 해줍니다. 이를 통해 과거 이벤트를 재생하거나 일정 시간 동안 버퍼링하는 기능을 구현할 수 있습니다.

Replay(_:)

새로운 구독자에게 과거에 방출된 요소들의 일부를 다시 전달합니다

// 마지막 N개의 이벤트를 새 구독자에게 전달
let replayedObservable = Observable<Int>.interval(1,scheduler: .instance).replay(2)

// 첫 번째 구독자
replayedObservable.subscribe(onNext: { value in
    print("첫 번째 구독자: \(value)")
}).disposed(by: disposeBag)

// 나중에 두 번째 구독자 추가
// 이 구독자는 마지막 1개의 이벤트를 즉시 받게 됨
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    replayedObservable.subscribe(onNext: { value in
        print("두 번째 구독자: \(value)")
    }).disposed(by: disposeBag)

}
첫 번째 구독자: 0
첫 번째 구독자: 1
첫 번째 구독자: 2
첫 번째 구독자: 3
두 번째 구독자: 2
두 번째 구독자: 3
첫 번째 구독자: 4
두 번째 구독자: 4
첫 번째 구독자: 5
두 번째 구독자: 5

처음 구독하면 Observable이 시작되고 첫번재 구독자는 값을 받는다.

3초 후에 두번재 구독자가 구독을 시작할때 replay(2)에 의해 최근 2개의 값을 즉시 받습니다. 여기서는 2,3입니다

이 시점에 첫번재 구독자는 이미 0,1,2,3까지 받은 상태. replay(2)로 버퍼에는 최근값 2개만 저장. 

그이후로는 두 구독자 모두 동일한 값 실시간으로 받습니다.

replay는 ConnectableObservable을 반환하는데 이 특별한 Observable은 connect() method호출해야 이벤트 방출을 시작하고. 모든 구독자가 준비될 때까지 이벤트 방출을 지연시킬 수 있다는 능력이 있습니다

Scope Option

replay를 사용할때 메모리 관리를 위해 scope 옵션을 지정할 수 있습니다.

// .whileConnected: 모든 구독이 해제되면 버퍼 해제 (기본값)
let whileConnectedObservable = sourceObservable.replay(2, scope: .whileConnected)

// .forever: 구독자가 없어도 버퍼 유지
let foreverObservable = sourceObservable.replay(2, scope: .forever)

replayAll

이벤트의 크기가 없어 들어온 모든 이벤트를 반복시켜줍니다. 그렇기에 메모리 이슈에 신경써야합니다.

Buffer

일정 시간단위로 모아서 배열형태로 방출합니다.

// 최대 3개의 이벤트를 5초마다 배열로 방출
sourceObservable
    .buffer(timeSpan: .seconds(5), count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { batch in
        print("버퍼된 이벤트 그룹: \(batch)")
    })
    .disposed(by: disposeBag)

지정된 시간이 경과했거나 지정된 개수만큼 이벤트가 모였을때 이벤트를 배열로 방출합니다!!

// 사용자 액션을 배치로 서버에 전송
userActionObservable
    .buffer(timeSpan: .seconds(30), count: 10, scheduler: MainScheduler.instance)
    .filter { !$0.isEmpty }
    .subscribe(onNext: { actions in
        analyticsService.sendBatchEvents(actions)
    })
    .disposed(by: disposeBag)

현업에서는 빈번한 사용자 액션을 개별적으로 서버에 전송하는 대신 일정 단위로 묶어서 전송함으로써 네트워크 요청 횟수를 줄이고 성능을 최적화할수 있습니다. 

핵심 특징

1. 모든 이벤트를 배열로 묶어서 하나의 이벤트로 방출

2. 지정된 시간이 경과 or 지정된 개수가 차면 방출

3. 배치처리, 네트워크 요청 최적화, 이벤트 누적분석에 사용할수 있겠다.

window

WINDOW는 이벤트들을 새로운 Observable 형태로 분리해준다. 각 Window는 독립된 Observable로 방출됩니다.

// 최대 3개의 이벤트를 5초마다 Observable로 분리
sourceObservable
    .window(timeSpan: .seconds(5), count: 3, scheduler: MainScheduler.instance)
    .flatMap { windowedObservable -> Observable<String> in
        print("새 윈도우 시작!")
        return windowedObservable.do(onCompleted: {
            print("윈도우 완료!")
        })
    }
    .subscribe(onNext: { value in
        print("값: \(value)")
    })
    .disposed(by: disposeBag)

소스가 1초마다 문자열을 방출한다고 가정해보자:

새 윈도우 시작!
값: A
값: B
값: C
윈도우 완료!  // 3개 이벤트로 윈도우 완료
새 윈도우 시작!
값: D
값: E
윈도우 완료!  // 5초 경과로 윈도우 완료
새 윈도우 시작!
값: F
값: G
값: H
윈도우 완료!  // 3개 이벤트로 윈도우 완료
...

이렇게 될수 있다.

핵심 특징:

1. 이벤트들을 독립된 Observable로 분리하여 방출(Observable<Element>)

2. Buffer와 동일하게 시간 똔느 개수 조건으로 분리

3. 독립적인 데이터 스트림 처리, 실시간 분석, 복잡한 그룹별 관리 

Time-Shifting Operators

Observable 이벤트의 타이밍을 조정합니다.

dealy

Observable의 모든이벤트를 지정된 시간만큼 지연시킵니다.

// 성공 메시지를 잠시 표시 후 화면 전환
saveButton.rx.tap
    .flatMapLatest { _ -> Observable<Void> in
        return saveData()
            .andThen(Observable.just(()))
            .do(onNext: { _ in
                self.showSuccessMessage()
            })
            .delay(.seconds(1), scheduler: MainScheduler.instance)
    }
    .subscribe(onNext: { _ in
        self.navigateToNextScreen()
    })
    .disposed(by: disposeBag)

사용자 액션 후 성공메시지나 로딩 인디케이터를 잠시 표시한 후 화면을 전환하는 등의 상황에서 유용할거 같다.

delaySubscription

구독 자체를 지연시킨다. 지정된 시간이 지난 후에 소스 Observable을 구독하게 된다.

// 화면 전환 애니메이션 완료 후 데이터 로딩 시작
func viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    
    dataService.fetchData()
        .delaySubscription(.milliseconds(300), scheduler: MainScheduler.instance)
        .subscribe(onNext: { data in
            self.updateUI(with: data)
        })
        .disposed(by: disposeBag)
}

이렇게 하면 화면 전환애니메이션과 데이터 로딩으로 인한 UI버벅임을 방지할 수 있다. 

그렇다면 delay와 어떤 차이점이 있을까?

Delay는 Observable에서 방출된 모든 이벤트를 받지만, 지연된 시간에 전달받는 것이고 delaySubscritiopn은 구독 자체가 지연되어 구독 전 이벤트를 받지 못한다.

Timer operators

interval

지정된 간격마다 정수값(0부터시작)을 방출하는 무한 Observable을 생성한다.

// 위치 기반 서비스의 주기적 업데이트
Observable.interval(.minutes(5), scheduler: MainScheduler.instance)
    .withUnretained(self)  // self 참조 관리
    .flatMapLatest { (owner, _) -> Observable<[NearbyPlace]> in
        return owner.locationManager.rx.currentLocation()
            .flatMap { location in
                return owner.placeService.fetchNearbyPlaces(location: location)
            }
    }
    .subscribe(onNext: { [weak self] places in
        self?.updateNearbyPlaces(places)
    })
    .disposed(by: disposeBag)

이 패턴은 실시간 데이터 업데이트, 배경 동기화, 주기적인 상태 점검 등 다양한 상황에서 활용할 수 있다

timer

// 3초 후에 첫 번째 값을 방출하고, 그 후 1초마다 값 방출
Observable<Int>
    .timer(.seconds(3), period: .seconds(1), scheduler: MainScheduler.instance)
    .subscribe(onNext: { value in
        print("타이머 값: \(value)")
    })
    .disposed(by: disposeBag)

// 5초 후에 한 번만 값을 방출하고 완료
Observable<Int>
    .timer(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(
        onNext: { _ in
            print("타이머 발동!")
        },
        onCompleted: {
            print("타이머 완료!")
        }
    )
    .disposed(by: disposeBag)

1. 구독과 첫 번째 값 방출 사이의 지연 시간 설정이 가능하다

2. 반복 주기 설정 가능

3. 반복 주기를 설정하지 않았다면 한번만 값 방출하고 완료

timeout

지정된 시간 내에 이벤트가 방출되지 않으면 에러 혹은 다른 Observable로 전환이 가능합니다.

// 5초 내에 다음 이벤트가 없으면 대체 Observable로 전환
sourceObservable
    .timeout(.seconds(5), other: Observable.just("타임아웃 발생"), scheduler: MainScheduler.instance)
    .subscribe(onNext: { value in
        print("값: \(value)")
    })
    .disposed(by: disposeBag)

 즉 이것은 API요청 타임아웃 처리에 유용하다

🔥 Hot Observable vs ❄️ Cold Observable

Time-based를 위해서는 이 개념의 차이를 알아야 합니다. 갑자기 뭔 뜬끔없이 뜨겁고 차가운건데?이럴수 잇죠 하지만 

Hot Observable

Hot Observable은 구독 여부와 관계없이 이벤트를 방출하는 데이터 스트림입니다. 마치 실시간으로 진행 중인 TV 방송과 같이, 시청자(구독자)가 언제 채널을 돌리느냐에 따라 방송의 일부분만 볼 수 있습니다.

특징:

  • 구독 시점 이후의 이벤트만 받음
  • 여러 구독자가 동일한 데이터 스트림을 공유
  • 구독자가 없어도 이벤트 방출 가능
  • Subject 타입들(PublishSubject, BehaviorSubject 등)이 대표적

주요 사용 사례:

  • UI 이벤트 처리 (버튼 탭, 스크롤 등)
  • 센서 데이터 스트림 (위치, 가속도 등)
  • WebSocket 연결
  • NotificationCenter 이벤트
// Hot Observable 예시: UI 버튼 탭 이벤트
let buttonTaps = button.rx.tap.asObservable()

// 첫 번째 구독자
buttonTaps.subscribe(onNext: { _ in
    print("버튼 탭 감지: 구독자 1")
}).disposed(by: disposeBag)

// 사용자가 버튼을 여러 번 탭한 후...

// 두 번째 구독자 (나중에 추가됨)
buttonTaps.subscribe(onNext: { _ in
    print("버튼 탭 감지: 구독자 2")
}).disposed(by: disposeBag)

// 구독자 2는 구독 이후의 탭 이벤트만 받음

Cold Observable

Cold Observable은 구독할 때마다 데이터 스트림을 처음부터 새로 시작하는 Observable입니다. 마치 넷플릭스에서 영화를 보는 것처럼, 각 시청자는 언제든 처음부터 콘텐츠를 볼 수 있습니다.

특징:

  • 구독 시점에 데이터 스트림이 시작됨
  • 각 구독자마다 독립적인 데이터 스트림 생성
  • 구독자가 없으면 데이터가 생성되지 않음
  • 대부분의 기본 Observable 생성 연산자(just, from, create 등)가 Cold Observable 생성

주요 사용 사례:

  • API 요청 및 네트워크 통신
  • 파일 읽기/쓰기 작업
  • 데이터베이스 쿼리
  • 시간 기반 시퀀스(interval, timer)
// Cold Observable 예시: 구독할 때마다 새로운 API 요청 발생
func fetchUserProfile() -> Observable<UserProfile> {
    return apiClient.get("/users/profile")
        .map { response in
            return UserProfile(from: response)
        }
}

Hot과 Cold Observable 변환하기

Cold->Hot

실제 앱 개발에서는 Cold Observable을 Hot으로 변환하여 여러 구독자가 동일한 데이터 스트림을 공유하게 만들어야 할 때가 있습니다.

  • publish() + connect()
     
    let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    let hotObservable = coldObservable.publish()
    
    // 구독자 추가
    hotObservable.subscribe(onNext: { value in
        print("구독자 1: \(value)")
    }).disposed(by: disposeBag)
    
    // 연결 시작 - 이 시점부터 데이터 방출
    hotObservable.connect().disposed(by: disposeBag)
    
    // 3초 후 두 번째 구독자 추가
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        hotObservable.subscribe(onNext: { value in
            print("구독자 2: \(value)")
        }).disposed(by: disposeBag)
    }
  • share()
     
let sharedObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .share()  // publish().refCount()의 간편 버전

// 첫 번째 구독자가 추가되면 자동으로 시작
sharedObservable.subscribe(onNext: { value in
    print("구독자 1: \(value)")
}).disposed(by: disposeBag)

 

  • replay(:)
// 최근 2개 이벤트를 새 구독자에게 제공
let replayedObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .replay(2)

replayedObservable.connect().disposed(by: disposeBag)

replay(:) 연산자는 Hot Observable로 변환하면서 지정한 버퍼 크기만큼의 최근 이벤트를 저장하고 새로운 구독자에게 제공했습니다. 

Hot Observable: replay를 적용하면 구독 전 발생한 대부분의 이벤트는 여전히 놓치지만 버퍼 크기만큼의 최근 이벤트들은 받을 수 있었습니다.

Cold Observable: 원래의 Cold 특성이 사라지고 모든 구독자가 동일한 스트림을 공유하며, 새 구독자는 버퍼에 있는 이벤트들로부터 받습니다. 

실제 개발에서의 응용 지침

Hot Observable 작업 시 고려사항

  1. 중요한 초기 이벤트를 놓치지 않기 위한 전략:
    • replay 또는 shareReplay를 사용하여 중요한 이벤트를 버퍼링
    • 앱 시작 시 중요한 Hot Observable은 즉시 구독하고 결과를 캐싱
  2. 타이밍 조정 주의사항:
    • delay보다 throttle이나 debounce를 우선 고려 (이벤트 빈도 제어)
    • delaySubscription은 놓쳐도 괜찮은 이벤트에만 사용

Cold Observable 작업 시 고려사항

  1. 공유 및 재사용 최적화:
    • 동일한 데이터가 여러 곳에서 필요하면 replay().refCount() 또는 share(replay:) 사용
    • API 요청은 기본적으로 Cold이므로, 중복 요청 방지를 위해 공유 메커니즘 적용
  2. 타이밍 조정 활용법:
    • 화면 전환 애니메이션 후 데이터 로딩은 delaySubscription 사용
    • 순차적 로딩이 필요한 경우 concat + delay 조합 활

요약: Hot vs Cold의 실전 선택 가이드

시나리오권장 Observable 유형적용 패턴

UI 이벤트 처리 Hot Subject 또는 UI.rx 확장 사용
네트워크 요청 Cold + 공유 메커니즘 share(replay:) 적용
사용자 상태 관리 Hot + 버퍼링 BehaviorRelay 또는 BehaviorSubject
주기적 데이터 업데이트 Cold → Hot 변환 interval(...).publish()
센서 데이터 스트림 Hot 이벤트 필터링/샘플링 추가

RxSwift에서 Hot과 Cold Observable의 특성을 제대로 이해하고 활용하면, 앱의 데이터 흐름을 더 효율적으로 설계하고 불필요한 중복 작업을 방지할 수 있습니다