Comprehensive Guide to higher-order RXJS Mapping Operators: SwitchMap, Mergemap, ConcatMap (and ChaustMap)

Some of the most common RxJS operators we find on a daily basis are the RxJS higher-order mapping operators: switchMap, mergeMap, concatMap, and chaustmap.

For example, most network calls in our program will be made using one of these operators, so familiarity with them is essential for writing almost any reactionary program.

Knowing which operators to use (and why) in a given situation can be a bit confusing, and we often wonder how these operators actually work and why they are named the way they are.

These operators may seem unrelated, but we really want to learn them all at once, because choosing the wrong operator can accidentally cause subtle problems in our program.

Why are the mapping operators a bit confusing?

There’s a good reason for this: To understand these operators, we first need to understand each of the Observable composition strategies used internally.

Instead of trying to understand switchMap on your own, you should first understand what an Observable switch is; We need to learn about Observable connections, etc., rather than delving directly into ConcatMap.

That’s what we’re going to do in this article, where we’ll learn about Conmat, Merge, Switch, and Exhaust policies and their corresponding mapping operators in a logical order: ConcatMap, Mergemap, SwitchMap, and Exhaust Map.

We’ll use a combination of the Marble diagram and some real-life examples, including running code, to explain these concepts.

Finally, you’ll know exactly how each of these mapping operators works, when they are used, why they are used, and why they are named.

The RxJs Map Operator

Let’s start at the beginning and describe the general use of these mapping operators.

As the name of the operators suggests, they are doing some sort of mapping: but what is being mapped? Let’s first look at the marbles of the RxJS Map operator:

How the base Map Operator works

Using the map operator, we can take an input stream (values 1, 2, 3) and create a derived mapped output stream (values 10, 20, 30) from it.

The values of the bottom output stream are obtained by taking the values of the input stream and applying them to a function: the function simply multiplies these values by 10.

So the map operator is the value that maps the input Observable. Here is an example of how we can use it to process HTTP requests:

const http$ : Observable<Course[]> = this.http.get('/api/courses');

http$
    .pipe(
        tap(() => console.log('HTTP request executed')),
        map(res => Object.values(res['payload']))
    )
    .subscribe(
        courses => console.log("courses", courses)
    );

In this example, we are creating an HTTP Observable to make a back-end call, and we are subscribing to it. An Observable emits the value of the back-end HTTP response, which is a JSON object.

In this case, the HTTP response wraps the data in payload attributes, so in order to retrieve the data, we apply the RxJS mapping operator. The mapping function then maps the JSON response to the load and extracts the value of the load attribute.

Now that we’ve reviewed how basic mapping works, let’s talk about higher-order mapping.

What is Higher-Order Observable Mapping?

In a higher-order mapping, instead of mapping a normal value like 1 to another value like 10, we map a value to an Observable!

The result is a high-order Observable. It’s just an Observable, but its value itself is also an Observable that we can subscribe to separately.

This may sound far-fetched, but in fact, this type of mapping happens all the time. Let’s take a practical example of this type mapping. Suppose, for example, that we have an Angular Reactive Form that emits active Form values over time via an Observable:

@Component({ selector: 'course-dialog', templateUrl: './course-dialog.component.html' }) export class CourseDialogComponent implements AfterViewInit { form: FormGroup; course:Course; @ViewChild('saveButton') saveButton: ElementRef; constructor( private fb: FormBuilder, private dialogRef: MatDialogRef<CourseDialogComponent>, @Inject(MAT_DIALOG_DATA) course:Course) { this.course = course; this.form = fb.group({ description: [course.description, Validators.required], category: [course.category, Validators.required], releasedAt: [moment(), Validators.required], longDescription: [course.longDescription, Validators.required] }); }}

Reactive Form provides an Observable this.form.valueChanges that issues the latest Form values when the user interacts with the Form. This will be our source Observable.

What we want to do is to save at least some of these values as they are emitted over time to enable the form draft pre-save capability. This way, as the user fills out the form, the data is gradually saved to avoid losing the entire form data due to an unexpected reload.

Why Higher-Order Observables?

To implement the form draft save functionality, we need to get the form value, then create a second HTTP Observable that performs the back-end save, and then subscribe to it.

We could try to do all this manually, but we would fall into a nested subscription antipattern:

this.form.valueChanges
    .subscribe(
       formValue => {
      
           const httpPost$ = 
                 this.http.put(`/api/course/${courseId}`, formValue);

           httpPost$.subscribe(
               res => ... handle successful save ...
               err => ... handle save error ...
           );

       }        
    );

As we’ve seen, this causes our code to quickly be nested at multiple levels, which is one of the first things we want to avoid when using RxJS.

Let’s call this new HttpPost $Observable an internal Observable, since it is created in an internal nested block of code.

Avoiding nested subscriptions

We want to do all of this in a more convenient way: we want to take the form value and map it to a save Observable. This effectively creates a high-order Observable, where each value corresponds to a save request.

We then want to transparently subscribe to each of these network Observables and receive the network response directly at once to avoid any nesting.

If we had some kind of higher-order RxJS mapping operator, we could do all this! So why do we need four different operators?

To understand this, imagine what would happen if ValueChanges Observable issued multiple form values in rapid succession and the save operation took some time to complete:

  • Should we wait for one save request to complete before making another save?
  • Should we save multiple times in parallel?
  • Should we cancel an ongoing save and start a new save?
  • Should we ignore new save attempts when one is already in progress?

Before exploring each of these use cases, let’s go back to the nested subscription code above.

In the nested subscription example, we are actually triggering the save in parallel, which is not what we want, because there is no strong guarantee that the backend will process the save sequentially, and that the last valid form value is actually stored on the back end.

Let’s look at how to ensure that the save request is completed only after the last save.

Understanding Observable Concatenation

To implement sequential saving, we will introduce the new concept of Observable connections. In this code example, we use the concat() RxJs function to connect the two sample Observables:

const series1$ = of('a', 'b');

const series2$ = of('x', 'y');

const result$ = concat(series1$, series2$);

result$.subscribe(console.log);

After creating two Observables series1$and series2$with the “of” function, we create a third result$Observable, which is the result of series1$and series2$.

Here is the console output from the program, showing the values emitted by the resulting Observable:

a
b
x
y

As we have seen, these values are the result of concatenating the value of series1$with the value of series2$. But there’s a problem: the reason this example works is because these Observables are being completed!!

The of() function creates Observables, emits the values passed to of(), and then completes the Observables after emits all of the values.

Observable Concatenation Marble Diagram

Did you notice the vertical bar after the value b of the first Observable? This marks the point in time when the first Observable completed with the values a and b (series1$).

Let’s break down what happened here step by step with the timeline:

  • Two Observables series1$and series2$are passed to the concat() function
  • Concat () will then subscribe to the first Observable series1$, but not to the second Observable series2$(this is critical to understanding concatenation)
  • Source1 $emits the value a, which is immediately reflected in the output result$Observable
  • Note that source2$Observable has not issued a value yet because it has not been subscribed
  • Source1 $then emits a b value, which is reflected in the output
  • Then source1$will be complete, and only after that concat() now subscribes to source2$
  • The source2$value will then start to be reflected in the output until the source2$is complete
  • When source2$completes, result$Observable will also complete
  • Note that we can pass any number of Observables to concat(), not just the two in this example

The key point about Observable Concatenation

As we can see, Observable wiring is all about Observable completion! We take the first Observable and use its value, wait for it to finish, then we use the next Observable, and so on, until all Observables are finished.

Returning to our example of higher-order Observable mapping, let’s see how the concatenation concept helps us.

Using Observable Concatenation to implement sequential saves

As we have seen, to ensure that our form values are stored in order, we need to take each form value and map it to an HttpPost $Observable.

Then we need to subscribe to it, but we want to finish saving before we subscribe to the next HttpPost $Observable.

In order to ensure sequentiality, we need to concatenate the multiple httpPost$ Observables together!

We will then subscribe to each HttpPost $and process the results of each request in order. Finally, what we need is an operator that mixes the following:

  • A high-level mapping operation (takes the form value and converts it to HttpPost $Observable)
  • Using the concat() operation, concatenate multiple HttpPost $Observables together to ensure that the next HTTP save is not made until the previous in-progress save is first completed.

What we need is a properly named RxJS concatMap Operator that mixes higher-order mappings and Observable joins.

The RxJs concatMap Operator

