Blog article 2021-01-27 ยท 9 min. read

Controlling multiple HTTP requests with RxJS

There are several situations where a UI-component depends on data from multiple sources. Maybe it's necessary to load the first chunk of data before we can even decide which other data is required. In those cases it can become confusing and complex to control the order and start of execution of all those requests. In this article we will cover mutiple options to better orchestrate a multitude of requests using the reactive programming library RxJS.

TL;DR

Setting the stage

Let's imagine we want to render a page that displays a list of users. For this we send a request to /users to fetch the required data.

interface UserSummary {
  id: number;
  name: string;
}

function loadUsers(): Promise<UserSummary[]> {
  return fetch("/users").then((response) => response.json());
}

We wrote this data loading function with the fetch API, but will wrap the returned promise into an observable in the next step in order to leverage the RxJS operators that will help us control multiple data dependencies. If you are working in an Angular project, you can of course directly load the data with the built-in HTTPClient.

Now, we can get an observable of the users by using the from function from RxJS to turn our promise into an observable that emits the loaded data and completes.

import { from } from "rxjs";

const users$: Observable<UserSummary[]> = from(loadUsers());

users$.subscribe((users) => console.log(users));

Note: Observables are lazy! If noone subscribes to the users$ observable, no request will be issued. Additionally, each subscriber will trigger a separate request to the backend.

As you can see above, we only get back a list of user summaries from our endpoint. Sadly, our product manager wants to display the users avatar next to the user name. If only our project used GraphQL, then we could just add the avatarUri field to the query to get all data with a single request... Howoever, in our case, we need to wait for the list of users to return to issue one request per user, to retrieve the required information by using a new function loadUserDetails:

interface UserDetails {
  id: number;
  name: string;
  avatarUri: string;
  // ...
}

function loadUserDetails(userId: number): Promise<UserDetails> {
  return fetch(`/users/${userId}`).then((response) => response.json());
}

Chaining explicit requests with switchMap

This will be our first hurdle: We want to wait until we got the response to our first request before we start the second request. We could, of course, subscribe to the first observable, and call the loadUserDetails function in the subscriber. This forces us to handle yet another subscription (never leave a subscription unsubscribed), so we'd prefer to have one single RxJS pipeline. To accomplish this, we can use the switchMap operator to transform each value in our users$ observable (a list of UserSummaries) to a new observable of a list of UserDetails.

const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    const userDetailObservables = userSummaries.map((s) =>
      from(loadUserDetails(s.id))
    );

    // ...
  })
);

By using switchMap, we can transform each value of the outer observable, into a new inner observable. switchMap guarantees, that whenever a new value is emitted from the outer observable, the inner observable is closed and thrown away, before the next inner observable is created. This means that we discard old userDetailRequests as soon as we get a new list of userSummaries.

Interested in modern web development? Follow me on Twitter so you don't miss new content.

Until now, our userDetails code is incomplete. In the first step of the switchMap we transform each UserSummary into an Observable<UserDetails>. We now have a list of observables, that will each emit a separate UserDetail object. Our task is to somehow combine this list of observables into one single observbable of the underlying list of UserDetails objects. To do this, we have multiple options:

Running requests in parallel with combineLatest

The first scenario is that we want to fire all possible requests at the same time. The browser will then decide, how many requests are really being sent at the same time but we don't have to worry about that:

const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    const userDetailObservables = userSummaries.map((s) =>
      from(loadUserDetails(s.id))
    );

    const userDetails$ = combineLatest(userDetailObservables);
    return userDetails$;
  })
);

combineLatest takes an array of observables, directly subscribes to each and waits until all of those observables emit once. Once each emitted, combineLatest emits an array of the last emitted values from each observable until all input observables complete. Since we are only using http requests as sources for our observables, each observable will complete after the first emitted value. This means we could also use forkJoin, which waits until all sources complete before it emits the array of emitted values.

Since both combineLatest and forkJoin instantly subscribe to all source observables (once we subscribe to the combined observable), all requests are triggered (more or less) at the same time. This can cause problems: Browsers have a limit of how many requests can run in parallel (somewhere between 3 and 15, depending on the browser). When we now fill the request pipeline with 100 requests (one for each user), the browser may defer any other incoming request until the first 100 are done, which might block other, more critical, requests on our page. To solve this problem, we could opt to run all of our requests in serial:

Running requests in serial with concat + toArray

