Understanding Subjects in Angular

There are two questions that come to mind before starting this topic: what are subjects, and when should you use which one?

When I first heard the name "Subjects" in reactive programming, I had a hard time understanding when to use which type of subject. In this article, I will go through the different kinds of subjects.

First of all, RxJS (Reactive Extensions for JavaScript) is a library for reactive programming built around the Observable type. It makes it easier to write asynchronous, callback-based, and event-based code.

What is an Observable?

Let's first look at what a stream is. A stream is a sequence of values or events emitted over time. An Observable represents such a stream: it wraps a producer of values in an object that consumers can subscribe to. You can think of an observable as a wrapper around an ordinary stream of data.

There are two ways to create an observable:

  1. using the constructor (new Observable()), and
  2. using the Observable.create() method, which is deprecated in newer RxJS versions in favor of the constructor
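As a minimal sketch of the constructor form (the names greetings$ and received are made up for this illustration), the function passed to new Observable() describes what to emit, and it runs only when someone subscribes:

```typescript
import { Observable } from 'rxjs';

// Collects whatever the subscriber receives, so the result is easy to inspect.
const received: string[] = [];

// The function passed to the constructor runs once per subscription.
const greetings$ = new Observable<string>((subscriber) => {
  subscriber.next('Hello');
  subscriber.next('RxJS');
  subscriber.complete();
});

// Nothing is emitted until someone subscribes.
greetings$.subscribe((value) => received.push(value));

console.log(received); // ['Hello', 'RxJS']
```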

What is an Observer?

An Observable does nothing on its own; it is only useful once something consumes the values it emits. The consumers are called Observers or Subscribers.

An Observer must subscribe to the observable to receive its values. When subscribing, it can optionally pass up to three callbacks: next(), error(), and complete().
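The three callbacks can be sketched as follows. This is only an illustration; numbers$ and log are hypothetical names, and the observable here never errors, so the error() callback is not invoked:

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// An observer is an object with up to three callbacks; each one is optional.
numbers$.subscribe({
  next: (value) => log.push('next: ' + value),
  error: (err) => log.push('error: ' + err),
  complete: () => log.push('complete'),
});

console.log(log); // ['next: 1', 'next: 2', 'complete']
```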

What is a Subject?

A Subject is a special type of Observable that allows values to be multicast to many observers. A subject is both an observable and an observer. It is similar to an EventEmitter: it maintains a registry of listeners and emits the same value to all of them.

To emit a value to a subject's subscribers, call its .next(value) method.

Now, let's understand the subject with an example:

import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  mySubject = new Subject<string>();

  ngOnInit() {
    // Subscriber One
    this.mySubject.subscribe((value) =>
      console.log('First observer: ' + value)
    );

    // Subscriber Two
    this.mySubject.subscribe((value) =>
      console.log('Second observer: ' + value)
    );

    // Feeding the value into the observable
    this.mySubject.next('Hello RxJS');
    this.mySubject.next('Angular');
  }
}

In the above code, we create a subject and attach two observers (subscribers) to it. We then feed values into the subject; when the first value is emitted with this.mySubject.next('Hello RxJS'); both subscribers receive it, one after the other in subscription order.

A plain Observable is generally unicast (cold), whereas a Subject is multicast (hot).
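One consequence worth seeing in code is that a plain Subject does not remember anything: a subscriber only receives values emitted after it subscribed. A small sketch outside Angular (the names messages$, early, and late are made up for this example):

```typescript
import { Subject } from 'rxjs';

const early: string[] = [];
const late: string[] = [];

const messages$ = new Subject<string>();

messages$.subscribe((value) => early.push(value));
messages$.next('first');

// This subscriber joins after 'first' was emitted, so it never sees it.
messages$.subscribe((value) => late.push(value));
messages$.next('second');

console.log(early); // ['first', 'second']
console.log(late);  // ['second']
```

The subject variants described later in this article differ mainly in what they give to such late subscribers.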

Unicast

  • An Observable is unicast.

  • An Observable and each of its Observers have a one-to-one relationship. Each subscribed Observer owns an independent execution of the Observable.

Multicast

  • A Subject allows values to be multicast to many Observers. A Subject and its subscribers have a one-to-many relationship.
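The difference can be made visible by counting how often the producer function runs. The sketch below is an illustration only (executions, source$, unicast, and multicast are hypothetical names): subscribing twice to the observable runs the producer twice, while routing it through a subject runs it once and shares the value.

```typescript
import { Observable, Subject } from 'rxjs';

let executions = 0;

// Each subscription to this cold observable runs the producer function again.
const source$ = new Observable<number>((subscriber) => {
  executions += 1;
  subscriber.next(executions);
  subscriber.complete();
});

// Unicast: two subscriptions, two independent executions.
const unicast: number[] = [];
source$.subscribe((value) => unicast.push(value));
source$.subscribe((value) => unicast.push(value));
console.log(unicast); // [1, 2]

// Multicast: the subject subscribes once as an observer and fans the value out.
const multicast: number[] = [];
const subject = new Subject<number>();
subject.subscribe((value) => multicast.push(value));
subject.subscribe((value) => multicast.push(value));
source$.subscribe(subject);
console.log(multicast); // [3, 3]
```

Passing the subject to subscribe() works because a subject is also an observer, with its own next(), error(), and complete() methods.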

Types of Subjects

1. BehaviorSubject

A BehaviorSubject requires an initial value. When it is subscribed to, it emits its current value immediately.

A BehaviorSubject stores the last emitted value. Whenever a new subscriber subscribes, it gets that last emitted value, even if it subscribes much later than the moment the value was emitted. A common real-world use of BehaviorSubject is sharing data between two unrelated components.

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  myBehaviorSubject = new BehaviorSubject('Hi');

  ngOnInit() {
    // Subscriber 1
    this.myBehaviorSubject.subscribe({
      next: (value) => console.log('First observer: ' + value),
    });

    // Feeding the value into the observable 1
    this.myBehaviorSubject.next('Hello');

    // Subscriber 2
    this.myBehaviorSubject.subscribe({
      next: (value) => console.log('Second observer: ' + value),
    });

    // Feeding the value into the observable 2
    this.myBehaviorSubject.next('Bye');
  }
}

In the above example, we create a BehaviorSubject with the initial value "Hi" and attach two subscribers. Subscriber 1 receives the initial value ("Hi") as soon as it subscribes.

Next, we push "Hello" into the subject, and Subscriber 1 is notified of it. Subscriber 2 subscribes after that and immediately receives the last emitted value ("Hello"); if nothing had been emitted yet, it would have received the initial value instead. Finally, "Bye" is delivered to both subscribers.
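To illustrate the data-sharing use case mentioned earlier, here is a rough sketch of a shared store. UserNameStore and all of its members are hypothetical; in an Angular app this would typically be an injectable service, but a plain class is used here so the example runs on its own.

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

// Hypothetical shared store holding the current user name.
class UserNameStore {
  private readonly nameSubject = new BehaviorSubject<string>('Guest');

  // Expose a read-only stream so consumers cannot call next() themselves.
  readonly name$: Observable<string> = this.nameSubject.asObservable();

  setName(name: string): void {
    this.nameSubject.next(name);
  }

  // Synchronous snapshot of the latest value.
  get currentName(): string {
    return this.nameSubject.getValue();
  }
}

const store = new UserNameStore();
const header: string[] = [];  // stands in for one component
const sidebar: string[] = []; // stands in for an unrelated component

store.name$.subscribe((name) => header.push(name));  // receives 'Guest' right away
store.setName('Alice');
store.name$.subscribe((name) => sidebar.push(name)); // receives 'Alice' right away

console.log(header);            // ['Guest', 'Alice']
console.log(sidebar);           // ['Alice']
console.log(store.currentName); // 'Alice'
```

Keeping the subject private and exposing only asObservable() is a design choice, not a requirement: it means only the store decides when a new value is emitted.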

2. ReplaySubject

ReplaySubject is a variant of Subject. It buffers old values and replays them to new subscribers.

Every value passed to a ReplaySubject is stored in its buffer, and when a new subscriber arrives, the buffered values are emitted to it. The buffer can be configured with the arguments bufferSize and windowTime.

bufferSize: The number of items the ReplaySubject keeps in its buffer. Default: Infinity.

windowTime: The amount of time, in milliseconds, to keep each value in the buffer. Default: Infinity.
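The effect of windowTime can be sketched without real timers by passing a custom clock as the third constructor argument. This assumes RxJS 7, where that argument is a timestamp provider (an object with a now() method); the fake clock and the names recent$ and replayed are made up for this illustration.

```typescript
import { ReplaySubject } from 'rxjs';

// Fake clock (an assumption for this sketch) so the example needs no real timers.
let fakeNow = 0;
const clock = { now: () => fakeNow };

// Keep any number of values, but each one only for 100 ms.
const recent$ = new ReplaySubject<string>(Infinity, 100, clock);

fakeNow = 0;
recent$.next('old');   // expires at t = 100
fakeNow = 80;
recent$.next('fresh'); // expires at t = 180

// At t = 150 only 'fresh' is still inside its time window.
fakeNow = 150;
const replayed: string[] = [];
recent$.subscribe((value) => replayed.push(value));

console.log(replayed); // ['fresh']
```

In application code you would normally omit the third argument and let the subject use the real system time.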

Let's understand with some examples:

import { Component, OnInit } from '@angular/core';
import { ReplaySubject } from 'rxjs';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  // Typed explicitly so that the emitted values are known to be strings
  myReplaySubject$ = new ReplaySubject<string>();

  ngOnInit() {
    this.myReplaySubject$.next("Hello, Guys.....");
    this.myReplaySubject$.next("How are you doing?");
    this.myReplaySubject$.next("Is everyone enjoying this rainy season or not?");
    this.myReplaySubject$.next("Is anyone taking a booster dose?");

    // Subscribes after all four values were emitted, yet still receives them
    this.myReplaySubject$.subscribe((message) => {
      console.log("User 1: " + message);
    });
  }

  // Output
  /* User 1: Hello, Guys.....
     User 1: How are you doing?
     User 1: Is everyone enjoying this rainy season or not?
     User 1: Is anyone taking a booster dose?
  */
}

This shows that the subject holds on to the messages for any new subscriber. This is the default behavior of ReplaySubject.

Now suppose we don't want every value to be held, only the last 2 values for a new subscriber. ReplaySubject supports this through its first argument, bufferSize.

If no argument is passed, all values are buffered: myReplaySubject$ = new ReplaySubject();

With myReplaySubject$ = new ReplaySubject(2); the subject keeps only the last 2 values in its buffer for new subscribers.

import { Component, OnInit } from '@angular/core';
import { ReplaySubject } from 'rxjs';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  // Buffer only the last 2 values for new subscribers
  myReplaySubject$ = new ReplaySubject<number>(2);

  ngOnInit() {
    this.myReplaySubject$.next(1);
    this.myReplaySubject$.next(2);
    this.myReplaySubject$.next(3);
    this.myReplaySubject$.next(4);

    // User 1 gets the buffered 3 and 4, then every later value
    this.myReplaySubject$.subscribe((message) => {
      console.log("User 1: " + message);
    });

    this.myReplaySubject$.next(5);
    this.myReplaySubject$.next(6);

    // User 2 gets the buffered 5 and 6
    this.myReplaySubject$.subscribe((message) => {
      console.log("User 2: " + message);
    });
  }

  // Output
  /* User 1: 3
     User 1: 4
     User 1: 5
     User 1: 6
     User 2: 5
     User 2: 6
  */
}

3. AsyncSubject

An AsyncSubject emits only its last value, and only on completion. Values passed to next() are held back, and nothing is delivered to its observers until complete() is called.

import { Component, OnInit } from '@angular/core';
import { AsyncSubject } from 'rxjs';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  myAsyncSubject$ = new AsyncSubject<number>();

  ngOnInit() {
    this.myAsyncSubject$.subscribe((message) => {
      console.log("User 1: " + message);
    });

    this.myAsyncSubject$.next(1);
    this.myAsyncSubject$.next(2);

    this.myAsyncSubject$.subscribe((message) => {
      console.log("User 2: " + message);
    });

    this.myAsyncSubject$.next(3);
    this.myAsyncSubject$.next(4);

    // Nothing is logged until the subject completes
    this.myAsyncSubject$.complete();
  }

  // Output
  /* User 1: 4
     User 2: 4
  */
}

Here, both subscribers receive only one value, the last one emitted (4), and only once complete() is called on the subject.
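To come back to the question of when to use what, the sketch below runs the same scenario against all four subject types and records what a late subscriber sees. The helper lateSubscriberSees and the results object are hypothetical names introduced only for this comparison.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Emits 1, 2, 3, then subscribes, then completes the subject.
function lateSubscriberSees(subject: Subject<number>): number[] {
  const seen: number[] = [];
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.subscribe((value) => seen.push(value)); // joins after all three values
  subject.complete();
  return seen;
}

const results = {
  plainSubject: lateSubscriberSees(new Subject<number>()),
  behaviorSubject: lateSubscriberSees(new BehaviorSubject<number>(0)),
  replaySubject: lateSubscriberSees(new ReplaySubject<number>(2)),
  asyncSubject: lateSubscriberSees(new AsyncSubject<number>()),
};

console.log(results.plainSubject);    // []      nothing is remembered
console.log(results.behaviorSubject); // [3]     the current value
console.log(results.replaySubject);   // [2, 3]  the last 2 buffered values
console.log(results.asyncSubject);    // [3]     the last value, delivered on complete()
```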

I hope this gives you a basic understanding of subjects in RxJS. The next time you use them in your code, choosing the right one should be much easier 👨‍💻🤞
