Observable – rxjs 5

To nie będzie wpis na temat teorii reaktywnego programowania funkcyjnego. Nie jest to też wyciąg z dokumentacji rxjs. Ten wpis jest krótkim praktycznym wprowadzeniem do Obserwabli na przykładzie. Zaczynajmy!

W tym wpisie używam rxjs 5 i określenie Observable odnosi się właśnie do tej biblioteki. Dokumentacja rxjs 5.

Teoria

Wszystko co chcę Wam powiedzieć na temat teorii zawiera się w jednym zdaniu:

Reaktywne programowanie jest programowaniem z asynchronicznymi strumieniami danych.

BTW: Jest to własne tłumaczenie fragmentu artykułu „The introduction to Reactive Programming you’ve been missing”

Ale co to w ogóle oznacza? Jeśli tylko w Twojej aplikacji cokolwiek dzieje się asynchronicznie, to Observable prawdopodobnie może Ci w to ułatwić. To pewnie brzmi jak reklama dziesiątek innych bibliotek i rozwiązań. W czym więc Observable wygrywa?

  • Observable to tak naprawdę wzorzec obserwatora z bajerami. Jednocześnie jest to już de facto standard, składnia jest popularna i powszechnie znana
  • Observable pozwala w ten sam sposób obsługiwać różne rodzaje asynchronicznych zdarzeń, zarówno pojedyncze (jak żądanie http) jak i wielokrotne (jak ruchy kursorem)
  • Do tego możliwe jest łączenie różnych źródeł observabli, mieszanie ich, filtrowanie, transformowanie…
  • Observable prawdopodobnie niedługo będzie częścią JavaScriptu (link do propozycji specyfikacji)
  • Observable mogą być używane razem z najpopularniejszymi frameworkami: Angular 2 (domyślnie) oraz React/Redux (np. dzięki redux-observable)

Praktyka

No dobrz. Na razie jest trochę sucho, pewnie nikogo nie przekonałem do używania Observabli tym wprowadzeniem 🙂 Bo właściwie… po co? Popatrzmy więc na bardzo prosty przykład.

Obserwable ze zdarzeń

Chciałbym, aby po kliknięciu w przycisk pojawiała się losowa liczba. W czystym JS jest to bardzo łatwe:

const output = document.querySelector('output');  
const button = document.querySelector('button');

button  
    .addEventListener('click', () => {
        output.textContent = Math.random().toString();
    });

Mamy tutaj asynchroniczne zdarzenia, więc powinniśmy móc zamienić ten kod na observable. Tworzymy pierwszą observablę w życiu:

const output = document.querySelector('output');  
const button = document.querySelector('button');

Rx.Observable  
    .fromEvent(button, 'click')
    .subscribe(() => {
        output.textContent = Math.random().toString();
    });

Zobacz Pen Observables 1 by Michał Miszczyszyn (@mmiszy) na CodePen.

WOW

Wow, nasze pierwsze obserwable 🙂 Szkoda tylko, że na razie nie widać absolutnie żadnych zalet w stosunku do czystego JS. A skoro nie widać różnicy… i tak dalej. Dodajmy więc kolejne wymagania do naszego projektu: tylko co trzecie kliknięcie ma zmieniać wyświetlaną liczbę.

Rx.Observable  
    .fromEvent(button, 'click')
    .bufferCount(3) // !
    .subscribe((res) => {
        output.textContent = Math.random().toString();
    });

Zobacz Pen Observables 2 by Michał Miszczyszyn (@mmiszy) na CodePen.

Jakby to wyglądało w czystym JS? Na pewno byłoby nieco dłuższe. Tutaj pojawia się cała moc Observabli: operatory. Jest ich mnóstwo i nie sposób wszystkie zapamiętać, jednak dają one przeogromne, właściwie nieskończone możliwości! W tym przypadku dzięki bufferCount zbieramy (buforujemy) 3 zdarzenia i dopiero wtedy je emitujemy w postaci tablicy.

Ale w zasadzie to wymaganie 3 kliknięć łatwo też napisać w czystym JS. Zmieńmy je nieco: Niech to będą 3 kliknięcia, ale tylko w krótkim czasie 400ms. Czyli coś w stylu potrójnego kliknięcia:

