- RxSwift(1)-Observable2025년 04월 28일
- 2료일
- 작성자
- 2025.04.28.:20
사실 저는 개인적으로 Combine을 너무나 사랑하고 자주 사용하는 개발자입니다. 같은 반응형 프로그래밍의 개념이지만 애플에서 만든 퍼스트 파티로 외부 라이브러리를 관리할 필요가 없기 때문이죠. 하지만 Combine이 나오기 이전에는 RxSwift로 사용을 했습니다. 그러다보니 여전히 여러 기업에서는 Rx를 쓰죠! 그렇기에 저도 Rx마스터가 되어야 하겟습니다.
Rx?: ReactiveX 프로그래밍 패러다임을 Swift로 구현한 라이브러리
비동기 프로그래밍을 선언적이고 함수형 접근 방식으로 처리하여 코드의 가독성과 유지보수성을 획기적으로 향상시킵니다.
복습 : 반응형 프로그래밍
: 데이터 흐름과 변경 사항의 전파에 중점을 둡니다. ex) data가 바뀌면 UI가 자동으로 반응한다
핵심 구성요소
1. Observable과 Observer
이벤트를 방출하는 객체(콤바인에서는 Publisher)
Observable 시퀀스 특징
enum Event<Element> { case next(Element) // 새로운 요소 발행 case error(Swift.Error) // 오류 발생 (시퀀스 종료) case completed // 정상 완료 (시퀀스 종료) }
계층구조
🧐: 흠 클래스로 이루어져있고 Element 제너릭 타입이군.. 오 ObservableType을 채택하고 있군
public protocol ObservableType: ObservableConvertibleType { /// 이 시퀀스에서 이벤트를 수신할 관찰자를 구독합니다. /// /// - parameter observer: 시퀀스에서 이벤트를 수신할 관찰자 /// - returns: 시퀀스의 구독을 처리하는 Disposable func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element }
- subscribe method: 프로토콜의 유일 필수 구현 메서드로 Observer가 Observable 시퀀스를 구독할 수 있게 한다.
- 이벤트 전달 매커니즘: 0개 이상의 .next event를 Observer에 전달할 수 ㅅ있다.
- 선택적으로 하나의 .error 또는 .completed 이벤트를 전달할 수 있으며 이후 시퀀스는 종료
- 시퀀스가 종료되면 내부 리소스 자동으로 해제, 명시적인 리소스 해제를 위해 반한된 Disposable의 dispose() 호출 ㄱㄴ합니다.
extension ObservableType { public func asObservable() -> Observable<Element> { Observable.create {o in self.subscribe(o)} } }
extension을 보면 asObservable함수를 정의한다. 그런데 이를 위해서는 ObservableConvertibleType프로토콜을 알아야한다.
public protocol ObservableConvertibleType { /// Observable 요소의 타입 associatedtype Element /// 이 객체를 Observable 시퀀스로 변환합니다. func asObservable() -> Observable<Element> }
이 프로토콜의 주 목적은 다양한 타입이 asObservable() 메서드를 통해 Observable 시퀀스로 변환될 수 있게 하는것이다. 이 프로토콜을 통해 커스텀 타입도 RxSwift의 생태계에 쉽게 통합할 수 있다.
결국 Observable은 ObservableType을 채택한 클래스!
ObservableType은 subscribe함수를 정의하고 ObservableConvertibleType을 채택한다.
ObservableConvertibleType은 asObservable함수를 정의한다.
Observable 생성
1. Create : ObservableType에 정의되어있다.
// 수동으로 라이프사이클 제어 let observable = Observable<String>.create { observer in observer.onNext("첫 번째 이벤트") observer.onNext("두 번째 이벤트") // 주석 해제하면 completed 전에 종료 // observer.onError(MyError.someError) observer.onCompleted() // 이 후의 이벤트는 무시됨 observer.onNext("무시되는 이벤트") return Disposables.create() }
사실 뭐 onCompleted처럼 수동으로 하는 경우는 거의 없다.
Combine에서의 Cancellable들을 객체가 해제될때 해제해주는 것처럼 disposebag이 있다.
2. Empty
extension ObservableType { /** Returns an empty observable sequence, using the specified scheduler to send out the single `Completed` message. - seealso: [empty operator on reactivex.io](http://reactivex.io/documentation/operators/empty-never-throw.html) - returns: An observable sequence with no elements. */ public static func empty() -> Observable<Element> { EmptyProducer<Element>() } } final private class EmptyProducer<Element>: Producer<Element> { override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { observer.on(.completed) return Disposables.create() } }
바로 Completed되기 때문에 더 이상 이 Observable에 대해 subscribe하거나 아무런 조치를 못한다.
??그럼 왜 써 이걸 어따써 아무 의미없는거 아냐??
- 조건부 데이터 소스 전환
func fetchData(forceRefresh: Bool) -> Observable<[Item]> { // 새로고침이 필요하지 않고 캐시가 유효한 경우 if !forceRefresh && isCacheValid() { return Observable.empty() } // 실제 네트워크 요청 수행 return apiClient.fetchItems() }
- 오류 복구 메커니즘
apiClient.fetchUserProfile() .catch { error -> Observable<UserProfile> in if error is NetworkTimeoutError { // 재시도 로직 트리거 self.scheduleRetry() return .empty() // 즉시 완료하여 에러 처리 흐름 종료 } // 다른 유형의 오류는 다시 throw return Observable.error(error) } .subscribe(onNext: { profile in self.updateUI(with: profile) }) .disposed(by: disposeBag)
- 테스트를 위한 목업작업 등
3. Never
extension ObservableType { /** Returns a non-terminating observable sequence, which can be used to denote an infinite duration. - seealso: [never operator on reactivex.io](http://reactivex.io/documentation/operators/empty-never-throw.html) - returns: An observable sequence whose observers will never get called. */ public static func never() -> Observable<Element> { NeverProducer() } } final private class NeverProducer<Element>: Producer<Element> { override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { Disposables.create() } }
다른점이 이제는 보이나요? 보면 create만 하고 아무런 이벤트를 넣어주지 않고 있어요!!
completed도 없고..방출도 안하고.. 오류도 안나와...? 그러면 무한 대기 상태를 표시하는 시퀀스./? 이건 또 언제 필요할까?
- 타임 아웃 및 장기 실행 작업 제어
// 사용자가 명시적으로 취소할 때까지 실행되는 장기 작업 func startLongRunningTask() -> Observable<Progress> { return longRunningOperation() .takeUntil(cancelButton.rx.tap) .timeout(.seconds(60), scheduler: MainScheduler.instance, other: .never()) }
타임아웃이 발생해도 작업을 계속 진행하도록 합니다. 타임아웃 발생 후 작업 완료를 기다리되, 더이상 진행 상황을 보고하지 않음!
- 조건부 스트림 제어
func fetchUserData(userId: String?) -> Observable<UserProfile> { // 유효하지 않은 사용자 ID가 제공된 경우 guard let validUserId = userId, !validUserId.isEmpty else { if isDebugMode { return .error(AppError.invalidUserId) // 디버그 모드에서는 오류 발생 } else { return .never() // 프로덕션 환경에서는 무한 대기 } } return apiClient.fetchUser(id: validUserId) }
잘못된 입력이 제공됐을때 오류를 발생시키는 대신, "무한 대기" 상태로 전환하여 상위계층의 코드가 계속 실행될수 있도록 할 수 있다.
4. From
배열과 같은 시퀀스 타입을 받아 각 요소를 개별적인 이벤트로 방출하는 Observable을 생성합니다.
let numbers = [1, 2, 3, 4, 5] let observable = Observable.from(numbers) observable.subscribe(onNext: { element in print("요소: \(element)") }, onCompleted: { print("완료!") }).disposed(by: disposeBag) // 출력: // 요소: 1 // 요소: 2 // 요소: 3 // 요소: 4 // 요소: 5 // 완료!
5. Just
: 특정 항목을 방출하는 Observable
extension ObservableType { /** Returns an observable sequence that contains a single element. - seealso: [just operator on reactivex.io](http://reactivex.io/documentation/operators/just.html) - parameter element: Single element in the resulting observable sequence. - returns: An observable sequence containing the single specified element. */ public static func just(_ element: Element) -> Observable<Element> { Just(element: element) } final private class Just<Element>: Producer<Element> { private let element: Element init(element: Element) { self.element = element } override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { observer.on(.next(self.element)) observer.on(.completed) return Disposables.create() }}
next 이벤트로 본인의 element만 넣고 바로 completed 시켜버린다. From을 Just로 바꾸면 [1,2,3,4,5]가 출력이됌. 단일요소로
Disposable
모든 create를 보면 Disposables를 생성하고 있다. 이게 뭘까?
public protocol Disposable { func dispose() } extension Disposable { public func disposed(by: bag: DisposeBag){ bag.insert(self) } }
Disposable은 구독 관계를 관리하고 리소스를 정리하는 매커니즘입니다. 결국 Disposables는 Disposable프로토콜을 채택하는 객체를 생성한다고 생각하면 될거같다.
Observable.of(1,2) .subscribe { print($0) } .disposed(by: disposeBag)
그래서 cancel처럼 bag에 넣어준다. Disposable이 Extension으로 disposed(by:)함수를 가지고 있어 여러 Disposable을 모아서 객체가 해제될때 모두 한꺼번에 정리해줍니다.
1. Observable이 구독될때 Disposable 객체 생성
2. 구독이 활성화되어 있는 동안 참조 유지
3. 시퀀스가 .completed 또는 .error로 정상 종료될때, 구독자가 명시적으로 dispose()호출할때 종료
참고자료
https://github.com/fimuxd/RxSwift/blob/master/Lectures/02_Observables/Ch2.%20Observables.md
RxSwift/Lectures/02_Observables/Ch2. Observables.md at master · fimuxd/RxSwift
RxSwift를 스터디하는 공간. Contribute to fimuxd/RxSwift development by creating an account on GitHub.
github.com
'반응형프로그래밍' 카테고리의 다른 글
RxSwift(4)-Combining Operators (0) 2025.05.02 RxSwift(3)-Filtering Operators & TransForming Operators (0) 2025.04.29 RxSwift(2)-Subject (0) 2025.04.29 다음글이전글이전 글이 없습니다.댓글