RxSwift(4)-Combining Operators

2025. 5. 2. 02:53·반응형프로그래밍

이번에는 여러 Observable 시퀀스를 하나로 결합하거나, Observable과 다른 소스의 데이를 통합하는 연산자를 알아보겠습니다~

이는 여러 이벤트 스트림을 관리하고 동기화해야 하는 복잡한 상황에서 유용합니다.

startWith(_:)

생성되어 방출되는 옵저버블에 대해 초기값을 줄 수 잇습니다.

// 네트워크에서 데이터를 로드하기 전에 캐시된 데이터 먼저 표시
loadFromNetwork()
    .startWith(loadFromCache())
    .bind(to: tableView.rx.items)
    .disposed(by: disposeBag)

초기 상태 또는 기본값을 제공하는데 유용할것같다.

concat(_:)

여러 옵저버블 시퀀스를 순서대로 합쳐줍니다.사실 합쳐진다느낌보다는 순서대로 진행한다? 가 더 맞는 표현이라고 한다. 

// 로컬 캐시를 먼저 확인하고, 없으면 네트워크에서 로드
let cachedData = loadFromCache()
    .catch { _ in Observable.empty() }
    
let remoteData = loadFromNetwork()

Observable.concat(cachedData, remoteData)
    .take(1) // 첫 번째 유효한 결과만 사용
    .bind(to: collectionView.rx.items)
    .disposed(by: disposeBag)

이런식으로 순차적인데이터 로딩이 필요한 경우 사용할수 있습니다.

아 여기서 두가지가 있다고한다!!! 현재 위에꺼는 static function이다. 

이번엔 인스턴스 메서드 concat(_:)을 알아보자.

기존 Observable이 다 emit될때까지 기다린 다음 concat으로 받아온 후 Observable을 등록하여 사용한다고 한다.

// Observable 인스턴스에서 호출
let first = Observable.of(1, 2, 3)
let second = Observable.of(4, 5, 6)

// first Observable에 second를 연결
first.concat(second)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

이게 instance메서드방식. 첫번재꺼에 두번째거를 붙히는 형식이라고 생각하면된다.

merge

여러 observable 이벤트를 소스 구분 없이 하나의 Observable로 결합합니다. 순서가 없는것! 그냥 흐름상 맞게 합쳐줍니다

source sequence와 모든 내부 sequence들이 완료되었을 때 끝납니다. 

만약 Merged된 observable중에 onError나 onCompleted로 잡았다면 그 때 terminated된다.

// 여러 입력 소스의 이벤트를 하나의 스트림으로 처리
let buttonTaps = button.rx.tap.map { _ in "버튼 탭됨" }
let gestureTaps = gestureRecognizer.rx.event.map { _ in "제스처 감지됨" }
let menuSelections = menuItem.rx.tap.map { _ in "메뉴 선택됨" }

Observable.merge(buttonTaps, gestureTaps, menuSelections)
    .subscribe(onNext: { actionSource in
        analytics.trackUserAction(source: actionSource)
    })
    .disposed(by: disposeBag)

여러소스에서 동일한 유형의 이벤트를 처리할 때 유용합니다.

또 여기에는 최대 합칠수 있는 카운트를 정해줄 수 있다. -> merge(maxCocurrent:)

이는 limit에 도달하면 observable을 대기 큐에 넣고 merge하고 있는 각각의 시퀀스 중에 만약 어떤것이 종료되어 끝날때 큐에 있는 다른것을 꺼내서 merge도와줌. 

combineLatest의 기본 원리

combineLatest는 여러 Observable 소스에서 가장 최근에 방출된 항목들을 결합합니다. 주요 특징:

  • 각 소스가 최소 하나의 항목을 방출해야 결과가 생성됨
  • 어떤 소스에서든 새 항목이 방출되면 모든 소스의 최신 값을 사용하여 새 결과 생성
  • 결합 방식은 제공된 클로저에 의해 결정됨

.combineLatest(::resultSelector:)

내부결합된 sequence들은 값을 방출할 때마다, 제공한 클로저를 호출하며 우리는 각각의 내부 sequence들의 받는다.

언제사용할수있을까? 여러 textFiled들을 한번에 관찰하고 값을 결합하거나 여러 소스들 상태를 보고 싶을 때.

