Programowanie reaktywne-RxJS vs EventEmitter w Node.js

Ostatnio zacząłem szukać bibliotek RxJS i RxJava(od Netflixa), które pracują nad koncepcją programowania reaktywnego.

Node.js działa w oparciu o pętle zdarzeń, które zapewniają cały arsenał do programowania asynchronicznego, a kolejne biblioteki węzłów, takie jak "klaster", pomagają w jak najlepszym wykorzystaniu wielordzeniowej maszyny. I Węzeł.js zapewnia również funkcjonalność EventEmitter, w której możesz subskrybować zdarzenia i działać na nich asynchronicznie.

Z drugiej strony, jeśli dobrze rozumiem RxJS (i programowanie reaktywne w ogóle) działa na zasadzie strumieni zdarzeń, subskrybowanie strumieni zdarzeń, asynchronicznie przekształcanie danych strumienia zdarzeń.

Pytanie Więc, co robi używanie pakietów Rx w Node.js mean. Jak różni się pętla zdarzeń węzła, emiter zdarzeń i subskrypcje strumieni i subskrypcji Rx.

Author: André Staltz, 2014-08-16

3 answers

Masz rację, że strumienie Rx i EventEmitter są bardzo podobne, obie są implementacjami wzorca Observer.

Różnica polega na tym, że Rx zawiera funkcje do przekształcania i łączenia strumieni zdarzeń. Wyobraź sobie na przykład, że chcemy opóźnić każde "zdarzenie odpowiedzi" o 2 sekundy. W przypadku programu EventEmitter musisz się do niego zapisać i ręcznie ustawić timeout:

eventEmitter.on('response', function(res) {
    setTimeout(function() { /* handle res */ }, 2000);
});

Za pomocą Rx możesz utworzyć nowy strumień zdarzeń, który jest po prostu oryginalnym strumieniem zastosowana do funkcji opóźnienia:

responseStream.delay(2000).subscribe(function(res) {
    /* handle res */
});

delay() jest prostą funkcją i jedną z wielu innych dostępnych . Istnieje tak wiele różnych sposobów modyfikowania strumieni, że możliwe staje się zaprogramowanie wielu logiki aplikacji poprzez przekształcanie strumieni ze wszystkimi możliwymi funkcjami, zamiast polegać na niskopoziomowej logice, takiej jak setTimeout.

Sprawdź również tę wizualną i interaktywną eksplorację tych funkcji.
 39
Author: André Staltz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-08-16 13:40:10

Moja odpowiedź ~ dwa lata temu była błędna i nie mogę jej znacząco edytować ani usunąć. Obiekty obserwacyjne nie są jak Eventemitery. One mogą zachowywać się Jak EventEmitters w niektórych przypadkach, a mianowicie gdy są multicastowane za pomocą obiektów RxJS, ale zazwyczaj nie zachowują się jak EventEmitters.

W skrócie, obiekt RxJS jest jak EventEmitter, ale Rxjs obserwowalny jest bardziej ogólnym interfejsem. obserwowalne są bardziej podobne do funkcji z zero argumentów.

Rozważ następujące:


function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

Oczywiście wszyscy spodziewamy się zobaczyć jako wyjście:

"Hello"
42
"Hello"
42

Możesz napisać to samo zachowanie powyżej, ale z Obserwowalnymi:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

I wyjście jest takie samo:

"Hello"
42
"Hello"
42

To dlatego, że zarówno funkcje, jak i Obserwable są leniwymi obliczeniami. Jeśli nie wywołasz funkcji, console.log('Hello') nie nastąpi. Również w przypadku Obserwabli, jeśli nie "wywołasz" (subscribe), console.log('Hello') nie dojdzie do skutku. Plus, " calling" albo "subskrybowanie" jest niezależną operacją: dwa wywołania funkcji wyzwalają dwa oddzielne efekty uboczne, a dwa obserwowalne subskrybowanie wyzwalają dwa oddzielne efekty uboczne. W przeciwieństwie do Eventemiterów, które dzielą się efektami ubocznymi i mają chętną realizację niezależnie od istnienia subskrybentów, Obserwable nie mają wspólnej realizacji i są leniwe.


Jak na razie nie ma różnicy między zachowaniem funkcji a obserwowalnym. To pytanie StackOverflow byłoby lepiej sformułowane jako " Rxjs Observables vs functions?".

