본문 바로가기

FrontEnd

RxJS와 반응형 프로그래밍

반응형

반응형 프로그래밍(Reactive Programming)이 뭘까요? Observable이 뭘까요? RxJS와 함께 알아봅시다.

원문입니다. : https://blog.bitsrc.io/reactive-programming-in-javascript-with-rxjs-9db53c07ef14

 

Reactive Programming in JavaScript with RxJS.

RxJS is JavaScript library for transforming, composing and querying asynchronous streams of data. RxJS can be used both in the browser or…

blog.bitsrc.io

RxJS는 비동기 데이터 스트림을 변환, 합성 및 쿼리하기 위한 JavaScript 라이브러리입니다.
RxJS는 브라우저 또는 Node.js를 사용하는 서버 측 모두에서 사용할 수 있습니다.

 

RxJS를 배울 때 가장 어려운 부분은 "반응형으로 사고하기(Thinking in Reactively)"입니다.

RxJS를 비동기 이벤트 처리를 위한 "LoDash"로 생각하십시오.

반응형 프로그래밍이란 정확히 뭔가요?

여러개의 비동기 값

반응형 프로그래밍은 주로 비동기 데이터 스트림과 관련된 코드 작성을 위한 프로그래밍 패러다임입니다.
변경 사항을 처리하기 위해 명시적으로 코드(일명 "명령형" 프로그래밍)를 작성하는 일반적인 소프트웨어 작성 방식 대신
발생하는 변경 사항에 "반응"하는 소프트웨어 애플리케이션을 구축하는 다른 방법입니다.

스트림

스트림은 시간에 따라 정렬되어 진행 중인 이벤트의 시퀀스입니다.
주 : frp가 아닌 이유가 시간의 시퀀스가 아니라 이벤트의 시퀀스라 그렇다 들었습니다.

사용자 입력, 버튼 클릭 또는 데이터 구조와 같은 모든 것이 될 수 있습니다.
스트림을 구독하고 그에 따라 반응할 수 있습니다.

함수를 사용하여 스트림을 결합, 필터링 또는 매핑할 수 있습니다.

 

스트림은 타임라인 동안 값, 오류 및 완료 세 가지 신호를 내보냅니다.
우리는 이 비동기 이벤트를 포착하고 그에 따라 함수를 실행해야 합니다.

 

Promise와 Observable은 모두 비동기(async) 관련 문제를 해결하기 위해 만들어졌습니다(즉, "콜백 지옥"을 피하기 위해).
 

최신 웹 애플리케이션의 비동기 작업 타입

  • DOM 이벤트 - (mouse events, touch events, keyboard events, form events etc)
  • 애니메이션 — (CSS Transitions and Animations, requestAnimationFrame etc)
  • AJAX
  • WebSockets
  • SSE — Server-Sent Events
  • 대체 입력 (voice, joystick etc)
혼란스럽다면 걱정하지 마세요. 
앞으로 하나씩 알아볼겁니다.

Observable

Observable은 몇 가지 특별한 특성을 가진 함수일 뿐입니다.
"observer"("next", "error" 및 "complete" 메서드가 있는 객체)를 가져와 Subscription을 반환합니다.
Observable은 몇 가지 특별한 특성을 가진 함수일 뿐입니다. "observer"("next", "error" 및 "complete" 메서드가 있는 객체)를 가져와 구독취소 로직를 반환합니다.
  • Observables는 애플리케이션에서 발행자와 구독자 간에 메시지를 전달하는 함수를 지원합니다.
  • Observable은 이벤트 처리, 비동기 프로그래밍 및 여러 값 처리에 대한 다른 기술에 비해 상당한 이점을 제공합니다.
  • Observable은 지연(lazy) 평가합니다. 구독할 때까지 데이터 생성을 시작하지 않습니다.
  • subscribe()는 Subscription을 반환합니다. 소비자는 Subscription을 취소하고 프로듀서를 제거하기 위해 unsubscribe()를 호출할 수 있습니다.
  • RxJS는 새로운 옵저버블을 생성하는 데 사용할 수 있는 다양한 기능을 제공합니다.
    • 이러한 함수는 이벤트, 타이머, 프로미스 등과 같은 것으로부터 옵저버블을 생성하는 프로세스를 단순화할 수 있습니다.
    const button = document.querySelector("button");
    const observer = {
      next: function(value) {
        console.log(value);
      },
      error: function(err) {
        console.error(err);
      },
      complete: function() {
        console.log("Completed");
      }
    };

    // Create an Observable from event
    const observable = Rx.Observable.fromEvent(button, "click");
    // Subscribe to begin listening for async result
    observable.subscribe(observer);

