Archive

RxJS Operator (계속 업데이트)

|

[RxJS] Operator (계속 업데이트)


1. Operator

연산자는 함수이다. 연산자에는 2종류가 있다.

  • 파이프형 연산자 : pipe(operator()) 구문을 사용해서 input에 대한 같은 output만을 반환하는 순수연산을 한다. 따라서, 파이프형 연산자는 순수함수이고 Observable 객체를 받아 다른 Observable을 반환한다.

    pipe() : 파이프 연산자는 RxJS 5.5 이전에 사용되던 chaining을 가독성 좋게 사용할 수 있게 해준다.

    // Before RxJS 5.5
    Observable.operator1().operator2().operator3().subscribe(...);
    
    // Since RxJS 5.5
    Observable.pipe(
    	operator1(),
        operator2(),
        operator3()
    ).subscribe(...);
    


  • 생성 연산자 : 새로운 Observable 객체를 생성하기 위해 단독 함수로서 호출될 수 있다.


2. Higher-order Observables (고차 Observables)

Observable은 보편적으로 string, number 같은 보통의 값을 내보내지만, observable을 다루는 observable을 고차 Observable이라고 한다.

고차 Observable은 일반적으로 평탄화해서 사용하는데 고차 Observable을 하나의 보통 Observable로 바꿔서 사용한다.

concatAll(), mergeAll(), switchAll(), exhaust(), concatMap(), mergeMap(), switchMap(), exhaustMap() 등 사용.


3. Marble Diagram (마블 다이어그램)

Observable의 연산자를 설명할 때 시간의 흐름에 따라 어떻게 작동하는지 그림을 통해 시각적으로 제공한다.

Marble Diagram : https://rxmarbles.com/


4. Custom Operator

pipe() 함수로 새로운 연산자를 만들 수 있다. 흔히 사용되는 연산자들의 시퀀스를 하나로 묶을 수 있다.

import { pipe } from "rxjs";
import { filter, map } from "rxjs/operators";

function discardOddDoubleEven() {
  return pipe(
    filter((v) => !(v % 2)),
    map((v) => v + v)
  );
}
// 홀수를 버리고 짝수를 2배하는 Custom Operator

pipe() 함수는 Observable의 pipe() 메소드와 유사하지만 같지 않다.


5. 자주 사용되는 연산자 정리

👻 Creation Operators

of() : 나열된 인자를 순서대로 next하는 Observable 반환

import { of } from "rxjs";

of(1, 2, 3).subscribe(
  (next) => console.log("next: ", next),
  (err) => console.log("err: ", err),
  () => console.log("complete")
);
// next: 1, next: 2, next: 3, complete

// 배열을 전달하면 배열이 통으로 나옴
of([1, 2, 3]).subscribe(
  (next) => console.log("next: ", next),
  (err) => console.log("err: ", err),
  () => console.log("complete")
);
// next: [1,2,3], complete


from() : array, observable, Promise, string, array-like, iterable 등을 Observable로 반환

import { from } from "rxjs";

const arr = [1, 2, 3];
const result = from(arr);
result.subscribe((x) => console.log(x));
// 1, 2, 3


fromEvent() : 브라우저에서 발생하는 Event를 Observable로 반환

import { fromEvent } from "rxjs";

const clicks = fromEvent(document, "click");
clicks.subscribe((x) => console.log(x));
// MouseEvent Object


defer() : 팩토리 함수로 Observable 생성 후 구독 시점에 팩토리 함수를 호출해 만들어둔 Observable 반환받아 구독한다. from은 Promise 생성 후 바로 동작시키지만 defer는 Observable을 subscribe하는 시점에 Promise를 생성할 수 있어서 동작 시점을 조절할 수 있다.

import { defer, fromEvent, interval } from "rxjs";

const clickOrInterval = defer(() => {
  return Math.random() > 0.5 ? fromEvent(document, "click") : interval(1000);
});
clickOrInterval.subscribe((x) => console.log(x));

