본문 바로가기

IT/RxJava, RxAndroid, RxKotlin

[RxJava,RxKotlin] RxJava , Observable 알아보기

728x90
SMALL

안녕하세요 남갯입니다

개인적으로 RxJava,RxKotlin 에 대해 정리해보려고합니다.



ReactiveX란 ?
ReactiveX 는 비동기 프로그래밍 그리고 Observable 시퀀스를 이용해 이벤트를 처리하기위한 라이브러리입니다.
즉 위 말을 간단하게 말해보면 비동기적 데이터흐름을 처리하는 프로그래밍입니다. 
Rxjava는 2013년 2월 넷플리스 기술블로그에서 처음 소개되었고 REST기반의 서비스를 개선하고자 프로젝트를 진행했고 .net환경의 rx를 JVM에 포팅하여 만들어지게 되었습니다.







Observable이란?

ReactiveX는 옵저버패턴을 사용하기 때문에 rx의 Observer는 Observable을 구독하게 됩니다. Observable이 emit하는 하나 혹은 연속된 item 에 대해 Observer에게 알림을 보냅니다. Rxjava는 Observable의 시작이면서 Observable의 끝이라고 할정도로 중요한 개념입니다. Observable은 
onNext , onError, onComplete의 세가지 알림을 구독자에게 전달합니다.

onNext : Observable이 데이터 발행을 알림
onError : error가 발생했음을 알리고 Observable을 종료
onComplete : 모든 이벤트가 발행을 완료했음을 알립니다 이벤트가 발생한 후 onNext를 발행해서는 안됩니다.

 기존 RxJava 1.x에서 Observable과 Single로 구성했지만  RxJava 2.x부터 Observable, Maybe, Flowable 클래스로 구분지어 사용하게 됩니다.








스케줄러란?
스케줄링은 다중 프로그래밍을 가능하게 하는 운영 체제의 동작 기법이다. 운영 체제는 프로세스들에게 CPU 등의 자원 배정을 적절히 함으로써 시스템의 성능을 개선할 수 있다.  - 위키백과
Observable을 연산자 체인을 하고 멀티스테딩을 적용하기 위해선 특정 스케줄러를 사용해서 실행하면 됩니다.
보통 스케줄러를 지정하기위해서는 RxAndroid 즉 안드로이드에서는 두가지를 사용하게되는데

* ObserveOn : SubscribeOn된 스레드를Observable의 체인 이후에 사용할 스레드를 변경할때 사용됩니다.
* SubScribeOn : Observable연산을 사용하기위해 처음 사용할 스레드를 지정하는것입니다.



파란색 subscribeOn을 통해 메서드 체인을 통해 생성했는데,  메서드 체인의 뒤쪽에 놓아도 
subscribe 먼저 동작하고 observeOn을 하게되면 해당의 모양이나 색깔로 변경되어 동작하는 그림을 볼 수 있습니다.

위의 내용이 더 궁금하시면 스케쥴러관련 문서를 보면 확실하게 이해가 될것같습니다.

실제 지정할 스케쥴러는
아래의 출처에서 확인해서 사용하시면 됩니다.

Schedulers.computation() - 이벤트 룹에서 간단한 연산이나 콜백 처리를 위해 사용됩니다.
RxComputationThreadPool라는 별도의 스레드 풀에서 돌아갑니다. 최대 갯수 개의 스레드 풀이 순환하면서 실행됩니다.
Schedulers.immediate() - 현재 스레드에서 즉시 수행합니다. 
observeOn()이 여러번 쓰였을 경우 immediate()를 선언한 바로 윗쪽의 스레드를 따라갑니다.
Schedulers.from(executor) - 특정 executor를 스케쥴러로 사용합니다.
Schedulers.io() - 동기 I/O를 별도로 처리시켜 비동기 효율을 얻기 위한 스케줄러입니다.
자체적인 스레드 풀 CachedThreadPool을 사용합니다. API 호출 등 네트워크를 사용한 호출 시 사용됩니다.
Schedulers.newThread() - 새로운 스레드를 만드는 스케쥴러입니다.
Schedulers.trampoline() - 큐에 있는 일이 끝나면 이어서 현재 스레드에서 수행하는 스케쥴러.
AndroidSchedulers.mainThread() - 안드로이드의 UI 스레드에서 동작합니다.
HandlerScheduler.from(handler) - 특정 핸들러 handler에 의존하여 동작합니다.
출처: https://tiii.tistory.com/18 [안드로이드 개발 팁 블로그]