let numbers = PublishSubject<Int>()
let strings = PublishSubject<String>()

Observable.combineLatest(numbers, strings) { number, string in
    return "\(string): \(number)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

numbers.onNext(1)
strings.onNext("A")  // 출력: "A: 1"
numbers.onNext(2)    // 출력: "A: 2"

위 예시를 보면 combineLatest를 사용해서 만든 Observable은 combineLatest의 파라미터로 들어가는 두 Observable타입과 달라도 된다고 한다. 왜??? 내가 클로저에서 새로 결합해서 만들어가지고 새로운 Observable을 만드는 것이기에

combineLatest 를 사용하면 맨 처음에는.! → observable들에서 각각 최소 한개 이상의 event를 emit할 때 까지 계속 대기를 타서 대기를 타는 동안 클로저를 타지않는다.

그래서 만약 다른 한 쪽의 operator에서 emit되는데까지 너무 오랜시간이 걸려 기다려야한다면, startWith같이 앞쪽에 붙여 사용하는 Operator를 통해서 초기 observable을 주는 것도 나쁘지않음.

.combineLatest(::_:resultSelector:)

let numbers = PublishSubject<Int>()
let strings = PublishSubject<String>()
let booleans = PublishSubject<Bool>()

Observable.combineLatest(numbers, strings, booleans) { number, string, boolean in
    return "\(string): \(number) - \(boolean)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

numbers.onNext(1)
strings.onNext("A")
booleans.onNext(true)  // 출력: "A: 1 - true"
numbers.onNext(2)      // 출력: "A: 2 - true"

.combineLatest(_:) (배열 기반)

let observables = [
    Observable.just(1),
    Observable.just(2),
    Observable.just(3)
]

Observable.combineLatest(observables) { array in
    return array.reduce(0, +)
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 출력: 6

뭐 이밖에도 튜플도 가능하다.

ZIP

잉? 위의것과 먼가 비슷하면서 다르다. 잘 주의해야합니다.

zip은 여러 Observable에서 방출된 항목을 일대일로 대응시켜 조합합니다.

주요 특징:

  1. 순서 보장: 각 Observable의 항목을 순서대로 결합
  2. 동기화 필수: 모든 소스 Observable이 해당 위치(n번째)의 항목을 방출해야만 결합 결과 생성
  3. 완료 조건: 어느 하나의 Observable이 완료되면 전체 시퀀스도 완료됨
// 예제
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { str, int in
    "\(str): \(int)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

stringSubject.onNext("A")  // 아직 아무것도 방출하지 않음
intSubject.onNext(1)  // "A: 1"
stringSubject.onNext("B")  // 아직 아무것도 방출하지 않음
stringSubject.onNext("C")  // 아직 아무것도 방출하지 않음
intSubject.onNext(2)  // "B: 2"

실제 활용: 서로 관련된 데이터 스트림을 순차적으로 처리할 때 유용합니다.

// 애니메이션 좌표와 색상 변경을 동기화
let positions = Observable<CGPoint>.interval(.milliseconds(100), scheduler: MainScheduler.instance)
    .map { CGPoint(x: $0 * 10, y: $0 * 15) }
    .take(10)
    
let colors = Observable<UIColor>.interval(.milliseconds(100), scheduler: MainScheduler.instance)
    .map { [.red, .green, .blue, .yellow][$0 % 4] }
    .take(10)

Observable.zip(positions, colors)
    .subscribe(onNext: { position, color in
        animatedView.center = position
        animatedView.backgroundColor = color
    })
    .disposed(by: disposeBag)

zip vs combineLatest: 결정적 차이점

zip과 combineLatest는 모두 여러 Observable을 결합하지만, 작동 방식에 중요한 차이가 있습니다:

1. 결합 트리거

  • zip: 모든 Observable에서 새 값이 방출되어야 함 (일대일 대응)
  • combineLatest: 어느 하나의 Observable에서 새 값이 방출되면 됨 (각 Observable의 최신 값 사용)

2. 완료 처리

  • zip: 어느 하나의 Observable이 완료되면 더 이상 결합할 짝이 없으므로 전체 시퀀스도 완료됨
  • combineLatest: 모든 Observable이 완료될 때까지 계속 값을 방출 (한쪽이 완료되어도 다른 쪽에서 값이 방출되면 계속 결합됨)
let sourceA = Observable.of("A", "B", "C", "D")
let sourceB = Observable.of(1, 2)  // 항목이 더 적음

Observable.zip(sourceA, sourceB) { a, b in
    return "\(a)\(b)"
}
.subscribe(onNext: { print($0) })
// 출력: "A1", "B2" (C와 D는 버려짐)

3. 메모리 사용

  • zip: 더 느린 Observable을 기다리기 위해 빠른 Observable의 항목을 버퍼링
  • combineLatest: 각 Observable의 최신 값만 

zip은 느린 Observable을 기다리기 위해 빠른 Observable의 항목을 버퍼링하므로 메모리 사용량이 증가할 수 있습니다:

 흠 그러면 실제 언제 zip을 쓸까?
1. 순서가 중요한경우: 두 시퀀스 항목이 일대일로 정확히 대응되어야할때
2. 단계적 처리가 필요한 경우: 여러 단계가 순차적으로 진행되어야 할때
3. 제한된 요소 셋의 일괄 처리 등
 

Trigger

 

 

한 Observable의 이벤트가 다른 Observable의 동작을 제어하는 패턴입니다 

withLatestFrom

파라미터에 들어간 textField의 트리거 역할을 button tap 이벤트로 정한 이미지! 그래서 탭 될때마다 last값 가져와서 새로운 Observable에 내보낸다.

주 Observable에서 이벤트가 발생할 때마다 보조 Observable의 최신 값을 방출

트리거가 발생할때마다 보조 Observable의 값을 가져온다고 생각하면된다

example(of: "withLatestFrom") {
     // 1
     let button = PublishSubject<Void>()
     let textField = PublishSubject<String>()

     // 2
     let observable = button.withLatestFrom(textField)  // textField의 트리거역할을 button 으로 정한다.
     _ = observable.subscribe(onNext: { print($0) })

     // 3
     textField.onNext("Suno")
     textField.onNext("ddfqef")
     textField.onNext("Paris")
     button.onNext(())
     button.onNext(())
 }


/*
prints

Paris
Paris
*/

자 보면 마지막 값인 Paris만 방출됨. 전의 값들은 무시

Sample(_:)

  • withLatestFrom과 유사하지만, 중복 값을 방출하지 않음
  • 보조 Observable에서 새 값이 방출되어야 트리거 이벤트가 다시 샘플링됨
let source = PublishSubject<String>()
let trigger = PublishSubject<Void>()

// withLatestFrom
trigger.withLatestFrom(source)
    .subscribe(onNext: { print("withLatestFrom: \($0)") })
    .disposed(by: disposeBag)

// sample
source.sample(trigger)
    .subscribe(onNext: { print("sample: \($0)") })
    .disposed(by: disposeBag)

source.onNext("A")
trigger.onNext(())  // 둘 다 "A" 출력
trigger.onNext(())  // withLatestFrom만 "A" 다시 출력, sample은 출력 안 함
source.onNext("B")
trigger.onNext(())  // 둘 다 "B" 출력
 
아 근데 주의해야할것이 withLatestFrom = right.withLatestFrom(left) => left의 트리거를 right가 한다.
하지만 sample은 left.withLatestFrom(right) => left의 트리거를 right가 한다. 

Switches

subscriber가 런타임에 수신할 sequence를 결정할 수 있도록 하는 연산자들

amb(_:)

먼저 이벤트를 방출하는 Observable로 결정한다. 먼저오는놈이 임자? 한번 결정되면 이후에 변경 X

=? 그러면 결국 해당 스트리밍은 어디껀지 모르자나? 런타임에 결정되겟구나

뭐 그러면 사용자 취소나 타임아웃 두개 중 먼저 발생하는 것을 처리할때 좋겟다

// 작업 타임아웃 또는 사용자 취소 중 먼저 발생하는 것 처리
let task = performLongRunningTask()
    .map { _ in TaskResult.completed }

let timeout = Observable<TaskResult>.timer(.seconds(10), scheduler: MainScheduler.instance)
    .map { _ in TaskResult.timedOut }

let userCancellation = cancelButton.rx.tap
    .map { _ in TaskResult.cancelled }

Observable.amb([task, timeout, userCancellation])
    .take(1)  // 첫 결과만 처리
    .subscribe(onNext: { result in
        switch result {
        case .completed:
            self.showSuccess()
        case .timedOut:
            self.showTimeoutError()
        case .cancelled:
            self.showCancellationMessage()
        }
    })
    .disposed(by: disposeBag)

SwitchLatests 

이게 중요하다고 한다. flatMapLatest와 비슷하대.. flatMapLatest = map + switchLatest

example(of: "switchLatest") {
     // 1
     let one = PublishSubject<String>()
     let two = PublishSubject<String>()
     let three = PublishSubject<String>()

     let source = PublishSubject<Observable<String>>()

     // 2
     let observable = source.switchLatest()  // sequence에 대한 switch가 가능하다고 선언함.
     let disposable = observable.subscribe(onNext: { print($0) })

     // 3
     source.onNext(one)   // 현재 source의 시퀀스는 one으로한다.
     one.onNext("Some text from sequence one")  // one의 event를 받아올 수 있다.
     two.onNext("Some text from sequence two")   

     source.onNext(two)    // 현재 source의 시퀀스는 two로한다.
     two.onNext("More text from sequence two")   // two의 event를 받아올 수 있다.
     one.onNext("and also from sequence one")   // two를 바라보도록 바뀌었으므로 one의 이벤트를 더 이상 받을 수 없다.

     source.onNext(three)    // 현재 source의 시퀀스를 three로한다.
     two.onNext("Why don't you see me?")   // 받을 수 X
     one.onNext("I'm alone, help me")   // 받을 수 X
     three.onNext("Hey it's three. I win")   // 현재 시퀀스가 three이므로 이벤트를 받아온다.

     source.onNext(one)   // 다시 바라보는 시퀀스를 one으로 바꾼다.
     one.onNext("Nope. It's me, one!")

     disposable.dispose()

     /* Prints:
      Some text from sequence one
      More text from sequence two
      Hey it's three. I win
      Nope. It's me, one!
      */
 }

새 Observable로 전환할때 이전 Observable을 자동으로 해제한다.

가장 최근에 방출된 Observable의 이벤트만 수신

한번에 하나의 내부 observable만 활성화되므로 메모리 효율적

 

참고자료

https://github.com/fimuxd/RxSwift/blob/master/Lectures/09_Combining%20Operators/Ch9.CombiningOperators.md

 

RxSwift/Lectures/09_Combining Operators/Ch9.CombiningOperators.md at master · fimuxd/RxSwift

RxSwift를 스터디하는 공간. Contribute to fimuxd/RxSwift development by creating an account on GitHub.

github.com

 

'반응형프로그래밍' 카테고리의 다른 글

RxSwift(6)-RxCocoa(bind, drive, DelegateProxy)  (0) 2025.05.06
RxSwift(5)-TimeBasedOperators  (0) 2025.05.04
RxSwift(3)-Filtering Operators & TransForming Operators  (0) 2025.04.29
RxSwift(2)-Subject  (0) 2025.04.29
RxSwift(1)-Observable  (1) 2025.04.28
'반응형프로그래밍' 카테고리의 다른 글
  • RxSwift(6)-RxCocoa(bind, drive, DelegateProxy)
  • RxSwift(5)-TimeBasedOperators
  • RxSwift(3)-Filtering Operators & TransForming Operators
  • RxSwift(2)-Subject
2료일
2료일
좌충우돌 모든것을 다 정리하려고 노력하는 J가 되려고 하는 세미개발자의 블로그입니다. 편하게 보고 가세요
  • 2료일
    GPT에게서 살아남기
    2료일
  • 전체
    오늘
    어제
    • 분류 전체보기 (120)
      • SWIFT개발 (29)
      • 알고리즘 (25)
      • Design (6)
      • ARkit (1)
      • 면접준비 (30)
      • UIkit (2)
      • Vapor-Server with swift (3)
      • 디자인패턴 (5)
      • 반응형프로그래밍 (12)
      • CS (3)
      • 도서관 (1)
  • 인기 글

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
2료일
RxSwift(4)-Combining Operators
상단으로

티스토리툴바