BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Deep Dive into Reactive Programming with RxJS

Deep Dive into Reactive Programming with RxJS

Bookmarks

Key Takeaways

  • You can use observables and RxJS to write clean and efficient code for handling asynchronous data in your application.
  • With observables, you can construct continuous data streams that emit data over time. Observables can be subscribed to, canceled, or completed at any moment.
  • Observables are lazy. This allows writing efficient code that’s more declarative.
  • It’s useful to think of observables as collections and apply functional-style procedures like mapping and filtering to manipulate data and minimize side effects.
  • You can handle even the most complex scenarios of async data flow using the nested observables and the various flattening strategies provided by RxJS.

One of the most challenging aspects of developing any user-facing application is handling asynchronous actions such as user input and API requests cleanly and robustly. Thankfully, over the years, many useful programming paradigms and tools have emerged to help us with that task. One of these paradigms is called Reactive Programming.

In this post, let's cover using Reactive Programming and RxJS when building software applications.

We will dive deep into the fundamental parts of working with RxJS and how it benefits us when building complex applications. You can build on this knowledge further after understanding these basic concepts.

What is RxJS

RxJS helps developers author declarative code for handling side effects and asynchronous actions with continuous data streams and subscriptions. Think of a stream as a collection of data that arrives asynchronously over time.

The main building blocks of RxJS include:

1.    Observable - An object responsible for handling data streams and notifying observers when new data arrives.
2.    Observer - Consumers of data streams emitted by observables. Usually, it's a simple handler function that runs each time a new event occurs.

Creating an Observable

Let's explore the anatomy of an observable and how to use it.

One of the many benefits of using RxJS is the abundance of utility methods to create observables from all kinds of sources. You can make observables from DOM events, promises, data streams, and other sources of asynchronous data. For this example, we'll create a blank new observable.

const { Observable, fromEvent, interval } = require("rxjs");

const observable = new Observable((subscriber) => {
 let counter = 0;
 subscriber.next(counter);
 const interval = setInterval(() => {
   counter++;
   subscriber.next(counter);
   if (counter === 5) {
     subscriber.complete();
     clearInterval(interval);
   }
 }, 1000);
});

observable.subscribe(
 (value) => {
   console.log({ value });
 },
 null,
 () => {
   console.log("Done!");
 }
);

Let's break down what happened:

  1. We start by creating a new observable using the "new Observable" construct.
  2. When creating the observable, we also pass a function as an argument. This function is responsible for running the observable and emitting values. This is where you declare when and how your observable should emit data.
  3. The same function also accepts the "subscriber" object as a parameter. "subscriber" is an instance of the observer that subscribed to your observable. You use methods provided by the "subscriber" object to notify your subscribers when new data arrives.
  4. "subscriber" has three methods available to us:
    • a. 'next' - this is the method we use to push new data to the observer.
    • b. 'error' - we call this method whenever we want to communicate that some error has happened.
    • c. 'complete' - we call this method to let the observer know that our observable has finished executing and there's no more data.

forEach VS subscribe

In our example, we used the 'subscribe' method to subscribe to the observable. There's also a 'forEach' method available. Let's briefly cover the differences between these two.

The core difference between these two subscription methods is their return type.

'forEach' returns a promise that either resolves or rejects when the observable completes or throws an error. When you process an observable of finite duration and want to do so synchronously, this is a good choice. It's best to avoid 'forEach' for observables that can continue infinitely, such as DOM events.

'subscribe' returns a subscription object that you can use to unsubscribe from observable at any point.

Observables vs Promises

So what makes observables better than other alternatives for handling async code, such as promises?

If stated briefly, the four main advantages of observables are:

  1. Ability to continuously emit new data, notify when the data stream is finished, and cancel observable at any point.
  2. Observable's lazy execution allows writing more efficient code.
  3. A vast number of helper methods for observables.
  4. Single interface for dealing with all types of async operations and side-effects.

Continuous data, retry, and cancel

You can only fire promises once, and you cannot easily restart or retry them. You can't cancel promises after calling them - this is something that even trivial applications often require. On the other hand, you can easily cancel and retry observables.

With Observables, you can emit data continuously and handle numerous complex scenarios you are likely to face when building any software. Things like IO and DOM events, changing state, processing data in chunks, and others.

Lazy execution

Another difference between observables and promises is their execution flow. Promises are eager while observables are lazy.

If you call a function that returns an observable, all you do is create one. It will not start the execution until you subscribe to it using the 'subscribe' or 'forEach' methods. Promises, however, are different. If you call a function that returns a promise, that promise will fire off immediately. The same concept of eager execution applies to functions that use async/await, which, under the hood, is just syntactic sugar for promises.

The lazy execution pattern allows writing interesting code while keeping it declarative and easy to understand. It lets you split and refactor your code in ways you haven't been able to before.

Helper methods

Another advantage of Reactive Programming with RxJS is all the helper methods included in the library. Using these helper methods, you can write a few lines of code to support complex scenarios that would otherwise require a lot of effort and testing.

Let’s take a look at how we can easily call API and retry three times on failure using "retry" and "fromFetch" helper methods:

import { fromFetch } from "rxjs/fetch";
import { retry } from "rxjs/operators";

const apiObservable = fromFetch("https://thatcopy.pw/catapi/rest").pipe(
 retry(3)
);

If we were to try to implement this feature without the help of observables and helper methods, the solution would be a lot more complex. We would need to use try/catch along with some state management to keep track of failed requests and retries.

Using observables, you can retry, throttle, and cancel asynchronous operations on the fly. And that’s just the tip of the iceberg of what you can do with observables. If you would like to explore the full capability of observables and RxJS in more detail, you can visit this reference page.

Universal API

The last advantage is applicable if you decide to use Observables for all asynchronous operations in your application.

Without using observables, you would have to write separate mechanisms for dealing with event handlers, promises, streams, and other async data. You'll need to make sure they all play nicely with each other and are compatible.

Or you could wrap all of those things in observables, using a myriad of helper functions from RxJS. Doing so will provide a universal API that will let you seamlessly and interchangeably use all of the above-mentioned asynchronous data sources together.

Observables are like collections

Another way of thinking about Observables is thinking of them as collections, specifically arrays.

Arrays are synchronous and easier to reason about when writing functional code. It’s a common practice to use functional-style programming to manipulate arrays to get the data we need and avoid side effects.

It might seem bizarre at first to think of events and async operations as collections, but that's the power of observables and RxJS. We can reason about things that are asynchronous in a synchronous manner.

We can apply filter, map, reduce, and other operations to extract and process the data. These functions help us write clean functional code for handling async actions that’s much easier to comprehend.

let count = 0;

fromEvent(document, "click")
 .pipe(map(() => count++))
 .pipe(throttle(() => interval(1000)))
 .pipe(filter((value) => value % 2 === 0))
 .subscribe((count) => console.log('Clicked ${count} times'));

Notice how we can map, filter, and throttle the DOM events using functional-style programming, similar to how we would process a collection.

The power of nested observables

Let's build on top of our understanding of observables to see how we can handle even more complex scenarios.

You might have an asynchronous stream of data that contains nested streams of data. For example, consider an infinite scrolling user experience. In this scenario, we have a continuous event stream as the user scrolls. Each time the user reaches the bottom of the view, we send an API request. (stream of data).

You can handle scenarios like that and many others using nested observables. As mentioned before, observables behave just like collections, and so they can be nested. You can perform mapping and flattening on nested observables. Equipped with the technique of nested observables, you will be able to handle even the most complex application scenarios.

Flattening strategies

The main idea behind dealing with nested observables is mapping them until you get the data you need, after which you flatten your nested observables.

There are several different flattening strategies for nested observables to consider.

concatAll

This method flattens nested observable collections using the FIFO (First In First Out) method. Data emits according to the order of the nested observables. This strategy helps to prevent race conditions and guarantees sequential pushes. Here's an illustration of nested observables and the order of the emitted data when flattened using the concatAll strategy.

As you can see, even though data "B" arrived before data "A", data "A" will be emitted first because the observable that contains it is positioned before the observable that contains data "B".

mergeAll

The mergeAll method pushes data from the nested observable in the order that the data arrives. This method provides no guarantees of preserving the execution order of the observables, unlike 'contactAll'.

The data gets displayed in the order it arrives, ignoring the order of the observables.

switchMap

This method is used the most often in web applications. "switchMap" destroys the previous nested observable when it receives a new one. This strategy works best when we always need to process the latest piece of data we receive.

Data "A" was skipped because we switched to a new observable that emitted data "B" before the first observable had a chance to emit "A". The same thing happened with data "D" emitting before the observable that contains data "C" had a chance to emit it.

Example of building a drag-and-drop (DnD) tool

Let's see nested observables in action. We’ll allow the user to DnD an HTML element on a canvas.
First, let's create a helper method that accepts two HTML elements and returns an observable for DnD events:

import "rxjs/add/operator/concatMap";
import "rxjs/add/operator/takeUntil";
import { fromEvent } from "rxjs";

export function dndObservable(item, container) {
 const spriteMouseDowns = fromEvent(item, "mousedown");
 const spriteContainerMouseMoves = fromEvent(container, "mousemove");
 const spriteContainerMouseUps = fromEvent(container, "mouseup");

 const spriteMouseDrags =
   // For every mouse down event on the sprite...
   spriteMouseDowns.concatMap(function (contactPoint) {
     // ...retrieve all the mouse move events on the sprite container...
     return (
       spriteContainerMouseMoves
         // ...until a mouse up event occurs.
         .takeUntil(spriteContainerMouseUps)
     );
   });

 return spriteMouseDrags;
}

A few things to note here:

  1. "concatMap" is observable’s helper method that’s a simple shortcut for calling concatAll and map methods together.
  2. "takeUntil" is another method of the observable object that you can use to instruct to listen to the observable until we get an event from another observable.

Most of the complexity lies in this "dndObservable" method. First, we're creating three separate observables from DOM events using the 'fromEvent' method provided by RxJS. We want to track mouseDown, mouseMove, and mouseUp events to encapsulate DnD behavior.

Next, we're creating a nested observable collection by combining mouseDown and mouseUp event observables. We use the "spriteContainerMouseUps" observable together with the "takeUntil" method to track mouse movements until the user releases the mouse.

Calling the "dndObservable" method simply creates the observable we need and nothing else. To execute it and start tracking events, we need to subscribe to it.

Now let's subscribe to this observable and move the element each time we receive new location data:

   const item = document.getElementById("item");
   const container = document.getElementById("container");
   if (!item || !container) return;

   const observable = dndObservable(item, container);

   // For each mouse drag event, move the sprite to the absolute page position.
   observable.subscribe(function (dragPoint) {
     item.style.left = dragPoint.pageX + "px";
     item.style.top = dragPoint.pageY + "px";
   });

As we receive new data, we're assigning new coordinates to our HTML item using the "style" property.

Conclusion

Let's briefly summarize the main points of the article:

  • Observables and observers are core concepts of reactive programming with RxJS.
  • Observables have richer API than promises and have many benefits, among which is lazy execution.
  • Observables allow thinking about asynchronous flows of data in a synchronous fashion. That allows for mapping, filter and reducing of the events and other async data.
  • Nested observables are a powerful concept that will let you handle even the most complex scenarios in your application. The three main strategies for dealing with nested observables: concatAll, mergeAll, switchMap.

The best way to understand observables and reactive programming is to start using them in your code. Thank you for reading!

RxJS is a popular JavaScript open-source library with the Apache 2 license. RxJS accepts contributions under strict guidelines outlined in their contribution guidelines document as well as their code of conduct.

About the Author

Iskander Samatov is currently a senior software engineer at HubSpot. He was previously involved with several startups in legal tech such as Joinder, building communication platforms and streamlining processes for professionals in the legal industry. Samatov has developed a number of mobile and web apps, used by thousands of people, like Planly. Samatov frequently blogs here.

 

Rate this Article

Adoption
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Community comments

  • Please upgrade all examples to the latest version of RJs

    by Enrico Piccinin,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    I think the author should update the examples to the latest version of RxJS.
    For instance the "switchLatest"operator has been removed from the library since several versions. The use of "forEach" method to subscribe to a stream is still supported but definitely is not to be considered as the normal mean to elaborate notifications coming from Observable streams.

  • Re: Please upgrade all examples to the latest version of RJs

    by Iskander Samatov,

    Your message is awaiting moderation. Thank you for participating in the discussion.

    Thank you for your input.
    1. Indeed, looks like switchLatest has been renamed to switchMap, I'll make the adjustments.
    2. While I agree that subscribe is used more often, forEach still has its use-cases in certain situations, also depends on if you prefer to handle your side-effects using imperative style code.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

BT