Observable 생성하기
Rxjava에서는 옵저버블을 생성하는 다양한 방법을 제공한다 리스트 값을 하나씩 발행하거나 지연 혹은 아무것도 발행시키지 않고 종료하거나 에러를 발생시키는 방법이 있다.

-Observable을 생성하는 방법 출처 https://brunch.co.kr/@lonnie/18



Obseravble.create()
직접적인 코드 구현을 통해 옵저버 메서드를 호출하여 Observable을 생성한다.


val observable1: Observable<String> = Observable.create {
it.onNext("a")
it.onNext("b")
it.onComplete()
}
observable1.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)

// onNext : a

// onNext : b

// onComplete 




Obseravble.defer()
Observer가 구독하기전까지 Observable을 생성하고 있지 않다가 구독하고 나면 각각의 새로운 Observable을 만듭니다.
Lazy initalize(지연 초기화) 형태입니다.



val observable2: Observable<String> = Observable.defer {
loge("createTest CreateObservable :")
Observable.just("1", "2", "3", "4")
}

observable2.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)

// onNext : 1

// onNext : 2

// onNext : 3

// onNext : 4

// onComplete






Obseravble.just()
특정 파라미터로 받아서 아이템을 옵져버블로 발행한다. 최대 10개까지의 아이템을 발행 가능하다. 
기본적으로 간단하게 Obseravle을 테스트할때 많이 사용한다.



val observableJust: Observable<String> = Observable.just("1", "2","abc","1234","aaa")
observableJust
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)

// onNext : 1

// onNext : 2

// onNext : abc

// onNext : 1234

// onNext : aaa

// onComplete


Observable.fromCallable()

아마 자주사용하는 것중에 하나일것이다. 구독이 발생할때 call()함수가 호출되는 lazy initailze(지연초기화) 하는 함수이다 


val fromCallableObservable: Observable<Int> = Observable.fromCallable {
1
}
fromCallableObservable
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)

// onNext : 1

// onComplete


Observable.interval()

 특정 시간별로 연속된 정수형을 배출하는 Observable을 생성한다


val intervalObservable: Observable<Long> =
Observable.interval(2, TimeUnit.SECONDS)
.take(5)

intervalObservable.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)


 실제 2초간격으로 0부터 4까지 발행된것을 알 수 있다.

* Take Observable이 배츨한 처음 n개의 항목들만 배출한다



Observable.range()

특정 범위의 시작점부터 원하는 갯수만큼의 정수형 아이템을 emit한다.


val start = 5
val count =3
val rangeObservable: Observable<Int> =
Observable.range(start,count)
rangeObservable.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)

// onNext : 5

// onNext : 6

// onNext : 7

// onComplete



Observable.timer()

특정 시간 이후에 정수형 0의 아이템을 발행한다.


val timerObservable: Observable<Long> =
Observable.timer(5, TimeUnit.SECONDS)
.doOnSubscribe { loge("onSubscribe") }
timerObservable.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(
{ loge("onNext : ${it}") },
{ loge("onError : ${it}") },
{ loge("onComplete") }
)


위의 설명과 함께 그밖에 많은 함수들을 갖고있다

empty :  아무런 아이템을 발행하지 않고 옵저버블 생성

never : 아무런 아이템 발행안하고 완료를 안하는 옵저버블 생성

throw : 에러를 발행하는 옵저버블 생성

728x90
LIST

'IT > RxJava, RxAndroid, RxKotlin' 카테고리의 다른 글

[RxJava] flowable  (2) 2019.05.08
[RxJava, RxKotlin,RxAndroid] concat , merge, zip 알아보기  (9) 2019.02.26