RxJS 의 개념
싱글 (Single Item) | 멀티플 (Multiple Items) | |
PULL (Sync) | 함수 (Function) | 이터레이터 (Iterator) - array, symbol, generator |
PUSH (Async) | 프로미스 (Promise) - async & await |
옵저버블 (Observable) |
- 싱글: 하나의 값이나 이벤트를 다루는 것
- 멀티플: 여러 개의 값이나 이벤트를 다루는 것
- PULL: 데이터를 받을지 결정하는 것
- PUSH: 데이터를 보낼지 결정하는 것
Reactive Programming
명령형 프로그래밍 언어에서 표현식은 순차적으로 실행된다.
a = 10;
b = 20;
c = a + b; // c = 30
a = 40; // c = 30
반응형 프로그래밍 언어에서는 데이터 흐름의 변경으로 인해 변수가 변경될 수 있다.
a = 10;
b = 20;
c = a + b; // c = 30
a = 40; // c = 60
반응형 프로그래밍은 시스템 내 다른 시스템이나 구성 요소 간의 상호 작용을 의미할 수 있다.
이런 부분에서 RxJs 와 같은 라이브러리는 메시지 브로커(RabbitMQ, Kafka, MSMQ 등)를 대체하지는 않는다.
리액티브 선언문
https://www.reactivemanifesto.org/ko
리액티브 선언문
탄력성(Resilient): 시스템이 장애 에 직면하더라도 응답성을 유지 하는 것을 탄력성이 있다고 합니다. 탄력성은 고가용성 시스템, 미션 크리티컬 시스템에만 적용되지 않습니다. 탄력성이 없는 시
www.reactivemanifesto.org
Functional Reactive Programming
기능적 반응형 프로그래밍(FRP)은 Reactiveness 을 달성하기 위해 입력, 변수 변경 등을 포함한 모든 것을 스트림 및 스트림 작업을 사용하는 하위 집합이다.
https://dl.acm.org/doi/abs/10.1145/258948.258973
Functional reactive animation | Proceedings of the second ACM SIGPLAN international conference on Functional programming
ABSTRACT Fran (Functional Reactive Animation) is a collection of data types and functions for composing richly interactive, multimedia animations. The key ideas in Fran are its notions of behaviors and events. Behaviors are time-varying, reactive values, w
dl.acm.org
반응형 프로그래밍은 비동기 데이터 스트림을 사용한 프로그래밍이며, 함수형 리액티브 프로그래밍(FRP)은 함수형 프로그래밍의 구성 요소(map, reduce, filter 등)를 사용하는 리액티브 프로그래밍을 위한 프로그래밍 패러다임이다.
FRP는 시간을 명시적으로 모델링하여 이러한 문제를 단순화하는 것을 목표로 그래픽 사용자 인터페이스(GUls), 로봇 공학 및 음악 프로그래밍에 사용되었다.
RxJava와 RxJS는 이에 상응하는 Java 및 JavaScript 프로그래밍 언어용 라이브러리 측면에서 Functional Reactive Programming 패러다임을 구현하며 Microsoft의 Reactive를 기반으로 파생되었다.
지원 언어로는 Java(RxJava), JavaScript(RxJS), C#(Rx.NET), C# Unity(UniRx), 스칼라(RxScala), Clojure(RxClojure), C++( RxCpp), 루비(Rx.rb), Python(RxPY), Groovy(RxGroovy), JRuby(RxJRuby), Kotlin(RKotlin), Swift(RxSwift) 가 있으며, 플랫폼 및 프레임워크용 ReactiveX으로는 RxNetty, RxAndroid, RxCocoa 가 있다.
Rx 는 다음과 같은 룰이 있다.
RxJs
Rx.Observable.from(["Reactive", "Extensions", "Java"])
.take(2)
.map(function(s) { return s + ": on " + new Date() })
.subscribe(function(s) { console.log(s) });
// Reactive: on [2024-05-02T02:12:00.000Z]
// Extensions: on [2024-05-02T02:12:00.000Z]
RxJava
Observable.from(new String[] {"Reactive", "Extensions", "Java"})
.take(2)
.map(s -> s + ": on " + new Date())
.subscribe(s -> System.out.println(s));
// Reactive: on [2024-05-02T02:19:00.000Z]
// Extensions: on [2024-05-02T02:19:00.000Z]
Observable lifecycle
Observable은 데이터 스트림의 래퍼 역할을 하는 함수이다. 이는 애플리케이션 내에서 메시지 전달을 지원한다. Observable은 관찰자가 구독하기 전까지는 쓸모없고, 관찰자는 관찰 가능 항목에서 방출된 데이터를 소비하는 개체이다. 관찰자는 Observable이 완료되거나, 관찰자가 Observable 구독을 취소할 때까지 Observable로부터 데이터 값을 계속 수신한다. 그렇지 않으면 관찰자는 Observable 항목으로부터 지속적으로 비동기적으로 데이터 값을 받을 수 있다. 따라서 사용자 인터페이스 업데이트, JSON 응답 전달 등 다양한 작업을 수행할 수 있다.
Observable 의 4단계 lifecycle
- 옵저버블 생성 (Creating Observables)
- 옵저버블 구독 (Subscribing to Observables)
- 옵저버블 실행 (Executing the Observable)
- 옵저버블 구독 해제 (Disposing Observables)
참고 : https://medium.com/analytics-vidhya/understanding-rxjs-observables-ad5b34d9607f
Understanding RxJS Observables
Let’s look into Reactive Programming with RxJS
medium.com
주요 연산자
옵저버블 생성 (Creating Observables)
- from: from 연산자는 다양한 데이터 소스에서 옵저버블을 생성
배열, Promise, 이터러블 객체(배열, 문자열 등), 이벤트(클릭 이벤트 등) 등을 사용하여 옵저버블을 생성할 수 있음
from 연산자는 데이터 소스의 각 항목을 옵저버블로 방출함 - interval: interval 연산자는 지정된 주기마다 숫자 시퀀스를 방출하는 옵저버블을 생성
예를 들어, interval(1000)은 1초 간격으로 0부터 시작하여 계속해서 증가하는 숫자 시퀀스를 방출함 - of: of 연산자는 지정된 값을 방출하는 옵저버블을 생성하고 인수로 전달된 값을 순서대로 방출하고 종료함
예를 들어, of(1, 2, 3)은 1, 2, 3을 차례로 방출한 뒤 종료함 - timer: timer 연산자는 지정된 시간 후에 값을 방출하는 옵저버블을 생성
timer는 첫 번째 인수로 초기 지연 시간(ms)과 두 번째 인수로 옵저버블이 방출하는 간격(ms)을 받음
예를 들어, timer(1000, 500)은 초기 1초 후에 시작하며 0부터 0.5초마다 값을 방출하는 옵저버블을 생성함
예시 코드
RxJs 에서는 지원하지만, RxJava 에서 지원하지 않는 부분은 주석으로 처리해 두었다.
public static void main(String[] args) {
// from 연산자를 사용하여 배열에서 옵저버블 생성
Integer[] array = {1, 2, 3, 4, 5};
Observable<Integer> fromObservable = Observable.fromArray(array);
fromObservable.subscribe(System.out::println);
// interval 연산자를 사용하여 주기적으로 값을 방출하는 옵저버블 생성
Observable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
intervalObservable.take(5).subscribe(System.out::println); // 처음 5개 값만 출력
// of 연산자를 사용하여 값들을 순차적으로 방출하는 옵저버블 생성하려 했으나, 3 버전에서 of 를 찾을 수 없음
// Observable<Integer> ofObservable = Observable.of(1, 2, 3, 4, 5);
// ofObservable.subscribe(System.out::println);
// timer 연산자를 사용하여 지정된 시간 후에 값을 방출하는 옵저버블 생성
Observable<Long> timerObservable = Observable.timer(1, TimeUnit.SECONDS);
timerObservable.subscribe(System.out::println);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
조건 연산자 (Conditional Operators)
조건 연산자는 옵저버블의 값을 조건부로 처리하고, 기본값을 제공하거나 옵저버블 간의 비교를 한다.
- defaultIfEmpty
- every
- sequenceEqual
예시 코드
RxJava 에서 지원하지 않는 부분은 주석으로 처리해 두었다.
public static void main(String[] args) {
// defaultIfEmpty 연산자를 사용하여 비어있는 옵저버블에 기본값을 제공하는 예시
Observable.empty()
.defaultIfEmpty("옵저버블이 비어있습니다.")
.subscribe(System.out::println);
// every 연산자를 사용하여 모든 값이 조건을 만족하는지 확인하는 예시
// Observable.range(1, 10)
// .every(i -> i < 10)
// .subscribe(result -> System.out.println("모든 값이 조건을 만족하는가? " + result));
// sequenceEqual 연산자를 사용하여 두 개의 옵저버블이 동일한지 비교하는 예시
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(1, 2, 3);
Observable.sequenceEqual(observable1, observable2)
.subscribe(result -> System.out.println("두 옵저버블이 동일한가? " + result));
}
결합 연산자 (Combination Operators)
결합 연산자를는 두 개 이상의 옵저버블을 결합하거나 조작하는데 사용되며, 여러개의 옵저버블을 결합하여 단일 옵저버블로 만들거나, 여러 옵저버블로부터 동시에 데이터를 받아 처리할 수 있다.
- combineLatest: 여러개의 옵저버블에서 값이 방출될 때 마다 최신 값을 기반으로 새로운 값을 생성하여 방출
모든 옵저버블이 최소한 하나의 값을 방출한 후부터는 어느 하나의 옵저버블에서 값이 방출될 때마다 해당 값과 다른 옵저버블들의 최신 값들을 결합하여 새로운 값을 생성 - concat: 여러개의 옵저버블을 순차적으로 연결하여 하나의 옵저버블로 만듬
첫번째 옵저버블의 모든 값을 방출한 후 두 번째 옵저버블의 값을 방출함
즉, 첫 번째 옵저버블이 완료된 후에 두 번째 옵저버블이 시작 - merge: 여러개의 옵저버블을 병합하여 단일 옵저버블로 만듬
각 옵저버블이 방출하는 값들을 순서대로 모두 방출하며 병합된 옵저버블은 동시에 값을 방출할 수 있음 - race: 여러개의 옵저버블 중 가장 먼저 값을 방출하는 옵저버블을 선택하여 방출함
선택된 옵저버블의 값을 방출하고 나머지 옵저버블들은 무시됨 - startWith: 옵저버블이 방출하기 전에 지정된 초기 값 또는 초기 시퀀스를 방출함
옵저버블이 값을 방출하기 전에 초기값을 먼저 방출함 - withLatestFrom: 하나의 옵저버블과 다른 옵저버블들의 최신 값을 조합하여 새로운 값을 생성하여 방출함
주로 두 번째 옵저버블이 값을 방출할 때 첫 번째 옵저버블의 최신 값을 가져와 조합함 - zip: 여러개의 옵저버블에서 값이 방출될 때 마다 각 옵저버블의 값들을 조합하여 새로운 값을 생성하여 방출
각 옵저버블이 방출한 값들이 순서대로 조합됨
예시 코드
RxJs 에서는 지원하지만 RxJava 에서 지원하지 않는 부분은 주석으로 처리해 두었다.
public static void main(String[] args) {
// combineLatest
Observable<Integer> observableCombineLatest1 = Observable.just(1, 2, 3);
Observable<String> observableCombineLatest2 = Observable.just("A", "B", "C");
Observable.combineLatest(
observableCombineLatest1,
observableCombineLatest2,
(num, letter) -> num + letter
).subscribe(System.out::println);
// 3A
// 3B
// 3C
// concat
Observable<Integer> observableConcat1 = Observable.just(1, 2, 3);
Observable<Integer> observableConcat2 = Observable.just(4, 5, 6);
Observable.concat(observableConcat1, observableConcat2)
.subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 6
// merge
Observable<Integer> observableMerge1 = Observable.just(1, 2, 3);
Observable<Integer> observableMerge2 = Observable.just(4, 5, 6);
Observable.merge(observableMerge1, observableMerge2)
.subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 6
// race
// Observable<Integer> observableRace1 = Observable.just(1, 2, 3).delay(500, TimeUnit.MILLISECONDS);
// Observable<Integer> observableRace2 = Observable.just(4, 5, 6).delay(200, TimeUnit.MILLISECONDS);
// Observable.race(observableRace1, observableRace2)
// .subscribe(System.out::println);
// startWith
// Observable<Integer> observableStartWith = Observable.just(1, 2, 3);
// observableStartWith.startWith(0)
// .subscribe(System.out::println);
// withLatestFrom
Observable<Integer> observableWithLatestFrom1 = Observable.just(1, 2, 3);
Observable<String> observableWithLatestFrom2 = Observable.just("A", "B", "C");
observableWithLatestFrom1.withLatestFrom(observableWithLatestFrom2, (num, letter) -> num + letter)
.subscribe(System.out::println);
// 1C
// 2C
// 3C
// zip
Observable<Integer> observableZip1 = Observable.just(1, 2, 3);
Observable<String> observableZip2 = Observable.just("A", "B", "C");
Observable.zip(
observableZip1,
observableZip2,
(num, letter) -> num + letter
).subscribe(System.out::println);
// 1A
// 2B
// 3C
}
필터링 연산자 (Filtering Operators)
옵저버블 스트림에서 특정 조건에 따라 값을 필터링하거나 선택하는데 사용된다. 필터링 연산자를 사용하여 옵저버블 스트림을 조작하여 필요한 값만을 선택할 수 있다.
- debounceTime: 지정된 시간 동안 값이 방출되지 않을 때까지 기다린 후에 가장 최근의 값을 방출함
주로 사용자 입력 필드 같은 곳에서 사용됨 - debounce: 각 항목의 발행 사이에 일정한 시간 간격이 지나면 마지막 항목만 방출함
debounceTime과 비슷하지만 각 항목의 지연 시간을 동적으로 설정할 수 있음 - distinct: 이전에 방출된 값과 동일하지 않은 값만을 방출함
- distinctUntilChanged: 연속으로 동일한 값을 방출할 때에는 새로운 값을 방출하지 않음
- elementAt: 지정된 위치의 항목만을 방출하고 완료함
- filter: 지정된 조건을 만족하는 항목만을 방출함
- find: 지정된 조건을 만족하는 첫 번째 항목을 방출함
- findIndex: 지정된 조건을 만족하는 첫 번째 항목의 인덱스를 방출함
- first: 첫 번째 항목을 방출함
- ignoreElements: 옵저버블이 방출하는 값은 무시하고, 완료 또는 에러만 방출함
- last: 마지막 항목을 방출함
- sample: 다른 옵저버블이 값을 방출할 때, 그 시점에서의 원본 옵저버블의 값을 방출함
- skip: 처음 몇 개의 항목을 건너뛴 후에 항목을 방출함
- skipUntil: 다른 옵저버블이 값을 방출할 때까지 항목을 건너뜀
- skipWhile: 지정된 조건을 만족할 때까지 항목을 건너뛰고, 그 이후의 항목들을 방출함
- take: 처음 몇 개의 항목만을 방출함
- takeLast: 마지막에서부터 지정된 개수의 항목만을 방출함
- takeUntil: 다른 옵저버블이 값을 방출할 때까지의 항목을 방출함
- takeWhile: 지정된 조건을 만족할 때까지 항목을 방출하고, 그 이후의 항목들은 무시함
- throttle: 일정한 시간 간격 동안 값이 연속으로 방출될 때, 첫 번째 값만을 방출함
- throttleTime: 지정된 시간 간격 동안 값이 연속으로 방출될 때, 첫 번째 값만을 방출함
예시 코드
RxJs 에서는 지원하지만, RxJava 에서 지원하지 않는 것은 동일한 기능을 지원하는 키워드로 대체했고, 유사 구현이 애매한 경우 주석으로 처리해 두었다.
public static void main(String[] args) {
// 1. debounceTime
Observable.intervalRange(1, 10, 0, 300, TimeUnit.MILLISECONDS).debounce(200, TimeUnit.MILLISECONDS).subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// 2. debounce
Observable.intervalRange(1, 10, 0, 300, TimeUnit.MILLISECONDS).debounce(item -> Observable.timer(200, TimeUnit.MILLISECONDS)).subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
// 3. distinct
Observable.just(1, 2, 2, 3, 3, 3, 4, 5).distinct().subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 4. distinctUntilChanged
Observable.just(1, 1, 2, 2, 3, 3, 4, 5).distinctUntilChanged().subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 5
// 5. elementAt
Observable.range(1, 5).elementAt(2).subscribe(System.out::println);
// 3
// 6. filter
Observable.range(1, 10).filter(num -> num % 2 == 0).subscribe(System.out::println);
// 2
// 4
// 6
// 8
// 10
// 7. find
// Observable.range(1, 10).find(num -> num > 5).subscribe(System.out::println);
// 8. findIndex
// Observable.range(1, 10).findIndex(num -> num > 5).subscribe(System.out::println);
// 9. first
Observable.range(1, 5).first(0).subscribe(System.out::println);
// 1
// 10. ignoreElements
Observable.just(1, 2, 3).ignoreElements().subscribe(() -> System.out.println("Completed"), Throwable::printStackTrace);
// Completed
// 11. last
Observable.range(1, 5).last(0).subscribe(System.out::println);
// 5
// 12. sample
Observable.intervalRange(1, 10, 0, 300, TimeUnit.MILLISECONDS).sample(800, TimeUnit.MILLISECONDS).subscribe(System.out::println);
// 3
// 6
// 9
// 13. skip
Observable.range(1, 5).skip(2).subscribe(System.out::println);
// 3
// 4
// 5
// 14. skipUntil
Observable.intervalRange(1, 5, 0, 300, TimeUnit.MILLISECONDS).skipUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)).subscribe(System.out::println);
// 5
// 15. skipWhile
Observable.range(1, 5).skipWhile(num -> num <= 2).subscribe(System.out::println);
// 3
// 4
// 5
// 16. take
Observable.range(1, 5).take(3).subscribe(System.out::println);
// 1
// 2
// 3
// 17. takeLast
Observable.range(1, 5).takeLast(2).subscribe(System.out::println);
// 4
// 5
// 18. takeUntil
Observable.intervalRange(1, 5, 0, 300, TimeUnit.MILLISECONDS).takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)).subscribe(System.out::println);
// 1
// 2
// 3
// 4
// 19. takeWhile
Observable.range(1, 5).takeWhile(num -> num <= 3).subscribe(System.out::println);
// 1
// 2
// 3
// 20. throttle
Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS).throttleFirst(200, TimeUnit.MILLISECONDS).subscribe(System.out::println);
// 1
// 4
// 7
// 10
// 21. throttleTime
Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS).throttleLast(200, TimeUnit.MILLISECONDS).subscribe(System.out::println);
// 2
// 5
// 6
// 9
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
수학 연산자 (Mathematical Operators)
Mathematical Operators는 옵저버블 스트림에서 숫자 값들을 처리하고 조작하는 데 사용되는 연산자들을 포함한다. 이 연산자들을 사용하여 옵저버블의 값을 합치거나 최댓값, 최솟값을 계산하고 평균 값을 구할 수 있다.
- count: 옵저버블이 방출하는 항목의 수를 계산함
즉, 옵저버블이 완료될 때까지 발생한 항목의 수를 세어 방출하며 옵저버블이 완료되면 마지막에 총 항목 수를 방출함 - max: 옵저버블이 방출하는 항목 중에서 가장 큰 값을 찾아 방출 함
선택적으로 comparator 함수를 제공하여 방출할 값의 순서를 정의할 수 있음 - min: 옵저버블이 방출하는 항목 중에서 가장 작은 값을 찾아 방출
선택적으로 comparator 함수를 제공하여 방출할 값의 순서를 정의할 수 있음 - reduce: 옵저버블이 방출하는 각 항목에 대해 지정된 함수를 사용하여 단일 결과값을 생성함
초기값을 제공할 수 있으며, 제공되지 않을 경우 첫 번째 항목을 초기값으로 사용되며 이후 각 항목은 이전 결과값과 함께 지정된 함수에 전달되어 결과를 계산하고 최종적으로 완료되면 최종 결과값을 방출함
예시 코드
RxJs 에서는 지원하지만, RxJava 에서 지원하지 않는 것은 유사하게 구현해봤다.
public static void main(String[] args) {
Integer[] numbers = {1, 3, 5, 2, 4};
// 1. count
Observable.fromArray(numbers)
.count()
.subscribe(count -> System.out.println("Total count of numbers: " + count));
// Total count of numbers: 5
// 2. max
Observable.fromArray(numbers)
.reduce((max, next) -> Math.max(max, next))
.subscribe(maxValue -> System.out.println("Max value: " + maxValue));
// Max value: 5
// 3. min
Observable.fromArray(numbers)
.reduce((min, next) -> Math.min(min, next))
.subscribe(minValue -> System.out.println("Min value: " + minValue));
// Min value: 1
// 4. reduce
Observable.fromArray(numbers)
.reduce(0, (sum, next) -> sum + next)
.subscribe(sum -> System.out.println("Sum of numbers: " + sum));
// Sum of numbers: 15
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
변환 연산자 (Transformation Operators)
옵저버블의 값, 구조를 변경하거나 조작하는데 사용된다. 변환 연산자를 사용하여 데이터 스트림을 변경하고 새로운 데이터 스트림을 생성할 수 있다. 이를 통해 데이터 스트림을 필터링하거나 매핑하거나, 여러 개의 스트림을 조합하거나 변경할 수 있다.
- buffer: 일정 갯수나 시간, 다른 옵저버블의 신호에 따라 항목을 그룹화하여 새로운 옵저버블을 생성함
- bufferCount: 지정된 개수만큼의 항목을 모아서 배열로 그룹화한 후 방출함
- bufferTime: 지정된 시간 동안 수신된 항목을 모아서 배열로 그룹화한 후 방출함
- bufferToggle: 지정된 오프닝 및 클로징 옵저버블 신호를 사용하여 버퍼를 열고 닫음
- bufferWhen: 지정된 함수가 true를 반환할 때까지 항목을 모아서 배열로 그룹화한 후 방출함
- concatMap: 각 항목을 새로운 옵저버블로 매핑하고, 이를 순차적으로 결합하여 하나의 옵저버블을 생성함
- concatMapTo: 모든 항목을 지정된 옵저버블로 매핑하고, 이를 순차적으로 결합하여 하나의 옵저버블을 생성함
- map: 각 항목을 새로운 값으로 변환함
- mapTo: 모든 항목을 지정된 값으로 변환함
- mergeMap: 각 항목을 새로운 옵저버블로 매핑하고, 이를 병렬로 결합하여 하나의 옵저버블을 생성함
- mergeMapTo: 모든 항목을 지정된 옵저버블로 매핑하고, 이를 병렬로 결합하여 하나의 옵저버블을 생성함
- pairwise: 연속된 두 항목을 그룹화하여 새로운 항목을 방출함
- pluck: 각 항목에서 지정된 속성 값을 추출하여 새로운 옵저버블을 생성함
- repeat: 옵저버블을 반복하여 방출함
- scan: 각 항목을 이전 결과와 함께 지정된 함수를 사용하여 집계한 후 결과를 방출함
- switchMap: 각 항목을 새로운 옵저버블로 매핑하고, 이전에 방출된 옵저버블을 취소하고 가장 최신의 옵저버블만을 구독함
- switchMapTo: 모든 항목을 지정된 옵저버블로 매핑하고, 이전에 방출된 옵저버블을 취소하고 가장 최신의 옵저버블만을 구독함
예시 코드
RxJs 에서 지원하는 범위와 RxJava 가 지원하는 범위가 다르기에 Buffer 의 경우 유사하게 구현했으며, 구현하기 애매한 것은 주석으로 처리해 두었다.
public static void main(String[] args) {
// 1. buffer
Observable.range(1, 10)
.buffer(3)
.subscribe(System.out::println);
// [1, 2, 3]
// [4, 5, 6]
// [7, 8, 9]
// [10]
// 2. bufferCount
Observable.range(1, 10)
.buffer(2, 3)
.subscribe(System.out::println);
// [1, 2]
// [4, 5]
// [7, 8]
// [10]
// 3. bufferTime
Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)
.buffer(300, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
// [1, 2, 3, 4]
// [5, 6]
// [7, 8, 9, 10]
// 4. bufferToggle
Observable.interval(1, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(System.out::println);
// [0, 1, 2]
// 5. bufferWhen
// Observable.intervalRange(1, 10, 0, 100, TimeUnit.MILLISECONDS)
// .bufferWhen(completeSignal -> Observable.timer(1, TimeUnit.SECONDS))
// .subscribe(System.out::println);
// 6. concatMap
Observable.just(1, 2, 3)
.concatMap(num -> Observable.just(num * 2)) // 각 항목을 2배로 변환하며 순차적으로 결합
.subscribe(System.out::println);
// 2
// 4
// 6
// 7. concatMapTo
// Observable.just(1, 2, 3)
// .concatMapTo(Observable.just(10, 20, 30))
// .subscribe(System.out::println);
// 8. map
Observable.just(1, 2, 3)
.map(num -> num * 2) // 각 항목을 2배로 변환
.subscribe(System.out::println);
// 2
// 4
// 6
// 9. mapTo
// Observable.just(1, 2, 3)
// .mapTo("Hello")
// .subscribe(System.out::println);
// 10. mergeMap
// Observable.just(1, 2, 3)
// .mergeMap(num -> Observable.just(num * 2))
// .subscribe(System.out::println);
// 11. mergeMapTo
// Observable.just(1, 2, 3)
// .mergeMapTo(Observable.just(10, 20, 30))
// .subscribe(System.out::println);
// 12. pairwise
// Observable.range(1, 5)
// .pairwise()
// .subscribe(pair -> System.out.println(pair.getFirst() + " - " + pair.getSecond()));
// 13. pluck
// Observable.just(
// new Person("Alice", 30),
// new Person("Bob", 25),
// new Person("Charlie", 35)
// )
// .pluck(person -> person.getName())
// .subscribe(System.out::println);
// 14. repeat
Observable.just(1, 2, 3)
.repeat(2) // 옵저버블을 2번 반복하여 방출
.subscribe(System.out::println);
// 1
// 2
// 3
// 1
// 2
// 3
// 15. scan
Observable.range(1, 5)
.scan((sum, num) -> sum + num) // 각 항목을 이전 결과와 함께 지정된 함수를 사용하여 집계한 후 결과를 방출
.subscribe(System.out::println);
// 1
// 3
// 6
// 10
// 15
// 16. switchMap
Observable.just(1, 2, 3)
.switchMap(num -> Observable.range(num * 10, 3)) // 각 항목을 새로운 옵저버블로 매핑하고, 이전에 방출된 옵저버블을 취소하고 가장 최신의 옵저버블만을 구독
.subscribe(System.out::println);
// 10
// 11
// 12
// 20
// 21
// 22
// 30
// 31
// 32
// 17. switchMapTo
// Observable.just(1, 2, 3)
// .switchMapTo(Observable.range(10, 3))
// .subscribe(System.out::println);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Person {
private String name;
private int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
유틸리티 연산자 (Utility Operators)
옵저버블의 동작을 지연시키거나 제어하는데 사용되며 시간 관련 기능에 주로 쓰인다.
- delay: 옵저버블의 각 항목을 일정한 시간동안 지연
일정한 시간이 지난 후에 항목을 방출하므로, 옵저버블의 흐름을 지연시키는 데 사용됨 - delayWhen: 옵저버블의 각 항목을 지정된 조건에 따라 지연
지연 시간을 계산하기 위해 각 항목을 처리하는 함수를 제공하며 이 함수는 지연시키고 싶은 항목을 처리하고 얼마나 지연시킬지 결정하며 비동기 작업이 완료되기를 기다릴 때 일반적 사용됨
예시 코드
RxJs 에서는 지원하지만, RxJava 에서 지원하지 않는 것은 주석으로 처리했다.
public static void main(String[] args) {
// delay 예시: 각 항목을 1초씩 지연시킴
Observable.just(1, 2, 3)
.delay(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
// 1
// 2
// 3
// delayWhen 예시: 각 항목을 해당 항목의 값만큼 지연시킴
// Observable.just(1, 2, 3)
// .delayWhen(item -> Observable.timer(item, TimeUnit.SECONDS))
// .subscribe(System.out::println);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
'Programming > 스프링 부트(Spring Boot)' 카테고리의 다른 글
[Spring Boot] - 003 DataTypes, Variables 데이터 유형 및 변수 (0) | 2024.04.09 |
---|---|
[Spring Boot] - 002 Basic Syntax 기본 문법 (0) | 2024.04.09 |
[Spring Boot] - 001 Spring Boot 개발자 로드맵 (0) | 2024.03.26 |
[Spring Boot] - Hikari (0) | 2024.03.03 |
[Spring Boot] - Valid Annotation 정리 (0) | 2024.02.24 |