RxSwift(5)-TimeBasedOperators
시간에 기반한 오퍼레이터-> 시간 흐름을 지연시켜주거나 제어
Buffering Operators
Buffering 연산자들은 Observable 시퀀스에서 방출된 이벤트들을 일정 기간 동안 모았다가 한 번에 처리할 수 있게 해줍니다. 이를 통해 과거 이벤트를 재생하거나 일정 시간 동안 버퍼링하는 기능을 구현할 수 있습니다.
Replay(_:)
새로운 구독자에게 과거에 방출된 요소들의 일부를 다시 전달합니다
// 마지막 N개의 이벤트를 새 구독자에게 전달
let replayedObservable = Observable<Int>.interval(1,scheduler: .instance).replay(2)
// 첫 번째 구독자
replayedObservable.subscribe(onNext: { value in
print("첫 번째 구독자: \(value)")
}).disposed(by: disposeBag)
// 나중에 두 번째 구독자 추가
// 이 구독자는 마지막 1개의 이벤트를 즉시 받게 됨
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
replayedObservable.subscribe(onNext: { value in
print("두 번째 구독자: \(value)")
}).disposed(by: disposeBag)
}
첫 번째 구독자: 0
첫 번째 구독자: 1
첫 번째 구독자: 2
첫 번째 구독자: 3
두 번째 구독자: 2
두 번째 구독자: 3
첫 번째 구독자: 4
두 번째 구독자: 4
첫 번째 구독자: 5
두 번째 구독자: 5
처음 구독하면 Observable이 시작되고 첫번재 구독자는 값을 받는다.
3초 후에 두번재 구독자가 구독을 시작할때 replay(2)에 의해 최근 2개의 값을 즉시 받습니다. 여기서는 2,3입니다
이 시점에 첫번재 구독자는 이미 0,1,2,3까지 받은 상태. replay(2)로 버퍼에는 최근값 2개만 저장.
그이후로는 두 구독자 모두 동일한 값 실시간으로 받습니다.
replay는 ConnectableObservable을 반환하는데 이 특별한 Observable은 connect() method호출해야 이벤트 방출을 시작하고. 모든 구독자가 준비될 때까지 이벤트 방출을 지연시킬 수 있다는 능력이 있습니다
Scope Option
replay를 사용할때 메모리 관리를 위해 scope 옵션을 지정할 수 있습니다.
// .whileConnected: 모든 구독이 해제되면 버퍼 해제 (기본값)
let whileConnectedObservable = sourceObservable.replay(2, scope: .whileConnected)
// .forever: 구독자가 없어도 버퍼 유지
let foreverObservable = sourceObservable.replay(2, scope: .forever)
replayAll
이벤트의 크기가 없어 들어온 모든 이벤트를 반복시켜줍니다. 그렇기에 메모리 이슈에 신경써야합니다.
Buffer
일정 시간단위로 모아서 배열형태로 방출합니다.
// 최대 3개의 이벤트를 5초마다 배열로 방출
sourceObservable
.buffer(timeSpan: .seconds(5), count: 3, scheduler: MainScheduler.instance)
.subscribe(onNext: { batch in
print("버퍼된 이벤트 그룹: \(batch)")
})
.disposed(by: disposeBag)
지정된 시간이 경과했거나 지정된 개수만큼 이벤트가 모였을때 이벤트를 배열로 방출합니다!!
// 사용자 액션을 배치로 서버에 전송
userActionObservable
.buffer(timeSpan: .seconds(30), count: 10, scheduler: MainScheduler.instance)
.filter { !$0.isEmpty }
.subscribe(onNext: { actions in
analyticsService.sendBatchEvents(actions)
})
.disposed(by: disposeBag)
현업에서는 빈번한 사용자 액션을 개별적으로 서버에 전송하는 대신 일정 단위로 묶어서 전송함으로써 네트워크 요청 횟수를 줄이고 성능을 최적화할수 있습니다.
핵심 특징
1. 모든 이벤트를 배열로 묶어서 하나의 이벤트로 방출
2. 지정된 시간이 경과 or 지정된 개수가 차면 방출
3. 배치처리, 네트워크 요청 최적화, 이벤트 누적분석에 사용할수 있겠다.
window
WINDOW는 이벤트들을 새로운 Observable 형태로 분리해준다. 각 Window는 독립된 Observable로 방출됩니다.
// 최대 3개의 이벤트를 5초마다 Observable로 분리
sourceObservable
.window(timeSpan: .seconds(5), count: 3, scheduler: MainScheduler.instance)
.flatMap { windowedObservable -> Observable<String> in
print("새 윈도우 시작!")
return windowedObservable.do(onCompleted: {
print("윈도우 완료!")
})
}
.subscribe(onNext: { value in
print("값: \(value)")
})
.disposed(by: disposeBag)
소스가 1초마다 문자열을 방출한다고 가정해보자:
새 윈도우 시작!
값: A
값: B
값: C
윈도우 완료! // 3개 이벤트로 윈도우 완료
새 윈도우 시작!
값: D
값: E
윈도우 완료! // 5초 경과로 윈도우 완료
새 윈도우 시작!
값: F
값: G
값: H
윈도우 완료! // 3개 이벤트로 윈도우 완료
...
이렇게 될수 있다.
핵심 특징:
1. 이벤트들을 독립된 Observable로 분리하여 방출(Observable<Element>)
2. Buffer와 동일하게 시간 똔느 개수 조건으로 분리
3. 독립적인 데이터 스트림 처리, 실시간 분석, 복잡한 그룹별 관리
Time-Shifting Operators
Observable 이벤트의 타이밍을 조정합니다.
dealy
Observable의 모든이벤트를 지정된 시간만큼 지연시킵니다.
// 성공 메시지를 잠시 표시 후 화면 전환
saveButton.rx.tap
.flatMapLatest { _ -> Observable<Void> in
return saveData()
.andThen(Observable.just(()))
.do(onNext: { _ in
self.showSuccessMessage()
})
.delay(.seconds(1), scheduler: MainScheduler.instance)
}
.subscribe(onNext: { _ in
self.navigateToNextScreen()
})
.disposed(by: disposeBag)
사용자 액션 후 성공메시지나 로딩 인디케이터를 잠시 표시한 후 화면을 전환하는 등의 상황에서 유용할거 같다.
delaySubscription
구독 자체를 지연시킨다. 지정된 시간이 지난 후에 소스 Observable을 구독하게 된다.
// 화면 전환 애니메이션 완료 후 데이터 로딩 시작
func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)
dataService.fetchData()
.delaySubscription(.milliseconds(300), scheduler: MainScheduler.instance)
.subscribe(onNext: { data in
self.updateUI(with: data)
})
.disposed(by: disposeBag)
}
이렇게 하면 화면 전환애니메이션과 데이터 로딩으로 인한 UI버벅임을 방지할 수 있다.
그렇다면 delay와 어떤 차이점이 있을까?
Delay는 Observable에서 방출된 모든 이벤트를 받지만, 지연된 시간에 전달받는 것이고 delaySubscritiopn은 구독 자체가 지연되어 구독 전 이벤트를 받지 못한다.
Timer operators
interval
지정된 간격마다 정수값(0부터시작)을 방출하는 무한 Observable을 생성한다.
// 위치 기반 서비스의 주기적 업데이트
Observable.interval(.minutes(5), scheduler: MainScheduler.instance)
.withUnretained(self) // self 참조 관리
.flatMapLatest { (owner, _) -> Observable<[NearbyPlace]> in
return owner.locationManager.rx.currentLocation()
.flatMap { location in
return owner.placeService.fetchNearbyPlaces(location: location)
}
}
.subscribe(onNext: { [weak self] places in
self?.updateNearbyPlaces(places)
})
.disposed(by: disposeBag)
이 패턴은 실시간 데이터 업데이트, 배경 동기화, 주기적인 상태 점검 등 다양한 상황에서 활용할 수 있다
timer
// 3초 후에 첫 번째 값을 방출하고, 그 후 1초마다 값 방출
Observable<Int>
.timer(.seconds(3), period: .seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { value in
print("타이머 값: \(value)")
})
.disposed(by: disposeBag)
// 5초 후에 한 번만 값을 방출하고 완료
Observable<Int>
.timer(.seconds(5), scheduler: MainScheduler.instance)
.subscribe(
onNext: { _ in
print("타이머 발동!")
},
onCompleted: {
print("타이머 완료!")
}
)
.disposed(by: disposeBag)
1. 구독과 첫 번째 값 방출 사이의 지연 시간 설정이 가능하다
2. 반복 주기 설정 가능
3. 반복 주기를 설정하지 않았다면 한번만 값 방출하고 완료
timeout
지정된 시간 내에 이벤트가 방출되지 않으면 에러 혹은 다른 Observable로 전환이 가능합니다.
// 5초 내에 다음 이벤트가 없으면 대체 Observable로 전환
sourceObservable
.timeout(.seconds(5), other: Observable.just("타임아웃 발생"), scheduler: MainScheduler.instance)
.subscribe(onNext: { value in
print("값: \(value)")
})
.disposed(by: disposeBag)
즉 이것은 API요청 타임아웃 처리에 유용하다
🔥 Hot Observable vs ❄️ Cold Observable
Time-based를 위해서는 이 개념의 차이를 알아야 합니다. 갑자기 뭔 뜬끔없이 뜨겁고 차가운건데?이럴수 잇죠 하지만
Hot Observable
Hot Observable은 구독 여부와 관계없이 이벤트를 방출하는 데이터 스트림입니다. 마치 실시간으로 진행 중인 TV 방송과 같이, 시청자(구독자)가 언제 채널을 돌리느냐에 따라 방송의 일부분만 볼 수 있습니다.
특징:
- 구독 시점 이후의 이벤트만 받음
- 여러 구독자가 동일한 데이터 스트림을 공유
- 구독자가 없어도 이벤트 방출 가능
- Subject 타입들(PublishSubject, BehaviorSubject 등)이 대표적
주요 사용 사례:
- UI 이벤트 처리 (버튼 탭, 스크롤 등)
- 센서 데이터 스트림 (위치, 가속도 등)
- WebSocket 연결
- NotificationCenter 이벤트
// Hot Observable 예시: UI 버튼 탭 이벤트
let buttonTaps = button.rx.tap.asObservable()
// 첫 번째 구독자
buttonTaps.subscribe(onNext: { _ in
print("버튼 탭 감지: 구독자 1")
}).disposed(by: disposeBag)
// 사용자가 버튼을 여러 번 탭한 후...
// 두 번째 구독자 (나중에 추가됨)
buttonTaps.subscribe(onNext: { _ in
print("버튼 탭 감지: 구독자 2")
}).disposed(by: disposeBag)
// 구독자 2는 구독 이후의 탭 이벤트만 받음
Cold Observable
Cold Observable은 구독할 때마다 데이터 스트림을 처음부터 새로 시작하는 Observable입니다. 마치 넷플릭스에서 영화를 보는 것처럼, 각 시청자는 언제든 처음부터 콘텐츠를 볼 수 있습니다.
특징:
- 구독 시점에 데이터 스트림이 시작됨
- 각 구독자마다 독립적인 데이터 스트림 생성
- 구독자가 없으면 데이터가 생성되지 않음
- 대부분의 기본 Observable 생성 연산자(just, from, create 등)가 Cold Observable 생성
주요 사용 사례:
- API 요청 및 네트워크 통신
- 파일 읽기/쓰기 작업
- 데이터베이스 쿼리
- 시간 기반 시퀀스(interval, timer)
// Cold Observable 예시: 구독할 때마다 새로운 API 요청 발생
func fetchUserProfile() -> Observable<UserProfile> {
return apiClient.get("/users/profile")
.map { response in
return UserProfile(from: response)
}
}
Hot과 Cold Observable 변환하기
Cold->Hot
실제 앱 개발에서는 Cold Observable을 Hot으로 변환하여 여러 구독자가 동일한 데이터 스트림을 공유하게 만들어야 할 때가 있습니다.
- publish() + connect()
let coldObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) let hotObservable = coldObservable.publish() // 구독자 추가 hotObservable.subscribe(onNext: { value in print("구독자 1: \(value)") }).disposed(by: disposeBag) // 연결 시작 - 이 시점부터 데이터 방출 hotObservable.connect().disposed(by: disposeBag) // 3초 후 두 번째 구독자 추가 DispatchQueue.main.asyncAfter(deadline: .now() + 3) { hotObservable.subscribe(onNext: { value in print("구독자 2: \(value)") }).disposed(by: disposeBag) }
- share()
let sharedObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.share() // publish().refCount()의 간편 버전
// 첫 번째 구독자가 추가되면 자동으로 시작
sharedObservable.subscribe(onNext: { value in
print("구독자 1: \(value)")
}).disposed(by: disposeBag)
- replay(:)
// 최근 2개 이벤트를 새 구독자에게 제공
let replayedObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.replay(2)
replayedObservable.connect().disposed(by: disposeBag)
replay(:) 연산자는 Hot Observable로 변환하면서 지정한 버퍼 크기만큼의 최근 이벤트를 저장하고 새로운 구독자에게 제공했습니다.
Hot Observable: replay를 적용하면 구독 전 발생한 대부분의 이벤트는 여전히 놓치지만 버퍼 크기만큼의 최근 이벤트들은 받을 수 있었습니다.
Cold Observable: 원래의 Cold 특성이 사라지고 모든 구독자가 동일한 스트림을 공유하며, 새 구독자는 버퍼에 있는 이벤트들로부터 받습니다.
실제 개발에서의 응용 지침
Hot Observable 작업 시 고려사항
- 중요한 초기 이벤트를 놓치지 않기 위한 전략:
- replay 또는 shareReplay를 사용하여 중요한 이벤트를 버퍼링
- 앱 시작 시 중요한 Hot Observable은 즉시 구독하고 결과를 캐싱
- 타이밍 조정 주의사항:
- delay보다 throttle이나 debounce를 우선 고려 (이벤트 빈도 제어)
- delaySubscription은 놓쳐도 괜찮은 이벤트에만 사용
Cold Observable 작업 시 고려사항
- 공유 및 재사용 최적화:
- 동일한 데이터가 여러 곳에서 필요하면 replay().refCount() 또는 share(replay:) 사용
- API 요청은 기본적으로 Cold이므로, 중복 요청 방지를 위해 공유 메커니즘 적용
- 타이밍 조정 활용법:
- 화면 전환 애니메이션 후 데이터 로딩은 delaySubscription 사용
- 순차적 로딩이 필요한 경우 concat + delay 조합 활
요약: Hot vs Cold의 실전 선택 가이드
시나리오권장 Observable 유형적용 패턴
UI 이벤트 처리 | Hot | Subject 또는 UI.rx 확장 사용 |
네트워크 요청 | Cold + 공유 메커니즘 | share(replay:) 적용 |
사용자 상태 관리 | Hot + 버퍼링 | BehaviorRelay 또는 BehaviorSubject |
주기적 데이터 업데이트 | Cold → Hot 변환 | interval(...).publish() |
센서 데이터 스트림 | Hot | 이벤트 필터링/샘플링 추가 |
RxSwift에서 Hot과 Cold Observable의 특성을 제대로 이해하고 활용하면, 앱의 데이터 흐름을 더 효율적으로 설계하고 불필요한 중복 작업을 방지할 수 있습니다