// 랜덤 숫자가 0.5보다 크면 클릭 이벤트를 대기한다.
// 랜덤 숫자가 0.5보다 작으면 1초마다 방출한다.


range() : 범위 내 숫자들을 갖는 Observable 생성

import { range } from "rxjs";

const numbers = range(1, 10);
numbers.subscribe((x) => console.log(x));
// 1부터 10까지 출력


interval() : setInterval()과 유사하며 일정 간격마다 값을 발행하는데 0부터 순차적으로 증가, 명시적으로 구독해제 하지 않으면 무한하게 값을 발행하니 주의할 것.

import { interval } from "rxjs";

const intervalObservable = interval(1000).subscribe(
  (v) => console.log("next: ", v),
  (err) => console.log("err: ", err),
  () => console.log("complete")
);

setTimeout(() => {
  intervalObservable.unsubscribe();
}, 3000);

// next: 1, next: 2, next: 3, complete


timer() : setTimeout()과 유사하며 특정 시간 이후 값을 발행한다.

import { timer } from "rxjs";

const numbers = timer(3000, 1000); // 3초 후에 시작하며 1초마다 발행
numbers.subscribe((x) => console.log(x));

setTimeout(() => {
  numbers.unsubscribe();
}, 6000);

// 0, 1, 2


empty(), never(), throwError() : empty()는 Observable complete 상태를 전달하는 Observable 생성, never()는 어떤값도 발행하지 않는 Observable, throwError()는 호출 즉시 에러를 발생시킨다. empty()와 never()는 RxJS 6 이후 deprecated되었으며 상수로 변경되었다.

import { EMPTY } from "rxjs";

EMPTY.subscribe(
  () => console.log("next"),
  (err) => console.log("err"),
  () => console.log("complete")
);
// complete

import { NEVER } from "rxjs";

NEVER.subscribe(
  () => console.log("next"),
  (err) => console.log("err"),
  () => console.log("complete")
);
// 아무것도 출력되지 않음

import { throwError } from "rxjs";

throwError("err!").subscribe(
  () => console.log("next"),
  (err) => console.log("err: ", err),
  () => console.log("complete")
);
// err: err!


👻 Transformation Operators

map() : array method의 map과 유사하며 Observable을 다른 Observable로 변환하는데 사용된다.

const observable = of(1, 2, 3, 4, 5);

const subscription = observable
  .pipe(map((val) => val * val))
  .subscribe((val) => console.log(val));
// 1, 4, 9, 16, 25


mapTo() : Observable이 값을 방출할 때마다 주어진 상수 값을 방출한다.

import { fromEvent } from "rxjs";
import { mapTo } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const greetings = clicks.pipe(mapTo("Foo"));
greetings.subscribe((x) => console.log(x));

// click시마다 'Foo' 출력


mergeMap() : Observable이 방출하는 값을 출력 Observable과 함께 병합한다.

import { of, interval } from "rxjs";
import { mergeMap, map } from "rxjs/operators";

const letters = of("a", "b");
const result = letters.pipe(
  mergeMap((x) => interval(1000).pipe(map((i) => x + i)))
);
result.subscribe((x) => console.log(x));

// a0, b0, a1, b1, a2, b2 ...


switchMap() : mergeMap과 비슷하지만, 외부 Observable이 발생될 때마다 내부 Observable의 새로운 구독이 시작된다.

import { fromEvent, interval } from "rxjs";
import { switchMap } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const result = clicks.pipe(switchMap((ev) => interval(1000)));
result.subscribe((x) => console.log(x));

// click, 0, 1, 2, click, 0, 1, click, 0 ...
// 클릭할 때마다 구독이 취소되고 새로운 구독이 시작됨


👻 Filtering Operators

debounceTime() : 출력 사이의 지정된 시간이 경과하지 않으면 이 시간 내의 방출된 값들을 버린다.

import { fromEvent } from "rxjs";
import { debounceTime } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const result = clicks.pipe(debounceTime(3000));
result.subscribe((x) => console.log(x));

// 마지막 클릭 후 3초 후에 click object 출력


