본문 바로가기

IT/RxJava, RxAndroid, RxKotlin

[RxJava, RxKotlin,RxAndroid] concat , merge, zip 알아보기

728x90
SMALL

안녕하세요 남갯입니다


오늘은 concat 과 merge 와 zip를 알아보도록 하겠습니다.


-concat


concat은 두개의 Observable을 합쳐 첫번째 Observable을 발행 한 뒤 두번째 Observable을 발행합니다.


위쪽의 Timestamp와 아래쪽의 Timestamp는 동일한 시기에 각각 데이터를 발행했지만

concat 이후에 보면


위쪽 데이터 이후에 아래쪽 데이터가 발행되는것을 볼 수 있습니다.


즉 요즘은 해당 api를 부르기위해 auth를 요청받는 api가 있는데, 그때 이용하게 되면 좋을것 같습니다. 해당 세션이 유효한지 판단 한 후

원하는 api를 요청할 때 이용하면 좋을것 같습니다.



val test1 = Observable.just("1", "2", "3").delay(2, TimeUnit.SECONDS)
val test2 = Observable.just("apple", "banana", "car")
val test3 = Observable.interval(2, TimeUnit.SECONDS)

dp.add(Observable.concat(test1, test2).subscribe({
Log.e("concat observable", it)
}, {
Log.e("concat observable", "error")
}, {
Log.e("concat observable", "complete")
}))

여기서 보게되면 test1을 1,2,3을 발행하게 해놓고 2초뒤에 발행하도록 하였고

test2는 apple, banana, car를 발행하도록 해서 두개의 데이터를 concat하였습니다.



이걸 timestamp로 표현하면 









결과는 test1이 더 늦게 발행되야함에도 불구하고

1,2,3 이 발행된 뒤에 apple, banana, car가 발행됨을 볼 수 있습니다.






- merge


merge는 두개의 Observable를 merge시켜 하나로 결합해 전송하는걸 말합니다. merge 를 사용함으로써 마치 두개의 Observable을 결합해 하나의 Observable처럼 동작 시킬수있습니다.




출처: 공식문서 http://reactivex.io/documentation/operators/merge.html



위의 timestamp에서는 20,40,60,80,100을 발행하고

아래의 timestamp에서는 1과 1을 발행합니다.


두개의 다른 timestamp를 합쳐 하나의 timestamp에서 발행하는것처럼 변경해줍니다.


20 , 40 , 60 , 1 , 80 , 100, 1 이렇게 말이죠


하지만 둘중에 하나가 onError를 타게되면 그 즉시 멈추게 됩니다.


그걸 방지하기위해 mergeDelayError를 쓰면 됩니다.



위의 그림과같이 timestamp로 하나씩 발행 한 뒤 onError난 부분을 빼고 발행하고나서 onError를 발행하게 되어있습니다.




소스코드로 한번 보도록 하겠습니다.

val test1 = Observable.just("1", "2", "3").delay(2, TimeUnit.SECONDS)
val test2 = Observable.just("apple", "banana", "car")
val test3 = Observable.interval(2, TimeUnit.SECONDS)

dp.add(Observable.merge(test1, test2).subscribe({
Log.e("merge observable", it)
}, {
Log.e("merge observable", "error")
}, {
Log.e("merge observable", "complete")
}))


여기서 볼 수 있는건 test1은 delay시켰고 test2는 바로 발행시켰습니다.








이렇게 발행되는것을 볼 수 있습니다.








-zip

zip은 여러개의 Observable을 합쳐서 전송하게 됩니다. 특정 item 끼리 합쳐서 두개의 발행을 합쳐서 내려주게 됩니다.



위쪽 Timestamp에서는 1,2,3,4를 발행하였고

아래쪽 Timestamp에서는  a,b,c,d를 발행했습니다.

두개를 zip 했을때  1a, 2b, 3c, 4d,를 발행하는것을 알 수 있습니다.


즉 가장 최근에 zip되지 않은 데이터끼리 zip 하는것을 알 수 있습니다.


이번엔 test2와 test3을 통해 zip을 시켜보도록 하겠습니다.


val test1 = Observable.just("1", "2", "3").delay(2, TimeUnit.SECONDS)
val test2 = Observable.just("apple", "banana", "car")
val test3 = Observable.interval(2, TimeUnit.SECONDS)

Observable.zip(test3, test2, BiFunction { t1: Long, t2: String -> t1.toString() + t2 })
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.e("zip observable ${System.currentTimeMillis()}", it)
}, {

}, {
Log.e("zip observable ", "complete")
})





위와같이 약 2초간격으로 발행되며 합쳐지는것을 볼 수 있습니다.



출처: https://programmingfbf7290.tistory.com/entry/6-RxJava-merge



728x90
LIST

'IT > RxJava, RxAndroid, RxKotlin' 카테고리의 다른 글

[RxJava] flowable  (2) 2019.05.08
[RxJava,RxKotlin] RxJava , Observable 알아보기  (0) 2019.04.18