Explore ReactiveX With NodeJS And TypeScript

Quick summary: We’ll explore and implement ReactiveX with node.js using Typescript. We will show how different modules while working with this API can be leveraged in this tutorial.

Introduction

According to docs, ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O. Observables fill the gap by being the ideal way to access asynchronous sequences of multiple items.

Prerequisites

  1. JavaScript, and basic knowledge of TypeScript

Let’s Get Started

We will explore the various operators found in the library by using them here. There are several ways to create observables. RxJs makes use of observables to observe and emit data. We can create them using the create function or use the of, from, interval, range, etc. operators.

As a side note, from v.6+, create is deprecated and so we must create the Observable module by ourselves.

import {Observable} from 'rxjs'

const observable = new Observable(subscriber => {
subscriber.next("Hello,")
subscriber.next("My name is...")
subscriber.next("Spectre :)")
subscriber.complete()
})

observable.subscribe({
next: x => {console.log(x)},
error: err => {console.log(`Error : ${err}`)},
complete: () => {console.log("Done")}
})

We imported the Observable module, then created the Observable object that’s stored in the observable and passed the subscriber object as an argument. Then we’ll be calling the subscribe callbacks on the subscriber object. The methods are:

next : executed when a value emission occurs

error : executed when an error occurs

complete : executed when emission ends. It does not emit values, unlike the others. The output of the code above is observer

Let’s explore creating an observable by importing and using the interval operator.

import { interval } from 'rxjs';

//Emits a number after every second.
interval(1000).subscribe(val => console.log(val));

//Output : 0,1,2,3,4....

Great!

Let’s explore the merge operator. This operator joins two or more observables into one observable. This means that data is emitted at once from all the observables.

import {interval, merge} from "rxjs";
import {mapTo} from "rxjs/operators";

//emits every second
const ones = interval(1000);
//emit every 2 seconds
const twos = interval(2000);

merge(
ones.pipe(mapTo("Sweet")),
twos.pipe(mapTo("sauce"))
).subscribe(val => console.log(val));

//Output : "Sweet", "sauce", "Sweet", "Sweet", "sauce", "Sweet", "Sweet"...

Above, we created two observables, Sweet emits data after 1 second while sauce emits after 2 seconds. We then use merge to join the two observables to one. Merge takes in the observables to join as arguments. Meanwhile, in our code we have something new: pipe.

The function is used to attach operators to observables. And we have operators in RxJs to change the data emitted by the observables. In the code, we used mapTo to change the value emitted to a String. The pipe function takes in operators as arguments then it applies them to the observable. We can have more than one operator in the pipe function. Let closely look at our code for more insight.

Within our code, the data emited happens in the manner:

On the 1st second count Sweet is emitted, then 2nd second "sauce", "Sweet" is emitted . On the 3rd second count Sweet gets emitted while on the 4th second count "sauce", "Sweet", emits, and so the pattern continues.

Then the map operator used to change the values from an observable.

import {from} from 'rxjs'
import {map} from "rxjs/operators";

from([1,2,3,4,5]).pipe(map(val => Math.pow(val, 2)))
.subscribe(value => {console.log(value)})

//Output : 1,4,9,16,25

So we just created an observable using the from operator which creates observables from an array, promise or an iterable then emits the values in the array one after the other. Then we use pipe to attach operators to our observable, so map takes every emitted value then squares it and then the final result is emitted.

Our last operator we’ll explore is filter. This is used to filter the emitted data based on a criteria. But if by any chance that the value does not meet this criteria, then it is not emitted. Let’s say we want to find odd numbers in a list:

import {from} from "rxjs";
import {filter} from "rxjs/operators";

from([1, 2, 3, 4, 5]).pipe(filter(val => val % 2 !== 0))
.subscribe(value => {console.log(value)})

//Output : 1,3,5

Pretty straightforward right. We check every value that’s odd number then emits it. To add a little tweak, we can also filter out objects based on values.

import {from} from "rxjs";
import {filter} from "rxjs/operators";

from([
{ name: 'Harvey', age: 20 },
{ name: 'Mike', age: 13 },
{ name: 'Donna', age: 17 },
{ name: 'Jessica', age: 25 }
]).pipe(
filter(user => user.age >= 18))
.subscribe(value => { console.log(`${value.name} is old enough to drink`)})

/**
Output:
* Harvey is old enough to drink
* Jessica is old enough to drink
**/

Why use ReactiveX — RxJs

ReactiveX, Reactive Extensions, is load collection of beautiful projects built by the ReactiveX community making asynchronous programming in different programming languages and platforms possible as we saw in our tutorials.Their observable patterns also help in making the code smaller (less) and easier to read.

The library also have a well-managed error handling mechanism when compared to the traditional try/catch method, it performs better. Also the ReactiveX projects include RxJava, RxJs, RxSwift, etc.

Conclusion

These will easily allow you to write asynchronous code to achieve a better performance within our code or full project. These can be use to perform more similar or complex tasks apart from the ones covered in this article.

Related Resources

  1. ReactiveX

Software Engineer | Technical Writer | I love bunnies