Rx
Rx란 ReactiveX(Reactive Extensions)의 약자로, 리액티브 프로그래밍을 위한 라이브러리입니다.
ReactiveX는 비동기 프로그래밍과 Observable 시퀀스를 이용해 이벤트를 처리하기 위한 반응형 프로그래밍 기법입니다.
- 효율적으로 신속하게 비동기 처리를 도와줍니다.
- 함수형 프로그래밍을 일부 지원합니다.
- 옵저버 패턴을 사용합니다.
- 콜백에서 또 콜백을 하는 콜백 지옥에서 벗어날 수 있습니다.
ReactiveX에는 다음과 같이 여러 가지가 존재합니다.
- RxJava : Java(JVM)를 위한 ReactiveX
리액티브 프로그래밍 패러다임을 자바에서 구현한 프로그래밍 라이브러리입니다. - RxKotlin : Kotlin을 위한 ReactiveX
RxJava 라이브러리를 기반으로 포팅하여 코틀린을 위한 리액티브 프로그래밍의 특정 부분을 함수형 프로그래밍으로써 구현한 라이브러리입니다. - RxAndroid : Android를 위한 ReactiveX
RxJava에 최소한의 클래스를 추가하여 안드로이드 앱에서 리액티브 구성요소를 쉽고 간편하게 사용하게 만드는 라이브러리입니다.
Observable
ReactiveX를 다시 설명하자면 이벤트 스트림 또는 데이터 스트림을 만들고 사용자의 입맛에 맞게 연산자를 통해서 이를 변형시킨 후, 이에 관심있는 사람들이 해당 스트림을 subscribe하여 결과를 받는 것입니다.
여기서 Observer(관찰자)는 Observable(발행자)을 Subscribe(구독)하고, Observable이 발행하는 데이터에 반응합니다.
즉, Observable은 하나 혹은 연속된 데이터(아이템)을 발행하는 역할을 가지고 있습니다.
그리고 데이터가 발행되면 그 시점을 감시하는 관찰자를 Observer 안에 두고, 관찰자를 통해 Observer는 데이터 발행 알림을 받습니다.
Observable은 다음의 이벤트를 사용하여 Observer에게 알림을 전달합니다.
- onSubscribe(d: Disposable) : Observer가 구독을 신청하면 호출됩니다. Disposable 객체는 dispose()을 통해 Observer가 구독을 해제할 때 사용합니다.
- onNext(item: T) : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행합니다.
- onComplete() : 데이터 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 더는 onNext() 호출이 발생하지 않음을 나타냅니다.
- onError(e: Throwable) : 오류가 발생했음을 Observer에게 전달합니다.
이 메서드들은 Emitter라는 Observer Interface에 선언되어 있습니다.
위의 메서드를 사용하여 Observable은 이벤트를 통지하고, Observer는 이벤트 알림을 받아서 적절하게 처리하고 사용자에게 이를 보여주는 역할을 합니다.
Observable 생성하기
RxKotlin에서는 연산자(Operator)라고 부르는 여러 메서드를 통해서 기존 데이터를 참조하거나 변형하여 Observable을 생성할 수 있습니다.
1. create()
- create()를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템 발행의 완료 및 오류의 알림을 직접 설정할 수 있습니다.
- Observable을 생성할 때 onError()와 onComplete()는 한 번씩 호출하는 것이 좋습니다.
- 주의할 점은 create() 연산자는 개발자가 직접 Emitter를 제어하므로 주의하여 사용해야 합니다.
- 예로, Observable이 폐기되었을 때 등록된 콜백을 모두 해제하지 않으면 메모리 누수가 발생합니다.
// Observable 생성
val observable = Observable.create<String> { emitter ->
emitter.onNext("First")
emitter.onNext("Second")
emitter.onError(Throwable("에러 발생!"))
emitter.onComplete()
}
// Observer 생성
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
println("onSubscribe() : $d")
}
override fun onNext(t: String) {
println("onNext() : $t")
}
override fun onError(e: Throwable) {
println("onError() : ${e.message}")
}
override fun onComplete() {
println("onComplete()")
}
}
// 1. Kotlin 확장 함수를 사용하여 subscribe
observable.subscribeBy(
onNext = { data -> println("Data : $data") },
onError = { error -> println("Error : ${error.message}") },
onComplete = { println("Complete!") }
)
// 2. 메서드를 사용하여 Observer가 Observable 구독
observable.subscribe(observer)
/*
<결과>
1.
Data : First
Data : Second
Error : 에러 발생!
Complete!
2.
onSubscribe() : null
onNext() : First
onNext() : World
onError() : 에러 발생!
onComplete()
*/
2. just()
- just() 연산자는 해당 아이템을 그대로 발행하는 Observable을 생성해 줍니다. 연산자의 인자로 넣은 아이템을 차례로 발행하며, 한 개의 아이템을 넣을 수 있고, 타입이 같은 여러 개의 아이템을 넣을 수도 있습니다.
- just() 연산자에 null을 전달하면 null을 발행합니다. 만약 아무런 아이템을 발행하지 않은 Observable을 만들고 싶다면 Observable.empty() 연산자를 사용해야 합니다.
- fromXXX() 메서드와 비슷해 보이지만 just()는 단일 데이터만을 다룰 수 있고, fromXXX()은 단일 데이터가 아닌 경우에도 사용할 수 있습니다.
val observable = Observable.just("First", "Second")
observable.subscribe { data -> println("Data : $data") }
/*
<결과>
Data : First
Data : Second
*/
3. fromXXX()
- 이미 참조할 수 있는 배열 및 리스트 등의 자료 구조나 Future, Callable 또는 Publisher가 있다면 from으로 시작하는 연산자를 통해서 Observable로 변환할 수 있습니다.
- fromArray : 배열을 ObservableSource로 변환하여 아이템을 순차적으로 발행합니다.
- fromIterable : ArrayList, HashSet처럼 Iterable을 구현한 모든 객체를 ObservableSource로 변환하여 아이템을 순차적으로 발행합니다.
- fromFuture : Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 메서드를 호출한 값을 반환합니다.
- fromCallable : Callable을 Observable로 변환합니다.
- fromPublisher : Publisher를 Observable로 변환합니다.
// fromArray()
// 자바 -> Observable.fromArray(array)
// 코틀린 -> Array.toObservable()
val itemArray: Array<Int> = arrayOf(1, 2, 3)
val arrayObservable = itemArray.toObservable()
arrayObservable.subscribe {
println(it)
}
// fromIterable()
// Iterable 인터페이스를 구현하는 모든 클래스 사용 가능
val itemList = arrayListOf(1, 2, 3)
val iterableObservable = Observable.fromIterable(iterableObservable)
iterableObservable.subscribe {
println(it)
}
/*
<결과>
1
2
3
*/
// fromFuture()
// 비동기적인 작업 결과를 구할 때 사용
// 멀티쓰레드 환경에서 처리된 어떤 데이터를 다른 쓰레드에 전달 가능
val future = Executors.newSingleThreadExecutor().submit<String> { "Hello World!" }
val futureObservable = Observable.fromFuture(future)
futureObservable.subscribe {
println(it)
}
// fromCallable()
// 비동기적인 실행 결과 반환
val callable = Callable<String> {
"Hello World!"
}
val callableObservable = Observable.fromCallable(callable)
callableObservable.subscribe {
println(it)
}
// fromPublisher()
// 잠재적인 아이템 발행 제공
val publisher = Publisher<String> {
it.onNext("Hello World!")
it.onComplete()
}
val publisherObservable = Observable.fromPublisher(publisher)
publisherObservable.subscribe {
println(it)
}
/*
<결과>
Hello World!
*/
4. empty(), never(), error()
- empty(), never(), error() 연산자 모두 아이템을 발행하지 않지만, onComplete() 또는 onError() 호출 여부에 차이점이 있습니다.
- empty() 연산자는 아이템을 발행하지는 않지만, 정상적으로 스트림을 종료시킵니다.
- never() 연산자는 아이템을 발행하지는 않지만, 스트림을 종료시키지도 않습니다.
- error() 연산자는 아이템을 발행하지는 않지만, 에러를 발생시킵니다.
Observable.empty<String>()
.subscribeBy(
onNext = { println("empty() : onNext")},
onError = { println("empty() : onError")},
onComplete = { println("empty() : onComplete")}
)
Observable.never<String>()
.subscribeBy(
onNext = { println("never() : onNext")},
onError = { println("never() : onError")},
onComplete = { println("never() : onComplete")}
)
Observable.error<String>(Throwable("Error"))
.subscribeBy(
onNext = { println("error() : onNext")},
onError = { println("error() : onError")},
onComplete = { println("error() : onComplete")}
)
/*
<결과>
empty() : onComplete
error() : onError
*/
5. interval()
- interval() 연산자는 주어진 시간 간격으로 순서대로 정수를 발행하는 Observable을 생성합니다.
- 주의할 점은 구독을 중지하기 전까지 무한히 데이터를 발행하므로 적절한 시점에 폐기해야 합니다.
// interval(period: Long, unit: TimeUnit)
// period: 시간 크기
// unit: 시간 단위(TimeUnit.MILLISECONDS, TimeUnit.SECONDS 등)
val intervalObservable = Observable.interval(1, TimeUnit.SECONDS)
.subscribe {
println(it)
}
Thread.sleep(5000)
intervalObservable.dispose() // 아이템 발행 중단
/*
<결과>
0
1
2
3
4
*/
6. range()
- range() 연산자는 특정 범위의 정수를 순서대로 발행하는 Observable을 생성합니다.
- interval() 연산자와 비슷하지만 특정 범위의 아이템을 발행하고, 발행이 끝나면 스트림을 종료시킨다는 점에서 차이가 있습니다.
// range(start: Int, count: Int)
// start : 시작 값
// count : 생성할 숫자의 갯수
Observable.range(0, 5)
.subscribe {
println(it)
}
/*
<결과>
0
1
2
3
4
*/
7. timer()
- timer() 연산자는 특정 시간 동안 지연시킨 뒤, 아이템을 발행하고 종료합니다.
// timer(delay: Long, unit: TimeUnit)
Observable.timer(5, TimeUnit.SECONDS)
.subscribe {
println("Timer 끝!")
}
println("시작!")
Thread.sleep(6000)
/*
<결과>
시작!
Timer 끝!
*/
8. defer()
- defer() 연산자는 Observer가 구독(subscribe)할 때까지 Observable 생성을 지연시킵니다.
- 즉, subscribe() 메서드를 사용해 구독을 요청해야지만 Observable 아이템을 생성합니다.
val justSrc = Observable.just(
System.currentTimeMillis()
)
val deferSrc = Observable.defer {
Observable.just(
System.currentTimeMillis()
)
}
// 현재 시간 출력
println("#1 now = ${System.currentTimeMillis()}")
try {
Thread.sleep(5000)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
println("#2 now = ${System.currentTimeMillis()}")
// just만을 사용하여 바로 아이템 발행 -> #1 now와 비슷
justSrc.subscribe {
println("#1 time = $it")
}
// defer를 사용하여 구독이 들어왔을 때 아이템 발행 -> #2 now와 비슷
deferSrc.subscribe {
println("#2 time = $it")
}
/*
<결과>
#1 now = 1663741055883
#2 now = 1663741060887
#1 time = 1663741055816
#2 time = 1663741060900
*/
SubScribe와 Disposable
Subscribe
subscribe() 메서드는 Observer를 Observable에 연결하는 메서드입니다.
이는 Observable이 발행하는 아이템을 받고, error() 또는 complete() 알림을 받기 위해서 사용합니다.
subscribe() 메서드는 다음과 같이 파라미터에 따라서 다양하게 오버로딩되어 있습니다.
- subscribe() : Disposable
- subscribe(onNext: Consumer!) : Disposable
- subscribe(onNext: Consumer!, onError: Consumer!) : Disposable
- subscribe(onNext: Consumer!, onError: Consumer!, onComplete: Action!) : Disposable
- subscribe(onNext: Consumer!, onError: Consumer!, onComplete: Action!, onSubscribe :Consumer!) : Disposable
- subscribe(observer: Observer) : Disposable
해당 메서드들은 모두 Disposable 객체를 반환합니다.
유한한 아이템을 발행하는 Observable의 경우, onComplete() 호출로 안전하게 종료됩니다.
하지만 무한하게 아이템을 발행하거나 오랫동안 실행되는 Observable의 경우에는, 구독이 더는 필요하지 않을 때 메모리 누수 방지를 위해서 Disposable 클래스의 dispose() 메서드를 사용해 아이템 발행을 중단할 수 있습니다.
Disposable
- dispose() 메서드를 호출하면 아이템 발행을 중지시키고 리소스를 폐기할 수 있습니다.
- 리소스가 이미 폐기되었는지 확인하는 데 Disposable.isDisposed() 메서드를 활용할 수 있습니다.
- onComplete()를 명시적으로 호출하거나 호출됨을 보장한다면 dispose()를 호출할 필요가 없습니다.
- Observer가 여러 곳에 있고, 이들을 폐기하려면 각각의 Disposable 객체에 대해서 dispose()를 호출해야 합니다. 이때, CompositeDisposable을 이용하면 이들을 한꺼번에 폐기할 수 있습니다.
val observable = Observable.interval(1, TimeUnit.SECONDS)
// 1초에 한 번씩 아이템 발행(무한히)
val disposable = observable.subscribe {
println("Disposable1 : $it")
}
val disposable2 = observable.subscribe {
println("Disposable2 : $it")
}
val disposable3 = observable.subscribe {
println("Disposable3 : $it")
}
val compositeDisposable = CompositeDisposable()
compositeDisposable.addAll(disposable, disposable2, disposable3)
Thread().apply {
try {
Thread.sleep(3500)
} catch (ie: InterruptedException) {
ie.printStackTrace()
}
// 아이템 발행 해제
compositeDisposable.dispose()
}.start()
/*
결과
Disposable2 : 0
Disposable1 : 0
Disposable3 : 0
Disposable3 : 1
Disposable1 : 1
Disposable2 : 1
Disposable1 : 2
Disposable3 : 2
Disposable2 : 2
*/
'안드로이드 > 개념' 카테고리의 다른 글
[Android] Flow (0) | 2023.12.01 |
---|---|
[Android] 코루틴 (0) | 2023.11.27 |
[Android] 앱 아키텍처 패턴 (MVC, MVP, MVVM, MVI) (0) | 2023.10.27 |
[Android] 정규표현식 (0) | 2023.10.11 |
[Android] LiveData (0) | 2023.10.09 |