The code is as follows:

this.form.valueChanges .pipe( concatMap(formValue => this.http.put(`/api/course/${courseId}`, formValue)) ) .subscribe( saveResult => ... handle successful save ... , err => ... handle save error ... ) ;

As we have seen, the first advantage of using a higher-order mapping operator like concatMap is that now we no longer have nested subscriptions.

By using ConcatMap, all form values are now sent to the back end in order, as shown in the Chrome DevTools Network TAB:

Breaking down the concatMap network log diagram

As we have seen, a save HTTP request is started only after the last save is complete. Here’s how the ConcatMap operator ensures that requests always occur in order:

  • ConcatMap is taking each form value and converting it into a saved HTTP Observable, called an internal Observable
  • ConcatMap then subscribes to the internal Observable and sends its output to the result Observable the second form value may be faster than saving the previous form value on the back end
  • If this happens, the new form value will not be immediately mapped to the HTTP request
  • Instead, ConcatMap waits for the previous HTTP Observable to complete, then maps the new value to the HTTP Observable, subscribes to it, and thus triggers the next save

Observable Merging

Applying an Observable in tandem with a series of HTTP save operations seems like a good way to ensure that saves occur in the expected order.

But in other cases, we want to run in parallel without waiting for the previous internal Observable to complete.

To do this, we have the Merge Observable composite strategy! Unlike Concat, Merge does not wait for an Observable to complete before subscribing to the next Observable.

Instead, the merge subscribes to each merged Observable simultaneously and then arrives with multiple values over time, and it outputs the values of each source Observable to the combined resulting Observable.

Practical Merge Example

To make it clear that the merge does not depend on completion, let’s merge two Observables that are never completed, since they are Interval Observables:

const series1$ = interval(1000).pipe(map(val => val*10));

const series2$ = interval(1000).pipe(map(val => val*100));

const result$ = merge(series1$, series2$);

result$.subscribe(console.log);

An Observable created with interval() emits the values 0, 1, 2, and so on every second and never completes.

Note that we applied several map operators to these interval Observables, just to make it easier to distinguish them from each other in the console output.

Here are the first few values visible in the console:

00 10 100 20 200 30 300

Merging and Observable Completion

As we can see, the value of the merged source Observable is immediately displayed in the resulting Observable when emitted. If one of the merged Observables completes, the merge will continue to emit the values that the other Observables arrive at over time.

Note that if the source Observables are completed, the merge will still work the same way.

The Merge Marble Diagram

Let’s take another example:

As we can see, the values of the merged source Observables are immediately displayed in the output. The resulting Observable will not be completed until all merged Observables are completed.

Now that we know the merge strategy, let’s see how it can be used in the context of a higher-order Observable map.

Going back to our earlier form draft save example, it’s clear that we need ConcatMap instead of MergeMap in this case, because we don’t want the save to happen in parallel.

Let’s see what happens if we accidentally choose Mergemap:

this.form.valueChanges .pipe( mergeMap(formValue => this.http.put(`/api/course/${courseId}`, formValue)) ) .subscribe( saveResult => ... handle successful save ... , err => ... handle save error ... ) ;

Now assume that the user interacts with the form and starts entering data fairly quickly. In this case, we now see multiple save requests running in parallel in the network log:

As we can see, the requests happen in parallel, which in this case is an error! Under high loads, these requests may be processed out of order.

Observable Switching

Now let’s talk about another Observable composition strategy: switching. The concept of switching is closer to merging than concatenating because we don’t wait for any Observable to terminate.

But when switching, unlike merging, if a new Observable starts emitting values, we unsubscribe the previous Observable before subscribing to the new Observable.

The Observable switch ensures that the unsubscribe logic for unused Observables is triggered to free resources!

Switch Marble Diagram

Pay attention to the diagonals, these are not accidental! In the case of a switching strategy, it is important to represent a high-order Observable in the graph, which is the top line of the image.

The value emitted by this high-order Observable is itself an Observable.

The moment the diagonal forks from the top line of the higher-order Observable is the moment the Observable value is emitted and subscribed to by the switch.

Breaking down the switch Marble Diagram

