Lukas Marx
August 25, 2017

Learn how to use RxJS in Angular Effectively

RxJs is not easy. To be honest, it is sometimes pretty hard to understand. Especially in the beginning. So why should you even bother?

Yes, RxJs can be complex. Very complex sometimes.

But it also offers some great features that let you think in a different way. It enables you to simplify asynchronous functions, with much more flexibility than promises do.

It also lets you write functions, that not only return one but a whole stream of values.

In this article, we will discover how we can utilize RxJs in angular to create reactive angular applications.

So let's get started!


Why should you use RxJs (in Angular)?

Well, let me answer it this way: Angular uses it. You can see it in action in services like the HttpClient or in the @Input / @Output decorators of your component. So even if you don’t see any advantages in the library, there is no easy way around it. But it is not all bad.

Once you have figured out the basics, you will see RxJs in a different light. You will see the advantages of the stream-based approach and understand, why it plays such an important role in the angular framework.


Observers & Subjects

RxJs uses the observer pattern. Don’t know this pattern yet? No problem! It is quite simple.

The observer pattern

Let’s take a TV as an example. All it does is waiting for new signals to come in.  Then it processes them and updates the image on the screen. It can not go ahead and send data back to the television station.

The TV observes the signal from the television station. Whenever the station sends a signal, the TV updates.

This does not only work between one TV and one television station. Instead, there are thousands of TV, that are all subscribed to the station's signal.


The Observer pattern works just the same. Usually, there is a so-called subject. The subject is the television station from our example. On the other hand, there are also observers. These are the TVs.

To get notified about new data, the observer subscribes to the subject. One subject can have many subscribed observers.

It is the responsibility of the subject, to publish new data to the clients. Observers can not send any data back. They also don’t know about possible other subscribers.


The Rx implementation follows the pattern described above. However, some terms and details are different. So let’s take a look at how to subscribe to subjects in RxJs.

First of all, there are a two different version of the 'subject' from the pattern. The first one is called Subject. With the subject, you can subscribe to messages, but also push new data to the stream. Similar to the television station, but with a key of the front door. You can walk in and broadcast your favorite show whenever you want. The subject should not be passed around in your application, to avoid confusion where the new data came from. For that, it should be converted to an observable.


On the other hand, there is the observable. The observable is much more limited. Most important, it only allows subscribing to the subject, but is missing the required methods to publish new messages. That way, it can be passed around in your application, since it does not allow publishing new data.


Creating a RxJs Subject

Creating a subject is very easy. Just import the Subject class from the Rx package. This package is included in every angular application, so we do not need to install it separately.

import { Subject } from 'rxjs/Subject'

Next, we create a new instance of the class. Subject is generic, so we have to define the type of the payload. In this example, our stream will only contain strings.

const subject = new Subject<string>()


Subscribing to RxJs Streams

Next, we want to subscribe to our subject, to receive the values that it emits. To do so, just call the subscribe method. It expects a callback function with the value as the first parameter. As the second parameter, you can also register a callback, that is called when an error occurs. That callback gets the error as a parameter. However, having that is optional.

subject.subscribe(value => {
  // value is the value of the received data

As mentioned before, subscribing to the subject itself does totally work. The subject itself however, should never leave the place it was created. For example, you should not return the subject in a function. Otherwise, it becomes hard to understand, where new values enter the stream.

For that reason, the library contains the observable. Thankfully converting a subject to an observable is very easy. Just call the asObservable() method. That way, the caller of the function can only subscribe, but not publish.

import { Observable } from 'rxjs/Observable'

function someFunc(): Observable<number> {
  const subject = new Subject<number>()
  return subject.asObservable()

Hot & Cold

There are two types of observables.  Hot and cold ones. Hot observables are just like the TV station from our example. They always broadcast. It doesn't matter if there is actually a TV somewhere, that receives the signal. Even if there is no one subscribed to them. Also, if you connect to that observable, you can be at any point in the stream. Missed values, before you connected, cannot be accessed.

Cold observables, on the other hand, are like watching a DVD. They start when you want, typically from the beginning. Cold observables do not produce any output unless there is somebody subscribed to them. They start pushing values to the stream when the subscribe method is called.

Why is this important? There are some services in the angular framework, that return cold observables. One example is the HttpClient. If you don't subscribe to the result of a request, the request is not actually made!

For some detailed explanations and examples see the documentation on GitHub.


The subscribe() method returns a subscription. Make sure to unsubscribe from that at some if you no longer need it. Otherwise, you create immortal objects and memory leaks.

A good point in angular to unsubscribe from observables is the ngOnDestroy lifecycle event.

import { Subject } from "rxjs/Subject";
import { Observable } from "rxjs/Observable";
import { Subscription } from "rxjs/Subscription";

private subscription: Subscription
  this.subscription = new Subject().subscribe();



Publishing Data with Subjects

To receive data, somebody has to actually send the data, right? To do so, we use the subject, since the observable is not capable of sending data to the stream.

To push data to the observable's data stream, we call the next() method. As the parameter, we pass in the value/object we want to broadcast.

const subject = new Subject<string>()'string 1')


RxJs Operators: Create a pipeline!

RxJs is not only a great way to decouple your code and handle async broadcasting. It also provides a bunch of operators, that can be used to modify observable streams. However, these operators are where it can get very complex quickly. The good news?

You don't need those. Actually, you probably will never use most of them. I don't say that you shouldn't. Try all of them if you want to. You can see the full list of operators here. However, they can be difficult to get your head around. In this article, I will only show you the most common and useful of them.


You probably know already. The RxJs map operator does exactly the same. You provide it with a function, that gets applied to all elements that enter the stream before they reach the subscriber.


The following example increases the value of all numbers of the stream by 1.

Also, make sure the operator from the add directory. That way, every operator can be imported separately, to keep the bundle size as small as possible.

With RxJs 5.5, the team has introduced a new concept called letable operators. Basically what this means is, instead of chaining all your operators on an observable, we use a new method called pipe() to pass in the operators we want to apply.

Using the map-function, the example looks like this:

import { map } from 'rxjs/operators'

const observable$ = subject.asObservable()
observable$.pipe(map(x => x + 1)).subscribe()

But why did they change it? The new approach has two major benefits. The main difference to the old is that the operators are now standalone functions. They are not prototype methods of observable anymore.

That allows for bundling them into so-called ECMAScript modules. See how the import call is different? That's because of the new module format. This format allows for better tree-shaking (keeping unused code out of your application) which results in a smaller application size.

Furthermore, it has become very easy to write your own operators. As operators are just standalone functions now, you can just create your own. Every function that takes an observable and returns one will do.

For more details, see the official GitHub page on lettable operators.


With the help of the filter operator, it is possible to filter the elements of a stream using a condition. Just like the map operator, the filter operator expects a function. This time, the function has to return either true or false. If you return false, the element is not sent to the subscribers.


In this example, all the stream only contains numbers greater than 1.

import { filter } from 'rxjs/operators'

    filter(x => {
      return x > 0 ? true : false


Errors in RxJs

Since RxJs streams are highly asynchronous, trying to catch errors with a simple try/catch would not work. Because of that, RxJs introduces a home-brewed error system. Errors can be any Javascript object, that is sent to the stream, as well. Other than usual stream elements, the error does not trigger the first, but the second callback passed to the subscribe method. In there, we can react to the error accordingly.

Handling Errors

To get notified about errors, the observable subscribe method offers us to pass in a second callback. This callback gets executed, when an error occurs.

observable$.subscribe(value => {}, error => {})

Although we get notified of the error, we are not actually handling it. Unhandled errors cause the stream to get terminated. That means, that no more elements can get send through it. To prevent that, we have to catch the error. We can do so, by using the catch operator.

observable$.catch(() => Observable.of(0)).subscribe()

The example above, returns a new Observable with the value of null, when an error occurs. We have to return an observable, since the old observable is terminated.

If we want to inspect the error, we can do so, adding a parameter to our callback.

  .catch(e => {
    if (e) return Observable.of(0)

Throwing Errors

Throwing errors is so easy, that is actually not worth the headline. Just call the error function of the subject and pass any object as a parameter.

subject.error(new Error())

Why should you use RxJs in Angular?

Now that we understand the basics, let's take a look at a real example. As I mentioned before, angular uses RxJs in many different modules. One application of RxJs is the HttpClient.

The HttpClient uses observables to return values from HTTP requests.

constructor(private http: HttpClient) { }

    this.http.get('https://...').subscribe(value => {
      // calue contains the servers response (parsed from JSON by default)

Do you want to learn more about the HttpClient? Get started with this article about the angular HttpClient.


In this article, we took a look at the basics of RxJs. I only included the aspects of RxJs that are relevant for your daily angular development. I kept the complex stuff away from you, as good as possible. Of course, this article does not cover even half of the libraries' functionalities, because of that. If you want to go ahead and learn more about RxJs, you can do so at the official RxJs manual.

I hope you liked this article. If you did, click that button below, and share it with your friends and colleges!

Never miss a post, by following me on twitter @malcoded.

Leave a comment

We save your email address, your name and your profile picture on our servers when you sing in. Read more in our Privacy Policy.