Subscription

  • Observable 인스턴스는 누군가가 구독할 때만 값 발행을 시작합니다. 인스턴스의 subscribe() 메서드를 호출하고 ovserver 객체를 전달하여 Observable로부터의 알림을 수신하여 구독합니다.
  • Subsription에는 인수를 사용하지 않고 구독이 보유한 리소스만 처리하는 중요한 메서드인 unsubscribe()가 있습니다.
const button = document.querySelector("button");
const observable = Rx.Observable.fromEvent(button, "click");
const subscription = observable.subscribe(event => console.log(event));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

Observer

  • Observer는 next(), error() 및 complete() 함수가 있는 객체 리터럴입니다.
    • 위의 예에서 Observer는 Observable의 subscribe() 메서드에 전달하는 객체 리터럴입니다.
  • Observable이 값을 생성하면
    • 새 값이 성공적으로 캡처되면 .next() 메서드를 호출하고
    • 오류가 발생하면 .error()를 호출하여 관찰자에게 알립니다.
  • Observable을 구독하면 완료 신호가 발생할 때까지 관찰자에게 값을 계속 전달합니다. 
// observer is just object literal with next(), error() and complete() functions
// Howerver, next() function is required, remaining error() and complete() functions are optional 
const observer = {
  next: function(value) {
    console.log(value);
  },
  error: function(err) {
    console.error(err);
  },
  complete: function() {
    console.log("Completed");
  }
};

Operators

 

  • Operator는 컬렉션의 정교한 조작을 가능하게 하는 Observable을 기반으로 하는 함수입니다.
  • Operator는 본질적으로 하나의 Observable을 입력으로 취하고 다른 Observable을 출력으로 생성하는 순수 함수입니다.
  • 다양한 목적을 위한 연산자가 있으며 생성, 변환, 필터링, 조합, 멀티캐스팅, 오류 처리, 유틸리티 등으로 분류할 수 있습니다.
  • Operator는 집합의 다음 값으로 진행하기 전에 한 연산자에서 다음 연산자로 각 값을 전달합니다.
    • 이것은 각 단계에서 전체 배열을 처리하는 배열 연산자(맵 및 필터)와 다릅니다.
const observable = Rx.Observable.of(1, 2, 3).map(value => value * value);

observable.subscribe(x => console.log(x));
// Output:
// 1
// 4
// 9

RxJS는 많은 연산자를 제공하지만 소수만이 자주 사용됩니다. 
연산자 및 사용 샘플 목록은 RxJS API 문서를 참조하십시오.


Subject

Subject는 값을 여러 관찰자에게 멀티캐스트할 수 있는 특수한 유형의 Observable입니다.
  • RxJS의 Subject는 값을 여러 관찰자에게 멀티캐스트할 수 있는 특수한 유형의 Observable입니다.
    • 일반 Observable이 유니캐스트인 반면(구독된 각 Observer는 Observable의 독립적 실행을 소유함)
    • Subject는 멀티캐스트입니다.
    • 옵저버블이 옵저버를 구독
  • RxJS의 Subject는 Observable과 Observer 역할을 동시에 수행할 수 있는 특별한 하이브리드입니다.
  • 아래 예제에는 Subject에 연결된 두 개의 Observer가 있고 Subject에 몇 가지 값을 제공합니다.
const subject = new Rx.Subject();

subject.subscribe({
  next: v => console.log("observerA: " + v)
});
subject.subscribe({
  next: v => console.log("observerB: " + v)
});

subject.next(1);
subject.next(2);

// output
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

Observable vs Promise

더 나은 이해를 위해 ES6 Promise API를 Observable 라이브러리인 RxJS와 비교하고 대조하겠습니다.
Promise와 Observable이 얼마나 유사한지, 어떻게 다른지,
특정 상황에서 Promise 대신 Observable을 사용하려는 이유를 살펴보겠습니다.

Single value vs multiple values

  • Promise를 통해 요청하고 응답을 기다리면. 동일한 요청에 대해 여러 응답이 없을 것이라고 확신할 수 있습니다.
    • 어떤 값으로 resolve되는 Promise를 만들 수 있습니다.
  • Promise는 항상 resolve 함수에 전달된 첫 번째 값으로 리졸브되며 추가 호출은 무시합니다.
    • 반대로 Observable을 사용하면 우리가 observer.complete() 함수를 호출할 때까지 여러 값을 확인할 수 있습니다.
// creating demoPromise using ES6 Promise API
const demoPromise = new Promise((resolve, reject) => {
  asyncOperation((err, value) => {
    if (err) {
      reject(err); // error occured. We will catch error inside chain .catch()
    } else {
      resolve(value); // value received. we will get value inside .then() chain method
    }
  });
});

// creating a demoObservable using Rxjs.Observable API
const demoObservable = Rx.Observable.create(observer => {
  asyncOperation((err, value) => {
    if (err) {
      observer.error(err); // instead of reject(err)
    } else {
      observer.next(value); // instead of resolve(value)
      observer.complete(); // optional. once your async task finished then call observer.complete()
    }
  });
});​

 