Here’s what happened in this picture:

  • The higher-order Observable issues its first internal Observable (a-b-c-d), which is subscribed (via switch policy).
  • The first internal Observable (a-b-c-d) emits the values a and b, which are immediately reflected in the output
  • But then a second internal Observable (E-F-G) is emitted, which triggers the unsubscription of the first internal Observable (A-B-C-D), which is a key part of the switch
  • Then a second internal Observable (e-f-g) starts emitting new values, which are reflected in the output
  • Notice, however, that the first internal Observable (a-b-c-d) is still emitting new values c and d at the same time
  • However, these later values are not reflected in the output because we also unsubscribed the first internal Observable (a-b-c-d)

We can now understand why we have to plot the chart in this unusual way, using diagonals: this is because we need to visually represent each internal Observable as it is subscribed (or unsubscribed), which happens diagonally from the source of a high-order Observable.

The RxJs switchMap Operator

Let’s then take the switch strategy and apply it to higher-order mappings. Suppose we have a normal input stream that emits the values 1, 3, and 5.

We then map each value to an Observable, as we did in the case of Concatmap and MergeMap, and get a higher-order Observable.

If we now switch between the emitted internal Observables, instead of joining or merging them, we’ll end up with the switchMap operator:

Breaking down the switchMap Marble Diagram

Here’s how the operator works:

  • The source Observable emits the values 1, 3, and 5
  • These values are then converted to Observables by applying mapping functions
  • The internal Observable of the map is subscribed to by the SwitchMap
  • When an internal Observables emits a value, the value is immediately reflected in the output
  • But if a new value like 5 is emitted before the previous Observable has a chance to complete, the previous internal Observable (30-30-30) will be unsubscribed, and its value will no longer be reflected in the output
  • Notice the red 30-30-30 internal Observable in the figure above: the last 30 values were not emitted because the 30-30-30 internal Observable was unsubscribed

As we’ve seen, the Observable switch is to ensure that we never use an Observable that triggers unsubscribe logic. Now let’s see the switchMap in action!

Search TypeAhead – switchMap Operator Example

A very common use case for switchMap is searching for Typeahead. First let’s define the source Observable, whose value itself will trigger the search request.

The source Observable emits values, which are the search text that the user typed in the input:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith('')
    )
    .subscribe(console.log);

This source Observable links to the input text field in which the user types his search. These are the values emitted by searchText$when the user enters the word “Hello World” as a search:

Debouncing and removing duplicates from a Typeahead

Note the repetition of values, either due to the use of Spaces between two words, or the use of the Shift key to capitalize the letters H and W.

To avoid sending all of these values to the back end as a single search request, let’s use the debounceTime operator to wait for user input to stabilize:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400)
    )
    .subscribe(console.log);

With this operator, if the user types at normal speed, the output of searchText$now has only one value: Hello World

This is already much better than our previous one, which now only emits values when stable for at least 400 milliseconds!

But if the user is typing slowly when considering the search, so that more than 400 milliseconds are needed between two values, the search flow might look like this:

In addition, the user can type in a value, press the backspace key and type again, which may result in duplicate search values. We can prevent duplicate searches from occurring by adding the distinctuntilChanged operator.

Cancelling obsolete searches in a Typeahead

But more importantly, we need a way to cancel the old search as a new one begins.

What we’re doing here is converting each search string to a back-end search request and subscribing to it, and applying a switch policy between two consecutive search requests, canceling the previous search if a new one is triggered.

That’s exactly what the switchMap operator will do! Here is the final implementation of the Typeahead logic using it:

const searchText$: Observable<string> = 
      fromEvent<any>(this.input.nativeElement, 'keyup')
    .pipe(
        map(event => event.target.value),
        startWith(''),
        debounceTime(400),
        distinctUntilChanged()
    ); 

const lessons$: Observable<Lesson[]> = searchText$
    .pipe(
        switchMap(search => this.loadLessons(search))        
    )
    .subscribe();

function loadLessons(search:string): Observable<Lesson[]> {
    
    const params = new HttpParams().set('search', search);
   
    return this.http.get(`/api/lessons/${coursesId}`, {params});
}

switchMap Demo with a Typeahead

Now let’s look at the switchMap operator in action! If a user types in the search bar and then hesitates to type something else, we usually see the following in a web log:

As we’ve seen, some of the previous searches have been canceled while in progress, which is great because this frees up server resources that can be used for other things.