Niektórzy twierdzą, że obiekty obserwacyjne są asynchroniczne. To nieprawda. Jeśli otaczasz wywołanie funkcji logami, w ten sposób:
console.log('before');
console.log(foo.call());
console.log('after');

Oczywiście zobaczysz wyjście:

"before"
"Hello"
42
"after"
I to samo zachowanie z Obserwowalnymi:
console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

I wyjście:

"before"
"Hello"
42
"after"

Co dowodzi, że subskrypcja foo była całkowicie synchroniczna, tak jak funkcja.


Więc jaka jest naprawdę różnica między Obserwowalną a funkcją?

Obiekty obserwowalne mogą "zwracać" wiele wartości w czasie , czegoś, czego funkcje nie mogą. Nie możesz tego zrobić:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Funkcje mogą zwracać tylko jedną wartość. Obserwatorzy mogą jednak to zrobić:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

Z wyjściem synchronicznym:

"before"
"Hello"
42
100
200
"after"

Ale można również "zwracać" wartości asynchronicznie:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () {
    observer.next(300);
  }, 1000);
});

Z wyjściem:

"before"
"Hello"
42
100
200
"after"
300

Podsumowując,

  • func.call() znaczy "daj mi jedna wartość natychmiast (synchronicznie)"
  • obsv.subscribe() oznacza "daj mi wartości. Może wielu z nich, może synchronicznie, może asynchronicznie"

W ten sposób Obserwable są uogólnieniem funkcji (które nie mają argumentów).

 74
Author: André Staltz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-06 10:50:46

Kiedy słuchacz przywiązuje się do emitera ?

Z emiterami zdarzeń, słuchacze są powiadamiani o każdym wydarzeniu, które ich interesuje. Gdy nowy słuchacz zostanie dodany po wystąpieniu zdarzenia, nie będzie wiedział o przeszłym zdarzeniu. Również nowy słuchacz nie pozna historii wydarzeń, które miały miejsce wcześniej. Oczywiście, możemy ręcznie zaprogramować nasz emiter i słuchacz do obsługi tej niestandardowej logiki.

Z reactive streams, abonent dostaje strumień wydarzeń, które miały miejsce Od początku. Więc czas, w którym się zapisuje, nie jest ścisły. Teraz może wykonywać różne operacje na strumieniu, aby uzyskać pod-strumień wydarzeń, które go interesują.

Zaleta tego wychodzi:

  • kiedy musimy przetworzyć wydarzenia, które wydarzyły się w czasie
  • kolejność, w jakiej się zdarzyły
  • wzorce, w których zdarzały się wydarzenia (powiedzmy, po każdym wydarzeniu buy na Google stock, sprzedaj wydarzenie Na Microsoft stock dzieje się w ciągu 5 minut)

Strumienie wyższego rzędu:

Strumień wyższego rzędu jest "strumieniem strumieni": strumieniem, którego wartości zdarzeń są same w sobie strumieniami.

W przypadku emiterów zdarzeń, jednym ze sposobów jest podłączenie tego samego słuchacza do wielu emiterów zdarzeń. Staje się to skomplikowane, gdy musimy skorelować zdarzenie miało miejsce na różnych emiterach.

Z reaktywnymi strumieniami, to bryza. Przykład z Jest to biblioteka programowania reaktywnego, podobna do RxJS, ale bardziej wydajna.]}
const firstClick = most.fromEvent('click', document).take(1);
const mousemovesAfterFirstClick = firstClick.map(() =>
    most.fromEvent('mousemove', document)
        .takeUntil(most.of().delay(5000)))

W powyższym przykładzie korelujemy zdarzenia kliknięcia ze zdarzeniami ruchu myszy. Wyprowadzanie wzorców między zdarzeniami staje się łatwiejsze, gdy zdarzenia są dostępne jako strumień.

Powiedziawszy to, z EventEmitter moglibyśmy to wszystko osiągnąć poprzez ponad inżynierię naszych emiterów i słuchaczy. Wymaga ponad inżynierię, ponieważ nie jest przeznaczony w pierwszej kolejności do takich scenariusze. Natomiast reactive streams robi to płynnie, ponieważ ma na celu rozwiązanie takich problemów.

 0
Author: Sairam Krish,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-22 12:23:53