const userDetails$ = users$.pipe(
  switchMap(userSummaries => {
    const userDetailObservables = userSummaries.map(s =>
      from(loadUserDetails(s.id))
    );

    const userDetails$ = concat(...userDetailObservables).pipe(toArray());
    return userDetails$;
  })
);

Here we use the concat function from RxJS to chain all observables in a sequence: concat takes multiple observables, subscribes to the first one and emits all its values until it is completed. Once it is completed, it subscribes to the next one, again waiting until it is completed. This goes on until the last request is done. concat produces a new observable emitting all values in a sequence, so that we have to gather them into an array, by using the toArray operator: This gathers all emitted values from an observable and emits all these values as one array once the source completes.

As with the forkJoin operator from earlier, this only works when our source observables complete, since we will get stuck at one observable if it emits an endless sequence of values.

Now, we are able to gently request one resource after the other from our server. Of course, this will drastically increase the loading time of the page, since we are not doing any requests in parallel. Ideally we would combine both approaches so that we can specify a maximum number of parallel requests:

Running requests with a maximum concurrency by combining serial and parallel

With RxJS, this quite complex task becomes very easy to manage: All we have to do is create n chunks of observables by splitting the userDetailObservables into n arrays, while n defines our concurrency parameter. For this task we will use one of Lodash's helper methods chunk. Of course you could also reimplement that for yourself if you want to skip the dependency.

const userDetails$ = users$.pipe(
  switchMap(userSummaries => {
    const userDetailObservables = userSummaries.map(s =>
      from(loadUserDetails(s.id))
    );

    const chunkSize = Math.ceil(userDetailObservables.length / 4);
    const chunkedUserDetailObservables = _.chunk(
      userDetailObservables,
      chunkSize
    );

    // ...
  })
);

All of the observables within one chunk will be run in serial (concat + toArray) and combined into one resulting observable with chunks.map(chunk => ...). Additionally, all chunks themselves will be run in parallel (combineLatest or forkJoin) and combined (chunkResults.flat(1)) when all data is ready:

const userDetails$ = users$.pipe(
  switchMap(userSummaries => {
    const userDetailObservables = userSummaries.map(s =>
      from(loadUserDetails(s.id))
    );

    const chunkSize = Math.ceil(userDetailObservables.length / 4);
    const chunkedUserDetailObservables = _.chunk(
      userDetailObservables,
      chunkSize
    );

    const userDetails$ = combineLatest(
      chunkedUserDetailObservables.map(chunk =>
        concat(...chunk).pipe(toArray())
      )
    ).pipe(map(chunkResults => chunkResults.flat(1)));

    return userDetails$;
  })
);

While still being a bit complex, RxJS allowed us to simply combine our logic for running observables serially or in parallel into a solution where you can specify the number of concurrent request depending on your environment.

Running multiple requests with a max concurrency with mergeMap

After I finished with the above solution, I decided to reasearch a bit more about this problem space and, lo and behold, RxJS already has a builtin solution to this problem: mergeMap. At first I was a bit stumped. I always understood mergeMap like that: Transform every value of an observable into a new observable. When the input observable emits, keep the output observables of the previously emitted inputs while directly subscribing to the next one. This means run in parallel! This is why I ruled mergeMap out for serialization of requests and used concat. However, mergeMap takes an optional second parameter called concurrent. With this, you can specify, how many output observables should be active at the same time. When this maximum number is reached, a new output observable will only be subscribed, when one other output observable completes. This is exactly what we hand roled earlier! This means, that we can rewrite our code like this:

const userDetails$ = users$.pipe(
  switchMap((userSummaries) =>
    from(userSummaries).pipe(
      mergeMap((summary) => from(loadUserDetails(summary.id)), 4),
      toArray()
    )
  )
);

The pipeline follows these steps: Whenever users$ emits a new list of UserSummary objects, we complete previous inner observables and create a new one (switchMap). This new observable emits all single userSummary objects (by using from) and transforms each userSummary into an observable of userDetails (mergeMap). By specifying the concurrent parameter, we define that only 4 userDetails requests should run at the same time. In the end, we gather all completed detail objects into a final array. The nice thing about this solution is that we can freely decide if we need more or less concurrency: By setting the parameter to 1, all requests will run in serial, while setting it to Infinity will result in parallel requests.

What have I learned?

In the end, the most important lesson from this journey was that you can always do a bit more research before diving into the implementation of a solution. However, by trying to rebuild a complex operation like the maximum concurrent requests, you can definitely improve your understanding of a tool and the problems that it is able to solve elegantly.


Discuss on Twitter
Back to all
esveo Academy