eager vs lazy

  • Promise는 eager(즉시 평가)합니다. 즉, Promise 생성자가 호출되는 즉시 Promise가 제공한 액션을 수행하기 시작합니다.
  • Observable은 지연 평가합니다. Observable 생성자의 인자는 누군가가 실제로 Observable을 구독할 때만 호출됩니다.
    • 이는 당신이 구독할 때까지 아무 일도 일어나지 않는다는 것을 의미합니다.
// demoPromise started emmiting values but still we have not call .then() method on promise
const demoPromise = new Promise((resolve, reject) => {
  setTimeout(() => {
    console.log('emmit value');
    resolve(100);
  }, 3000);
});

// demoObservable not started emmiting values unitll we subscribe to it.
const demoObservable = new Observable(observer => {
  setInterval(() => {
    if (err) {
      observer.error('DemoError throw'); // instead of reject(err)
    } else {
      observer.next('value'); // instead of resolve(value)
      observer.complete(); // optional. once your async task finished then call observer.complete()
    }
  });
});​

Not cancellable vs cancellable

  • 새로운 Promise 사용자가 가장 먼저 궁금해하는 것 중 하나는 Promise를 취소하는 방법입니다.
    • ES6 Promise은 아직 취소를 지원하지 않습니다.
    • 현실적인 문제는 취소가 클라이언트 측 프로그래밍에서 정말 중요한 시나리오라는 것입니다.
    • Promise 취소 기능을 제공하는 bluebird 또는 axios와 같은 타사 라이브러리를 사용하십시오.
  • Observable에서 unsubscribe() 메서드를 호출하여 Observable에서 비동기 작업의 취소를 지원합니다.
  • Observable을 구독하면 진행 중인 실행을 나타내는 Subscribe를 리턴받습니다.
    • 실행을 취소하려면 Subscribe의 unsubscribe() 메서드를 호출하기만 하면 됩니다.
const observable = Rx.Observable.from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe(); // its will stop ongoing execution



실전 예제

값을 이용해 Observable 만들기

const observable = Rx.Observable.of("foo", 98, false, ["john", "doe"], {
  age: 19,
  gender: "male"
});

observable.subscribe(val => console.log(val));

값의 Stream을 이용해 Observable 만들기

const observable = Rx.Observable.create( observer => {
  observer.next('Hello');
  observer.next('Its monday morning!!');
});

observable.subscribe(value => console.log(value));
// output:
// Hello
// It's monday morning

Dom Event 이용해 Observable 만들기

const button = document.querySelector('button');
const observable = Rx.Observable.fromEvent(button, 'click');
observable.subscribe(event => console.log(event));

Promise > Observable

const promise = new Promise((resolve, reject) => {
  asyncOperation((err, value) => {
    if (err) {
      reject(err);
    } else {
      resolve(value);
    }
  });
});

const Observable = Rx.Observable.fromPromise(promise);

Observable.subscribe(value => console.log(value));

Observable from Timer method

const timer = Rx.Observable.timer(3000);

timer.subscribe(() => console.log("timeout!!"));

Observable from Interval

const interval = Rx.Observable.interval(3000);

interval.subscribe(tick => console.log(`${tick} tick`));

Map Operator

const observable = Rx.Observable.from(2, 4, 6, 8);

observable.map(value => value * value).subscribe(result => console.log(result));

Do Operator

const dogs = Rx.Observable.of("Buddy", "Charlie", "Cooper", "Rocky");

// do operator used for debugging purpose
dogs
  .do(dog => console.log(dog))
  .filter(dog => dog === "Cooper")
  .do(dog => console.log(dog))
  .subscribe(dog => console.log(dog));

Debounce and Throttle

  • 디바운스 — X 시간을 기다리고 마지막 값을 주세요.
  • 스로틀 — 첫 번째 값을 주고 다음 X 시간 동안 기다리세요.
const input = document.querySelector("input");
const observable = Rx.Observable.fromEvent(input, "keyup");

observable.debounceTime(3000).subscribe(event => console.log(event));

observable.throttleTime(1000).subscribe(event => console.log(event));

bufferTime

과거의 값을 배열로 수집하고 해당 배열을 시간에 따라 주기적으로 내보냅니다.
const clicks = Rx.Observable.fromEvent(document, "click");
const buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));

 

 

Conclusion

Promise는 AJAX 작업에 가장 적합합니다.
Observable은 (복잡한)비동기 작업을 다루는데 적합합니다.
Observable은 비동기 이벤트를 생성, 변환, 필터링 및 멀티캐스팅하기 위한 많은 연산자를 제공합니다. 

Closing Note

읽어 주셔서 감사합니다. 이 기사가 마음에 들기를 바랍니다.
RxJS 대한 더 깊은 이해를 위한 참조 링크를 공유합니다.


반응형