The Exhaust Strategy

The switchMap operator is ideal for the pre-input scenario, but in other cases, what we want to do is ignore the new value in the source Observable until the previous value has been fully processed.

For example, suppose we are triggering a back-end save request in response to clicking the save button. We might first try to do this using the concatMap operator to ensure that the save operations occur in order:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        concatMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

This ensures that the save is done in order, but what happens if the user clicks the save button multiple times? Here’s what we’ll see in the weblog:

As we’ve seen, each click triggers its own save: if we click 20 times, we get 20 saves! In this case, we want to do more than just make sure we save in order.

We also want to be able to ignore clicks, but only if the save is already in progress. The exhaust Observable combination strategy will allow us to do this.

Exhaust Marble Diagram

Just like before, we have a higher-order Observable on the first row, whose value is itself an Observable, branched out from the first row. Here’s what happened in this picture:

  • Just like in the case of Switch, Extreme subscribes to the first internal Observable (a-b-c) and as usual, the values a, b, and c are immediately reflected in the output
  • It then emits a second internal Observable (D-E-F) while the first Observable (A-B-C) is still in progress
  • The second Observable is discarded by the emission policy and is not subscribed (this is a key part of the emission). The exhaust policy subscribes to a new Observable only after the first Observable (A-B-C) is completed
  • When the third Observable (G-H-I) is emitted, the first Observable (A-B-C) is completed, so the third Observable will not be discarded but will be subscribed
  • Then, the value of the third Observable, g-h-i, is displayed in the output of the resulting Observable, unlike the value d-e-f, which does not exist in the output

Just as in the case of Concat, Merge, and Switch, we can now apply Exclusive policies in the context of higher-order mappings.

The RxJs exhaustMap Operator

Now let’s look at the pinball diagram of the HustMap operator. Let’s remember that unlike the first line in the figure above, the values emitted by the source Observable 1-3-5 are not Observable.

Instead, these values can be such as mouse clicks:

So here’s what happened in the case of the emission map:

  • Emit the value 1 and create an internal Observable 10-10-10
  • Observable 10-10-10 emits all values and does this before emits the value 3 in the source Observable, so all 10-10-10 values are emitted in the output
  • Emits a new value of 3 in the input, triggering a new 30-30-30 internal Observable
  • But now, while 30-30-30 is still running, we get a new value of 5 in the source Observable
  • This value of 5 is discarded by the exhaust policy, which means that a 50-50-50 Observable is never created, so the 50-50-50 value never appears in the output

A Practical Example for exhaustMap

Now let’s apply the new chaustMap Operator to our save button scenario:

fromEvent(this.saveButton.nativeElement, 'click')
    .pipe(
        exhaustMap(() => this.saveCourse(this.form.value))
    )
    .subscribe();

If we click Save now, say 5 times in a row, we will get the following weblog:

As we can see, the clicks we made while the save request was still in progress were ignored, as expected!

Note that if we click for example 20 times in a row, eventually the in-progress save request will be completed and then the second save request will start.

How to choose the right mapping Operator?

ConcatMap, Mergemap, SwitchMap, and ChaustMap behave similarly in that they are all higher-order mapping operators.

But it is also so different in so many subtle ways that virtually no operator can safely point to the default.

Instead, we can simply choose the appropriate operator based on the use case:

  • If we need to do things sequentially while waiting for them to be done, then Concatmap is the right choice
  • For parallel processing, Mergemap is the best choice
  • SwitchMap is the way to go if we need to cancel logic
  • To ignore new Observables while the current one is still in progress, this is what exUSTMap does

conclusion

As we have seen, RXJ’s higher-order mapping operators are essential for performing some very common operations (such as network calls) in reactive programming.

To really understand these mapping operators and their names, we first need to focus on the underlying Observable composition policies Conmat, Merge, Switch, and Exclusive.

We also need to be aware that a higher-order mapping operation is taking place, where values are converted to separate Observables, and these Observables are subscribed to in a hidden way by the mapping operator itself.

Choosing the right operator is choosing the right internal Observable combination strategy. Choosing the wrong operator does not usually cause the program to break immediately, but over time it can cause problems that are difficult to resolve.

More of Jerry’s original articles can be found on “Wang Zixi “: