RxSwift(5)-TimeBasedOperators

2025. 5. 4. 17:43·반응형프로그래밍

Buffering Operators

시간 기반 작업을 할 때 가장 자주 마주치는 문제가 바로 "이벤트를 어떻게 모아서 처리할까?"인데요.

사용자가 버튼을 연타한다거나, API 호출을 너무 자주 하게 되는 상황을 생각해보시면 이해가 쉬울 거예요.

Replay(_:)

Replay는 새로운 구독자에게 과거에 방출된 이벤트들을 다시 전달해주는 연산자입니다.

왜 이게 필요할까요?

예를 들어 사용자가 앱에서 로그인을 했는데, 로그인 상태를 여러 화면에서 동시에 참조해야 하는 상황을 생각해보세요.

로그인 Observable이 이미 성공 이벤트를 방출했다면, 나중에 구독하는 화면들은 그 정보를 받을 수 없겠죠?

// 마지막 2개의 이벤트를 새 구독자에게 전달
let replayedObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .replay(2)

// 첫 번째 구독자
replayedObservable.subscribe(onNext: { value in
    print("첫 번째 구독자: \(value)")
}).disposed(by: disposeBag)

// connect()를 호출해야 실제 이벤트 방출 시작
replayedObservable.connect().disposed(by: disposeBag)

// 3초 후에 두 번째 구독자 추가
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    replayedObservable.subscribe(onNext: { value in
        print("두 번째 구독자: \(value)")
    }).disposed(by: disposeBag)
}
실행결과
첫 번째 구독자: 0
첫 번째 구독자: 1
첫 번째 구독자: 2
두 번째 구독자: 1  // replay(2)로 인해 최근 2개 이벤트 즉시 수신
두 번째 구독자: 2
첫 번째 구독자: 3
두 번째 구독자: 3

저는 처음에 replay를 쓸 때 왜 connect()를 따로 호출해야 하는지 이해가 안 됐는데요.

알고 보니 replay는 ConnectableObservable을 반환하기 때문이었어요.

이건 모든 구독자가 준비될 때까지 기다렸다가 한 번에 시작할 수 있게 해주는 특별한 Observable입니다.

Scope 옵션으로 메모리 관리하기

replay를 사용할때 메모리 관리를 위해 scope 옵션을 지정할 수 있습니다.

// .whileConnected: 모든 구독이 해제되면 버퍼 해제 (기본값)
let whileConnectedObservable = sourceObservable.replay(2, scope: .whileConnected)

// .forever: 구독자가 없어도 버퍼 유지
let foreverObservable = sourceObservable.replay(2, scope: .forever)


.forever 옵션은 정말 조심해서 써야 해요.

ReplayAll - 모든 이벤트 저장

let replayAllObservable = sourceObservable.replayAll()

이벤트의 크기가 없어 들어온 모든 이벤트를 반복시켜줍니다. 그렇기에 메모리 이슈에 신경써야합니다.

Buffer

Buffer는 일정 시간이나 개수 단위로 이벤트를 모아서 배열로 방출하는 연산자입니다.

 왜 이게 필요할까요?

실제 프로젝트에서 사용자의 클릭 이벤트를 개별적으로 서버에 전송하면 네트워크 요청이 너무 많아져서 성능 문제가 생길수 있습니다.

그래서 Buffer를 사용해서 30초마다 또는 10개씩 모아서 한 번에 전송하도록 개선할수 있는거죠

// 최대 3개의 이벤트를 5초마다 배열로 방출
sourceObservable
    .buffer(timeSpan: .seconds(5), count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { batch in
        print("버퍼된 이벤트 그룹: \(batch)")
    })
    .disposed(by: disposeBag)

동작 원리:

  • 5초가 지나거나 3개 이벤트가 모이면 배열로 방출
  • 빈 배열도 방출될 수 있으니 filter로 걸러주는 게 좋아요

 

// 사용자 액션을 배치로 서버에 전송
userActionObservable
    .buffer(timeSpan: .seconds(30), count: 10, scheduler: MainScheduler.instance)
    .filter { !$0.isEmpty }
    .subscribe(onNext: { actions in
        analyticsService.sendBatchEvents(actions)
    })
    .disposed(by: disposeBag)

현업에서는 빈번한 사용자 액션을 개별적으로 서버에 전송하는 대신 일정 단위로 묶어서 전송함으로써 네트워크 요청 횟수를 줄이고 성능을 최적화할수 있습니다. 

핵심 특징

1. 모든 이벤트를 배열로 묶어서 하나의 이벤트로 방출

2. 지정된 시간이 경과 or 지정된 개수가 차면 방출

3. 배치처리, 네트워크 요청 최적화, 이벤트 누적분석에 사용할수 있겠다.

window

Window는 Buffer와 비슷하지만, 배열 대신 독립된 Observable을 방출한다는 점이 다릅니다.

언뜻 보면 "왜 굳이?"라고 생각할 수 있는데, 각 그룹을 독립적으로 처리해야 할 때 정말 유용해요.

Buffer vs Window - 핵심 차이점

Buffer: 이벤트들을 배열로 모아서 한 번에 방출

// Buffer 결과: [1, 2, 3], [4, 5, 6], [7, 8]
sourceObservable.buffer(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { batch in
        print("배치: \(batch)")  // [Int] 타입
        // 배열 전체를 한 번에 처리
        processBatch(batch)
    })

Window: 이벤트들을 독립된 Observable로 분할해서 방출 

// Window 결과: Observable<1,2,3>, Observable<4,5,6>, Observable<7,8>
sourceObservable.window(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { windowObservable in
        print("새 윈도우 Observable 시작!")  // Observable<Int> 타입
        
        // 각 윈도우를 독립적으로 처리
        windowObservable.subscribe(onNext: { value in
            print("윈도우 내 값: \(value)")
        }).disposed(by: disposeBag)
    })

언제 Window를 써야 할까?

1. 각 그룹별로 다른 연산을 적용해야 할 때

// 실시간 주식 가격을 5초 단위로 나눠서 각각 다른 분석 적용
stockPriceObservable
    .window(timeSpan: .seconds(5), scheduler: MainScheduler.instance)
    .enumerated()  // 윈도우 번호 추가
    .flatMap { (index, windowObservable) -> Observable<TechnicalAnalysis> in
        print("윈도우 \(index) 분석 시작")
        
        // 각 윈도우마다 독립적인 기술적 분석 적용
        return Observable.zip(
            windowObservable.reduce(0.0, accumulator: +),  // 합계
            windowObservable.scan(0) { count, _ in count + 1 },  // 개수
            windowObservable.reduce(Double.greatestFiniteMagnitude, accumulator: min),  // 최솟값
            windowObservable.reduce(0.0, accumulator: max)  // 최댓값
        )
        .map { sum, count, min, max in
            return TechnicalAnalysis(
                average: sum / Double(count),
                volatility: max - min,
                windowIndex: index
            )
        }
    }
    .subscribe(onNext: { analysis in
        print("윈도우 분석 결과: \(analysis)")
        updateChart(with: analysis)
    })
    .disposed(by: disposeBag)

Buffer로는 이런 복잡한 분석을 할 수 없어요. 배열로 받으면 RxSwift 연산자들을 체이닝할 수 없거든요.

스트리밍 데이터를 실시간으로 그룹 처리할 때

Time-Shifting Operators

시간을 조정하는 연산자들은 UX를 개선하는 데 정말 중요해요.

사용자가 "뭔가 느리다" 또는 "너무 급작스럽다"고 느끼는 순간들을 해결해줄 수 있거든요.

Delay

모든 이벤트를 지정된 시간만큼 뒤로 미루는 연산자입니다. 성공 메시지를 보여준 후 자동으로 화면을 닫는 상황에서 자주 사용해요

// 성공 메시지를 잠시 표시 후 화면 전환
saveButton.rx.tap
    .flatMapLatest { _ -> Observable<Void> in
        return saveData()
            .andThen(Observable.just(()))
            .do(onNext: { _ in
                self.showSuccessMessage()
            })
            .delay(.seconds(1), scheduler: MainScheduler.instance)
    }
    .subscribe(onNext: { _ in
        self.navigateToNextScreen()
    })
    .disposed(by: disposeBag)

사용자가 "저장 완료" 메시지를 읽을 시간을 주는 거예요.

너무 빨리 화면이 바뀌면 뭔가 제대로 저장됐는지 확신할 수 없거든요.

 

DelaySubscription

구독 자체를 지연시키는 연산자입니다. Delay와의 차이점은 뭘까요?

  • Delay: 이벤트는 받지만 전달을 지연
  • DelaySubscription: 아예 구독을 안 하니까 그동안의 이벤트를 놓침
// 화면 전환 애니메이션 완료 후 데이터 로딩 시작
func viewDidAppear(_ animated: Bool) {
    super.viewDidAppear(animated)
    
    dataService.fetchData()
        .delaySubscription(.milliseconds(300), scheduler: MainScheduler.instance)
        .subscribe(onNext: { data in
            self.updateUI(with: data)
        })
        .disposed(by: disposeBag)
}

화면 전환 애니메이션(보통 300ms)이 끝나고 나서 데이터를 가져오면 부드러운 UX를 만들 수 있어요.

애니메이션과 네트워크 작업이 동시에 실행되면 버벅거릴 수 있거든요 

 

 

Timer operators

시간 기반 이벤트를 만드는 연산자들이에요. 주기적인 작업이나 타임아웃 처리에 필수적입니다.

Interval

지정된 간격마다 정수값을 방출하는 무한 Observable입니다. 0부터 시작해서 1씩 증가해요.

// 위치 기반 서비스의 주기적 업데이트
Observable.interval(.minutes(5), scheduler: MainScheduler.instance)
    .withUnretained(self)  // self 참조 관리
    .flatMapLatest { (owner, _) -> Observable<[NearbyPlace]> in
        return owner.locationManager.rx.currentLocation()
            .flatMap { location in
                return owner.placeService.fetchNearbyPlaces(location: location)
            }
    }
    .subscribe(onNext: { [weak self] places in
        self?.updateNearbyPlaces(places)
    })
    .disposed(by: disposeBag)

이 패턴은 실시간 데이터 업데이트, 배경 동기화, 주기적인 상태 점검 등 다양한 상황에서 활용할 수 있다

Timer

Interval보다 더 유연한 타이머예요. 첫 번째 이벤트까지의 지연 시간과 반복 주기를 따로 설정할 수 있어요.

// 3초 후에 첫 번째 값을 방출하고, 그 후 1초마다 값 방출
Observable<Int>
    .timer(.seconds(3), period: .seconds(1), scheduler: MainScheduler.instance)
    .subscribe(onNext: { value in
        print("타이머 값: \(value)")
    })
    .disposed(by: disposeBag)

// 5초 후에 한 번만 값을 방출하고 완료
Observable<Int>
    .timer(.seconds(5), scheduler: MainScheduler.instance)
    .subscribe(
        onNext: { _ in
            print("타이머 발동!")
        },
        onCompleted: {
            print("타이머 완료!")
        }
    )
    .disposed(by: disposeBag)

1. 구독과 첫 번째 값 방출 사이의 지연 시간 설정이 가능하다

2. 반복 주기 설정 가능

3. 반복 주기를 설정하지 않았다면 한번만 값 방출하고 완료

Timeout

지정된 시간 내에 이벤트가 오지 않으면 에러를 발생시키거나 다른 Observable로 전환하는 연산자입니다.

// 5초 내에 다음 이벤트가 없으면 대체 Observable로 전환
sourceObservable
    .timeout(.seconds(5), other: Observable.just("타임아웃 발생"), scheduler: MainScheduler.instance)
    .subscribe(onNext: { value in
        print("값: \(value)")
    })
    .disposed(by: disposeBag)

 즉 이것은 API요청 타임아웃 처리에 유용하다

🔥 Hot Observable vs ❄️ Cold Observable

이 개념을 이해하지 못하면 time-based 작업에서 예상과 다른 결과가 나올 수 있어요.

처음에 저도 헷갈렸는데, 실제 사례로 설명해드릴게요.

Hot Observable

구독 여부와 관계없이 이벤트를 방출하는 Observable이에요.

TV 방송처럼 지금 이 순간에도 계속 진행되고 있어서, 언제 채널을 돌리느냐에 따라 방송의 일부분만 볼 수 있어요.

특징:

  • 구독 시점 이후의 이벤트만 받음
  • 여러 구독자가 동일한 데이터 스트림을 공유
  • 구독자가 없어도 이벤트 방출 가능
  • Subject 타입들(PublishSubject, BehaviorSubject 등)이 대표적

주요 사용 사례:

  • UI 이벤트 처리 (버튼 탭, 스크롤 등)
  • 센서 데이터 스트림 (위치, 가속도 등)
  • WebSocket 연결
  • NotificationCenter 이벤트
// Hot Observable 예시: UI 버튼 탭 이벤트
let buttonTaps = button.rx.tap.asObservable()

// 첫 번째 구독자
buttonTaps.subscribe(onNext: { _ in
    print("버튼 탭 감지: 구독자 1")
}).disposed(by: disposeBag)

// 사용자가 버튼을 여러 번 탭한 후...

// 두 번째 구독자 (나중에 추가됨)
buttonTaps.subscribe(onNext: { _ in
    print("버튼 탭 감지: 구독자 2")
}).disposed(by: disposeBag)

// 구독자 2는 구독 이후의 탭 이벤트만 받음

Cold Observable

Cold Observable은 구독할 때마다 데이터 스트림을 처음부터 새로 시작하는 Observable입니다.

마치 넷플릭스에서 영화를 보는 것처럼, 각 시청자는 언제든 처음부터 콘텐츠를 볼 수 있습니다.

특징:

  • 구독 시점에 데이터 스트림이 시작됨
  • 각 구독자마다 독립적인 데이터 스트림 생성
  • 구독자가 없으면 데이터가 생성되지 않음
  • 대부분의 기본 Observable 생성 연산자(just, from, create 등)가 Cold Observable 생성

주요 사용 사례:

  • API 요청 및 네트워크 통신
  • 파일 읽기/쓰기 작업
  • 데이터베이스 쿼리
  • 시간 기반 시퀀스(interval, timer)
// Cold Observable 예시: 구독할 때마다 새로운 API 요청 발생
func fetchUserProfile() -> Observable<UserProfile> {
    return apiClient.get("/users/profile")
        .map { response in
            return UserProfile(from: response)
        }
}

Hot과 Cold Observable 변환하기

Cold를 Hot으로 변환하기

실제 개발에서는 Cold Observable을 Hot으로 변환해서 여러 구독자가 같은 데이터를 공유하게 만들어야 할 때가 많아요.

share() - 가장 간단한 방법

let sharedProfile = fetchUserProfile().share()

// 두 구독자가 같은 API 결과를 공유
sharedProfile.subscribe(onNext: { profile in
    self.updateProfileUI(profile)
}).disposed(by: disposeBag)

sharedProfile.subscribe(onNext: { profile in
    self.cacheProfile(profile)
}).disposed(by: disposeBag)

shareReplay() - 버퍼링까지 포함

let cachedProfile = fetchUserProfile().shareReplay(1)

// 첫 번째 구독자가 API 요청 시작
cachedProfile.subscribe(onNext: { profile in
    self.updateProfileUI(profile)
}).disposed(by: disposeBag)

// 나중에 추가된 구독자는 캐시된 결과를 즉시 받음
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    cachedProfile.subscribe(onNext: { profile in
        self.showDetailView(profile)
    }).disposed(by: disposeBag)
}

이 패턴은 사용자 프로필처럼 자주 참조되는 데이터에 특히 유용해요. API 요청을 한 번만 하고 여러 곳에서 결과를 재사용할 수 있거든요 

 

실제 개발에서의 응용 지침

Hot Observable 작업 시 고려사항

  1. 중요한 초기 이벤트를 놓치지 않기 위한 전략:
    • replay 또는 shareReplay를 사용하여 중요한 이벤트를 버퍼링
    • 앱 시작 시 중요한 Hot Observable은 즉시 구독하고 결과를 캐싱
  2. 타이밍 조정 주의사항:
    • delay보다 throttle이나 debounce를 우선 고려 (이벤트 빈도 제어)
    • delaySubscription은 놓쳐도 괜찮은 이벤트에만 사용

Cold Observable 작업 시 고려사항

  1. 공유 및 재사용 최적화:
    • 동일한 데이터가 여러 곳에서 필요하면 replay().refCount() 또는 share(replay:) 사용
    • API 요청은 기본적으로 Cold이므로, 중복 요청 방지를 위해 공유 메커니즘 적용
  2. 타이밍 조정 활용법:
    • 화면 전환 애니메이션 후 데이터 로딩은 delaySubscription 사용
    • 순차적 로딩이 필요한 경우 concat + delay 조합 활

요약: Hot vs Cold의 실전 선택 가이드

시나리오권장 Observable 유형적용 패턴

UI 이벤트 처리 Hot Subject 또는 UI.rx 확장 사용
네트워크 요청 Cold + 공유 메커니즘 share(replay:) 적용
사용자 상태 관리 Hot + 버퍼링 BehaviorRelay 또는 BehaviorSubject
주기적 데이터 업데이트 Cold → Hot 변환 interval(...).publish()
센서 데이터 스트림 Hot 이벤트 필터링/샘플링 추가

RxSwift에서 Hot과 Cold Observable의 특성을 제대로 이해하고 활용하면, 앱의 데이터 흐름을 더 효율적으로 설계하고 불필요한 중복 작업을 방지할 수 있습니다

'반응형프로그래밍' 카테고리의 다른 글

RxSwift(7)-BehaviorRelay  (0) 2025.05.06
RxSwift(6)-RxCocoa(bind, drive, DelegateProxy)  (0) 2025.05.06
RxSwift(4)-Combining Operators  (0) 2025.05.02
RxSwift(3)-Filtering Operators & TransForming Operators  (0) 2025.04.29
RxSwift(2)-Subject  (0) 2025.04.29
'반응형프로그래밍' 카테고리의 다른 글
  • RxSwift(7)-BehaviorRelay
  • RxSwift(6)-RxCocoa(bind, drive, DelegateProxy)
  • RxSwift(4)-Combining Operators
  • RxSwift(3)-Filtering Operators & TransForming Operators
2료일
2료일
좌충우돌 모든것을 다 정리하려고 노력하는 J가 되려고 하는 세미개발자의 블로그입니다. 편하게 보고 가세요
  • 2료일
    GPT에게서 살아남기
    2료일
  • 전체
    오늘
    어제
    • 분류 전체보기 (120)
      • SWIFT개발 (29)
      • 알고리즘 (25)
      • Design (6)
      • ARkit (1)
      • 면접준비 (30)
      • UIkit (2)
      • Vapor-Server with swift (3)
      • 디자인패턴 (5)
      • 반응형프로그래밍 (12)
      • CS (3)
      • 도서관 (1)
  • 인기 글

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
2료일
RxSwift(5)-TimeBasedOperators
상단으로

티스토리툴바