Quick summary: We’ll explore ReactiveX and implement it in Node.js using TypeScript. This tutorial shows how several parts of the RxJS API can be put to work.
Introduction
According to the docs, ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O. Observables fill the gap by being the ideal way to access asynchronous sequences of multiple items.
Prerequisites
- JavaScript, and basic knowledge of TypeScript
- ReactiveX (RxJS)
- Node.js
Let’s Get Started
We will explore several of the library’s operators by using them here. RxJS uses observables to emit and observe data, and there are several ways to create one. We can create them using the create function or with creation functions such as of, from, interval, and range.
As a side note, create is deprecated from v6 onward, so we construct the Observable ourselves with new Observable.
import { Observable } from 'rxjs'
const observable = new Observable(subscriber => {
  subscriber.next("Hello,")
  subscriber.next("My name is...")
  subscriber.next("Spectre :)")
  subscriber.complete()
})
observable.subscribe({
  next: x => { console.log(x) },
  error: err => { console.log(`Error : ${err}`) },
  complete: () => { console.log("Done") }
})
We imported Observable, then created an Observable object and stored it in observable. Its constructor takes a function that receives a subscriber object, and inside that function we call next and complete on the subscriber. We then call subscribe with an observer object whose callbacks are:
- next: executed each time a value is emitted
- error: executed when an error occurs
- complete: executed when emission ends. Unlike the other two, it does not receive a value.
The output of the code above is:
Hello,
My name is...
Spectre :)
Done
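To make the contract between the three callbacks more concrete, here is a minimal, dependency-free model of it. The names `MiniObserver` and `miniObservable` are made up for this sketch and are not part of RxJS; the only behaviour it tries to mirror is that nothing is delivered once the stream has completed or errored.

```typescript
// Hypothetical, simplified model of the observer contract; NOT RxJS source code.
interface MiniObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

function miniObservable<T>(producer: (subscriber: MiniObserver<T>) => void) {
  return {
    subscribe(observer: MiniObserver<T>): void {
      let closed = false; // becomes true after error() or complete()
      producer({
        next: (value) => {
          if (!closed) observer.next(value);
        },
        error: (err) => {
          if (closed) return;
          closed = true;
          observer.error(err);
        },
        complete: () => {
          if (closed) return;
          closed = true;
          observer.complete();
        },
      });
    },
  };
}

const log: string[] = [];

miniObservable<string>((subscriber) => {
  subscriber.next("Hello,");
  subscriber.complete();
  subscriber.next("ignored"); // dropped: the stream has already completed
}).subscribe({
  next: (x) => log.push(x),
  error: (err) => log.push(`Error : ${err}`),
  complete: () => log.push("Done"),
});

console.log(log); // contains "Hello," followed by "Done"
```

The real Observable does considerably more (unsubscription and teardown, for example), so treat this only as a mental model of why complete ends the sequence.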
Let’s explore creating an observable by importing and using the interval function.
import { interval } from 'rxjs';
//Emits a number after every second.
interval(1000).subscribe(val => console.log(val));
//Output : 0,1,2,3,4....
Great!
Let’s explore the merge operator. This operator joins two or more observables into one observable, which emits each value as soon as any of its sources emits it.
import {interval, merge} from "rxjs";
import {mapTo} from "rxjs/operators";
//emits every second
const ones = interval(1000);
//emit every 2 seconds
const twos = interval(2000);
merge(
  ones.pipe(mapTo("Sweet")),
  twos.pipe(mapTo("sauce"))
).subscribe(val => console.log(val));
//Output : "Sweet", "sauce", "Sweet", "Sweet", "sauce", "Sweet", "Sweet"...
Above, we created two observables: ones (mapped to "Sweet") emits every second, while twos (mapped to "sauce") emits every 2 seconds. We then use merge to join the two observables into one; merge takes the observables to join as arguments. Our code also has something new: pipe.
The pipe function is used to attach operators to an observable, and operators change the data the observable emits. In the code, we used mapTo to replace every emitted value with a fixed string. The pipe function takes operators as arguments and applies them to the observable in order, so we can pass more than one operator. Let’s look closely at our code for more insight.
Within our code, the data is emitted as follows: at the 1st second, "Sweet" is emitted; at the 2nd second, "sauce" and "Sweet" are emitted; at the 3rd second, "Sweet" is emitted; at the 4th second, "sauce" and "Sweet" are emitted; and so the pattern continues.
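The timing described above is just arithmetic on the two intervals, so it can be sketched without any timers. The helper `mergedTimeline` below is hypothetical and does not use RxJS; it assumes that when both sources fire in the same second, "sauce" is listed first, as in the walkthrough above. With real timers, the order within the same tick may differ.

```typescript
// Hypothetical helper: lists what the merged stream emits during the first
// `seconds` seconds. It only simulates the schedule; no timers, no RxJS.
function mergedTimeline(seconds: number): string[] {
  const emitted: string[] = [];
  for (let t = 1; t <= seconds; t++) {
    // twos = interval(2000) fires on every even second.
    // Assumption: it is listed first when both fire in the same second.
    if (t % 2 === 0) emitted.push("sauce");
    // ones = interval(1000) fires on every second.
    emitted.push("Sweet");
  }
  return emitted;
}

const firstFourSeconds = mergedTimeline(4);
console.log(firstFourSeconds.join(", "));
// Sweet, sauce, Sweet, Sweet, sauce, Sweet
```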
Next is the map operator, which is used to transform each value emitted by an observable.
import {from} from 'rxjs'
import {map} from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(map(val => Math.pow(val, 2)))
  .subscribe(value => { console.log(value) })
//Output : 1,4,9,16,25
So we just created an observable using from, which creates an observable from an array, a promise, or an iterable; given an array, it emits the values one after the other. Then we use pipe to attach operators to our observable: map takes every emitted value and squares it, and the squared result is what gets emitted.
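One way to picture what pipe does is as plain function composition: each operator takes a source and returns a new source. The sketch below is a simplified model under that assumption; `PushSource`, `PushOperator`, `fromArray`, `mapOp`, and `pipeTwo` are names invented here and are not the RxJS API.

```typescript
// Hypothetical, simplified model of pipe; these helpers are NOT the RxJS API.
// A "push source" delivers its values by calling the `next` callback it is given.
type PushSource<T> = (next: (value: T) => void) => void;
type PushOperator<A, B> = (source: PushSource<A>) => PushSource<B>;

// Stand-in for from([...]): pushes each array element in order.
function fromArray<T>(items: T[]): PushSource<T> {
  return (next) => items.forEach((item) => next(item));
}

// Stand-in for map: wraps a source so every value passes through `project`.
function mapOp<A, B>(project: (value: A) => B): PushOperator<A, B> {
  return (source) => (next) => source((value) => next(project(value)));
}

// Stand-in for pipe with two operators, applied left to right.
function pipeTwo<A, B, C>(
  source: PushSource<A>,
  first: PushOperator<A, B>,
  second: PushOperator<B, C>,
): PushSource<C> {
  return second(first(source));
}

const piped = pipeTwo(
  fromArray([1, 2, 3, 4, 5]),
  mapOp((val: number) => Math.pow(val, 2)),
  mapOp((val: number) => `squared: ${val}`),
);

const received: string[] = [];
piped((value) => received.push(value)); // plays the role of subscribe
console.log(received);
```

Each value flows through the first operator and then the second before it reaches the callback, which is the same left-to-right order in which operators are listed inside pipe.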
The last operator we’ll explore is filter. It filters the emitted data based on a condition: a value that does not meet the condition is not emitted. Let’s say we want to find the odd numbers in a list:
import {from} from "rxjs";
import {filter} from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(filter(val => val % 2 !== 0))
  .subscribe(value => { console.log(value) })
//Output : 1,3,5
Pretty straightforward, right? We check every value and emit it only if it is an odd number. As a small tweak, we can also filter objects based on their property values.
import {from} from "rxjs";
import {filter} from "rxjs/operators";
from([
  { name: 'Harvey', age: 20 },
  { name: 'Mike', age: 13 },
  { name: 'Donna', age: 17 },
  { name: 'Jessica', age: 25 }
]).pipe(
  filter(user => user.age >= 18)
).subscribe(value => { console.log(`${value.name} is old enough to drink`) })
/**
Output:
* Harvey is old enough to drink
* Jessica is old enough to drink
**/
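For a finite, synchronous source like the array above, the result can be cross-checked with plain array methods, since filter and map on an observable apply the same kind of per-value test and transformation. This is only an analogy for checking the expected output, with no RxJS involved; `Person` and `people` are names made up for the sketch.

```typescript
// Plain-array cross-check of the filter example; no RxJS involved.
interface Person {
  name: string;
  age: number;
}

const people: Person[] = [
  { name: "Harvey", age: 20 },
  { name: "Mike", age: 13 },
  { name: "Donna", age: 17 },
  { name: "Jessica", age: 25 },
];

// Keep only people aged 18 or over, then build the same message as above.
const messages: string[] = people
  .filter((person) => person.age >= 18)
  .map((person) => `${person.name} is old enough to drink`);

messages.forEach((message) => console.log(message));
// Harvey is old enough to drink
// Jessica is old enough to drink
```

The difference is that the array version needs all the data up front, while the observable version handles each value as it arrives.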
Why use ReactiveX (RxJS)
ReactiveX, short for Reactive Extensions, is a large collection of projects built by the ReactiveX community that make asynchronous programming possible across different programming languages and platforms, as we saw in this tutorial. Their observable patterns also help make code shorter and easier to read.
The library also has a well-managed error-handling mechanism: errors travel through the observable to the error callback, which can be more convenient for asynchronous code than the traditional try/catch method. The ReactiveX projects include RxJava, RxJS, RxSwift, and others.
Conclusion
These operators make it easier to write asynchronous code, which can help performance within a single module or a full project. They can also be used to perform similar or more complex tasks than the ones covered in this article.