const click$ = Rx.Observable.fromEvent(button, 'click');

click$  
    .bufferWhen(() => click$.delay(400)) // ! w ciągu 400 ms
    .filter(events => events.length >= 3) // ! tylko 3 kliknięcia lub więcej
    .subscribe((res) => {
        output.textContent = Math.random().toString();
    });

Zobacz Pen Observables 3 by Michał Miszczyszyn (@mmiszy) na CodePen.

bufferWhen zbiera wszystkie kliknięcia aż do momentu gdy przekazana funkcja coś wyemituje. Ta robi to dopiero po 400ms po kliknięciu. A więc razem te dwa operatory powodują, że po upływie 400ms od pierwszego kliknięcia, zostanie wyemitowania tablica ze wszystkimi kliknięciami w tym czasie. Następnie używamy filter aby sprawdzić czy kliknięto 3 lub więcej razy. Czy teraz wydaje się to bardziej interesujące?

Tworzenie observabli

Muszę przy okazji wspomnieć, że sposobów na tworzenie observabli jest bardzo wiele. Jeden z nich to poznany już fromEvent. Ale ponadto, między innymi, możemy automatycznie przekształcić dowolny Promise w Observable przy użyciu funkcji Rx.Observable.fromPromise(…), albo dowolny callback dzięki Rx.Observable.bindCallback(…) lub Rx.Observable.bindNodeCallback(…). Dzięki temu praktycznie dowolne API dowolnej biblioteki możemy zaadaptować na Observable.

HTTP

Jeśli masz ulubioną bibliotekę do obsługi żądań http, jak choćby fetch, możesz ją łatwo zaadaptować na Observable. Jednak możesz też skorzystać z metody Rx.Observable.ajax i na potrzeby tego wpisu ja tak właśnie zrobię.

Okej, prosty przykład, pobieramy listę postów z API i ją wyświetlamy. Renderowanie nie jest tematem tego posta, więc tutaj je pominę, a samo pobieranie jest tak proste jak:

const postsApiUrl = `https://jsonplaceholder.typicode.com/posts`;

Rx.Observable  
    .ajax(postsApiUrl)
    .subscribe(
      res => console.log(res),
      err => console.error(err)
    );

Voilà! To jest aż tak proste! Dodałem tutaj też drugi argument do funkcji subscribe, który służy do obsługi błędów. Okej, co teraz możemy z tym zrobić? Niech po każdym kliknięciu przycisku zostaną pobrane posty losowego użytkownika:

Rx.Observable  
  .fromEvent(button, "click")
  .flatMap(getPosts)
  .subscribe(
      render,
      err => console.error(err)
    );

function getPosts() {  
  const userId = Math.round(Math.random() * 10);
  return Rx.Observable.ajax(
    `https://jsonplaceholder.typicode.com/posts?userId=${userId}`
  );
}

Użyłem tutaj funkcji flatMap (zwanej też mergeMap), która dla każdego zdarzenia (kliknięcia) wywoła funkcję getPosts i poczeka na jej rezultat.

We wpisie dotyczącym tablic i map/reduce opisałem też funkcję flatMap. Tam zamieniała Array<Array<T>> na Array<U>, a w tym przypadku zamienia ona Observable<Observable<T>> na Observable<U>. Czy teraz widoczny jest sens poprzedniego wpisu?

Zobaczmy to na żywo:

Zobacz Pen Observables 4 by Michał Miszczyszyn (@mmiszy) na CodePen.

Super! 😉 Jednak występuje tutaj pewien problem: Wielokrotne kliknięcie na przycisk powoduje nieprzyjemny efekt wyrenderowania listy wielokrotnie. Do tego tak naprawdę nie mamy pewności, czy ostatnio pobrane dane zostaną wyrenderowane jako ostatnie… jeśli szybko klikniemy kilka razy, niemal jednocześnie zostanie wysłanych wiele żądań, a opóźnienia mogą sprawić, że żądanie wysłane wcześniej zwróci odpowiedź później… Jest to znany, częsty problem tzw. race conditions.

Rozwiązanie go przy pomocy czystego JS nie jest takie trywialne. Musielibyśmy przechowywać ostatnio wykonane żądanie, a od poprzednich się odsubskrybować. Do tego przydałoby się poprzednie żądania anulować… tu przydaje się kolejny operator z rxjs: switchMap. Dzięki niemu nie tylko automatycznie zostanie wyrenderowany tylko ostatnio pobrany zestaw danych, ale także poprzednie żądania będą anulowane:

canceled http request

Zobacz Pen Observables 5 by Michał Miszczyszyn (@mmiszy) na CodePen.

Observable z różnych źródeł

Skoro umiemy już tak dużo to może teraz rozbudujemy nieco naszą aplikację. Damy użytkownikowi możliwość wpisania ID usera od 1 do 10 (input) oraz wybór zasobu, który ma zostać pobrany: posts, albums, todos (select). Po zmianie dowolnego z tych pól żądanie powinno zostać wysłane automatycznie. Jest to praktycznie kopia 1:1 funkcji, którą ostatnio implementowałem w aplikacji dla klienta. Na początek definiujemy obserwable na podstawie zdarzeń inputchange dla selecta i inputa:

const id$ = Rx.Observable  
  .fromEvent(input, "input")
  .map(e => e.target.value);

const resource$ = Rx.Observable  
  .fromEvent(select, "change")
  .map(e => e.target.value);

Od razu też mapujemy każde zdarzenie na wartość inputa/selecta. Następnie łączymy obie obserwable w taki sposób, aby po zmianie dowolnej z nich, zostały pobrane wartości obu. Używamy do tego combineLatest:

Rx.Observable  
  .combineLatest(id$, resource$)
  .switchMap(getPosts)
  .subscribe(render);

Co istotne, funkcja combineLatest nie wyemituje niczego dopóki obie observable (id$resource$) nie wyemitują przynajmniej jednej wartości. Innymi słowy, nic się nie stanie dopóki nie wybierzemy wartości w obu polach.

Zobacz Pen Observables 6 by Michał Miszczyszyn (@mmiszy) na CodePen.

Podsumowanie

W zasadzie o obserwablach nie powiedziałem jeszcze za dużo. Chciałem szybko przejść do przykładu i pokazać coś praktycznego. Czy mi się to udało?

Jako bonus zmieniam ostatni kod i nieco inaczej obsługuję pole input. Pytanie czy i dlaczego jest to lepsze?

const id$ = Rx.Observable  
  .fromEvent(input, "input")
  .map(e => e.target.value)
  .filter(id => id >= 1 && id <= 10)
  .distinctUntilChanged()
  .debounceTime(200);
  • Fajne, ale… Może jestem jakiś dziwny, niemniej za dużo magii czuję pod spodem i jestem nieufny. Pod względem używania tego jest całkiem przyjemnie, niemniej zawsze mnie drażni jak coś jest za bardzo lib-dependent. Tutaj wykorzystujesz masę funkcji typowych dla Rx.js, które z oczywistych powodów nigdy nie trafią do standardu ES.

    Inna rzecz, że obecna propozycja nie uwzględnia nawet metod filter i map. I to jest bez sensu, bo (teraz wyjdzie jak bardzo lamię :D) przecież Observable to taka „żywa” tablica z danymi i filtrowanie to podstawa…

    • Implementacja Promise bluebird, której używam na codzień w node też zawiera znacznie więcej funkcji niż natywny obiekt Promise. Czemu to miałoby przeszkadzać? Ważne, że jest zgodna z Promise/A+, podobnie jak rxjs 5 jest zgodny z aktualną propozycją specyfikacji obiektu Observable.

      Funkcje, które wykorzystuje nie są też tak naprawdę typowe dla rxjs. Nazwy większość z nich pochodzą z Haskella i działają identycznie jak tam – więc są znane wszystkim osobom, które chociaż liznęły programowanie funkcyjne.

      Ale to nie dla nich ten wpis, bo oni doskonale sobie z rxjs poradzą i pewnie pokochają. Ten wpis miał na celu pokazać tylko kilka „prostych” przykładów – prostych dzięki rxjs, skomplikowanych gdyby chcieć je napisać w czystym JS. Tendencja jest dobra, bo chcemy ukrywać trudną obsługę asynchroniczności pod warstwą przyjemnej abstrakcji. Spróbuj zaimplementować te przykłady w samym JS bez bibliotek 🙂

      • > Spróbuj zaimplementować te przykłady w samym JS bez bibliotek 🙂

        Jak tylko znajdę godzinkę wolnego… 😛

      • Jeszcze co do Haskella: prawdę mówiąc nigdy mu się bliżej nie przyglądałem – mimo wszystko bardziej ciągnie mnie do języków OOP. Jeśli jest tak, jak w Haskellu, to fajnie. Nie zmienia to jednak faktu, że na gruncie JS wciąż jest to dość egzotyczna technika.

        Co do implementacji zgodnej, ale rozszerzonej: owszem, dobrze, że jest zgodna. Niemniej prawdziwą przenośność kodu zapewniłoby dopiero ograniczenie się do tego, co jest zdefiniowane w standardzie. Inaczej mimo wszystko robimy sobie lock-in technologiczny. Największy ból polega na tym, że obecna propozycja Observable i tak jest nieużywalna, więc lock-in tylko się pogłębia, bo i tak wszyscy użyją Rx.js albo ekwiwalenta tej biblioteki. I IMO jest to pewien problem (w taki sam sposób Angular i React po prostu zniszczyły Web Components).

        > Tendencja jest dobra, bo chcemy ukrywać trudną obsługę asynchroniczności pod warstwą przyjemnej abstrakcji.

        Tylko w którą stronę to idzie? Mamy callbacki, Promise, generatory, async/await, WHATWG definiuje obecnie streamy i w końcu mamy też Observable… Robi się dość tłoczno 😉

        • Chwila chwila 🙂 Generatory są synchroniczne, przynajmniej na razie, więc rozwiązują inny problem.
          Async/await to tylko cukier na Promise, nic nowego, tylko składnia przyjemna. No i tą metodą obsłużysz tylko pojedyncze zdarzenia, Promise sie rozwiązuje raz i koniec.
          Callback i Observable mogą służyć do obsługi wielokrotnych asynchronicznych zdarzeń, ale trzeba przyznać, że callbacki są mniej czytelne i ekspresywne niż Observable 🙂

          • > Generatory są synchroniczne, przynajmniej na razie, więc rozwiązują inny problem.

            Tzn tak, teoretycznie rozwiązują inny problem, ale przecież to właśnie one były inspiracją do stworzenia składni async/await → http://webroad.pl/javascript/746-synchroniczna-asynchronicznosc

            Tak, async/await to tylko przyjemna składnia, ale mimo wszystko większość kodu asynchronicznego jest w moim odczuciu jednorazowa, więc tego typu składnia stanie się popularna.

            Tak, callbacki i observable nadają się do powtarzalnej asynchroniczności, ale w dużej liczbie projektów powtarzalna asynchroniczność sprowadza się do obsługi zdarzeń DOM-owych, co niekoniecznie wymaga observables (zwłaszcza do prostej obsługi klików).

            Mówiąc „tłoczno” miałem na myśli, że mamy wiele różnych rzeczy, które niby się pokrywają, równocześnie mając subtelnie inne use case’y. To IMO wprowadza, przynajmniej na początku, niepotrzebne zamieszanie.

          • Jeśli masz po prostu na myśli to, że nie ma sensu użyć rxjs jeśli się go nie potrzebuje to tak, jak najbardziej. Jest to prawdziwe odnośnie każdej biblioteki 🙂

            Jednak praca (zabawa?), z rxjs to też pewnego rodzaju gimnastyka umysłu (podobnie jak pisane w Haskellu), dlatego też polecam każdemu spróbować 🙂 Zupełnie inaczej się potem patrzy na świat.

          • > Jeśli masz po prostu na myśli to, że nie ma sensu użyć rxjs jeśli się go nie potrzebuje to tak, jak najbardziej.

            To też. Bardziej mnie zastanawia, po co do standardu ES do każdej wersji dokładać „nowy, lepszy sposób na async!!!!!” 😉 Jak teraz wejdą Observables, to boję się, co wymyślą za rok.

            > Jednak praca (zabawa?), z rxjs to też pewnego rodzaju gimnastyka umysłu

            Co za czasy nastały… Kiedyś się kod pisało dla przyjemności, teraz się trzeba gimnastykować! 😛

            Ale tak, skrobnę se coś z tym, bo wygląda ciekawie.

  • Pingback: Map i Reduce w JS • Type of Web()