throttleTime() : 지정된 시간동안 처음 값을 제외한 값들을 버린다.

import { fromEvent } from "rxjs";
import { throttleTime } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const result = clicks.pipe(throttleTime(3000));
result.subscribe((x) => console.log(x));

// 클릭 연타 시 처음 click Object 출력 후 3초 후에 click Object 출력


take() : take(number)의 number에 전달된 값과 같은 개수의 값들만 방출한다.

filter() : array method의 filter와 유사하며 Observable의 필터링에 사용된다

const observable = of(1, 2, 3, 4, 5);

const subscription = observable
  .pipe(filter((val) => val % 2 === 0))
  .subscribe((val) => console.log(val));
// 2, 4


👻 Join Operators

startWith() : 출력 Observable이 방출되기 전에 startWith의 소스들이 먼저 방출된다.

import { of } from "rxjs";
import { startWith } from "rxjs/operators";

of("a", "b")
  .pipe(startWith("first", "second"))
  .subscribe((x) => console.log(x));

//  'first', 'second', 'a', 'b'


withLatestFrom() : 두 개의 Observable을 합성하지만, 한쪽에서 이벤트가 발생할 때에만 합성해준다. 이벤트 발생이 없다면 스킵된다.

import { fromEvent, interval } from "rxjs";
import { withLatestFrom } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const timer = interval(1000);
const result = clicks.pipe(withLatestFrom(timer));
result.subscribe((x) => console.log(x));

// click이 발생할때만 [click Object, 경과한 시간]이 출력된다.


👻 Error Handling Operators

catchError() : 오류 발생 시 새 Observable을 반환하거나 오류를 반환하거나 retry 할 수 있다.

import { of } from "rxjs";
import { map, catchError } from "rxjs/operators";

of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (n === 4) {
        throw "four!";
      }
      return n;
    }),
    catchError((err) => of("I", "II", "III", "IV", "V"))
  )
  .subscribe((x) => console.log(x));
// 오류 발생 시 다른 Observable 방출
// 1, 2, 3, I, II, III, IV, V

import { of } from "rxjs";
import { map, catchError, take } from "rxjs/operators";

of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (n === 4) {
        throw "four!";
      }
      return n;
    }),
    catchError((err, caught) => caught),
    take(30)
  )
  .subscribe((x) => console.log(x));
// 오류 발생 시 retry한다
// 1, 2, 3, 1, 2, 3, ...

import { of } from "rxjs";
import { map, catchError } from "rxjs/operators";

of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (n === 4) {
        throw "four!";
      }
      return n;
    }),
    catchError((err) => {
      throw "error in source. Details: " + err;
    })
  )
  .subscribe(
    (x) => console.log(x),
    (err) => console.log(err)
  );
// 오류 발생 시 new error를 발생시킨다.
// 1, 2, 3, error in source. Details: four!


👻 Utility Operators

tap() : 소스에서 각 방출을 가로채서 함수를 실행하지만 오류가 발생하지 않는 한 소스와 동일한 출력을 반환

import { fromEvent } from "rxjs";
import { tap, map } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const positions = clicks.pipe(
  tap((ev) => console.log(ev)),
  map((ev) => ev.clientX)
);
positions.subscribe((x) => console.log(x));


delay() : 주어진 시간까지 Observable 방출을 지연시킨다.

import { fromEvent } from "rxjs";
import { delay } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const delayedClicks = clicks.pipe(delay(1000)); // 1초씩 지연되어 방출
delayedClicks.subscribe((x) => console.log(x));

// ---------------------------------------------------------------

import { fromEvent } from "rxjs";
import { delay } from "rxjs/operators";

const clicks = fromEvent(document, "click");
const date = new Date("March 15, 2050 12:00:00");
const delayedClicks = clicks.pipe(delay(date)); // 해당 날짜 후에 방출 가능
delayedClicks.subscribe((x) => console.log(x));



참고 자료


ReactiveX 공식문서

RxJS 공식문서

[Javascript] RxJS - Operators