RxJS ReplaySubject & AsyncSubject in Angular

Hi Readers,
In this blog, let’s learn about the RxJS ReplaySubject and AsyncSubject in Angular.

ReplaySubject and AsyncSubject are special kinds of RxJS Subject: each one acts as both an observer and an observable. We push new values into the stream with the next method. Because a subject is multicast, every subscriber shares the same subject instance and therefore receives the same notifications.
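
As a minimal sketch of both roles (the names subject$, first and second are illustrative and not part of the original example), the subject below is handed to another observable as its observer, while two subscribers listen to it as an observable:

```typescript
import { ReplaySubject, of } from 'rxjs';

const subject$ = new ReplaySubject<number>();
const first: number[] = [];
const second: number[] = [];

// Observable role: two subscribers listen to the same subject instance.
subject$.subscribe((value) => first.push(value));
subject$.subscribe((value) => second.push(value));

// Observer role: the subject itself subscribes to a source and
// forwards each value (and the final completion) to its subscribers.
of(1, 2, 3).subscribe(subject$);

console.log(first, second);
```

Both arrays end up with the same three values, because the two subscribers share one subject.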

If you are new to RxJS subjects, refer to this tutorial on Introduction to RxJS Subject and BehaviorSubject in Angular.

What is the ReplaySubject?

ReplaySubject replays old values to new subscribers when they first subscribe.

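The ReplaySubject stores every value it emits in a buffer and replays those values to each new subscriber in the order it received them. By default the buffer is unbounded. You can limit it with the constructor arguments bufferSize (the maximum number of values to keep) and windowTime (the maximum age of a value, in milliseconds).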
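
Here is a small sketch of both arguments, assuming RxJS 7. The variable names and the fake clock are hypothetical helpers added only to make the time-based part deterministic; in an application you would normally omit the third argument and let the subject use the real clock.

```typescript
import { ReplaySubject } from 'rxjs';

// bufferSize = 2: only the two most recent values are kept for replay.
const lastTwo$ = new ReplaySubject<number>(2);
lastTwo$.next(1);
lastTwo$.next(2);
lastTwo$.next(3);

const replayedBySize: number[] = [];
lastTwo$.subscribe((value) => replayedBySize.push(value));
// replayedBySize now holds 2 and 3; the value 1 was pushed out of the buffer.

// windowTime = 100: values older than 100 ms are dropped from the buffer.
// fakeClock is a hypothetical timestamp provider (assumption: RxJS 7 accepts
// any object with a now() method as the third constructor argument).
let fakeNow = 0;
const fakeClock = { now: () => fakeNow };
const recent$ = new ReplaySubject<string>(Infinity, 100, fakeClock);

recent$.next('old');   // emitted at t = 0 ms
fakeNow = 80;
recent$.next('fresh'); // emitted at t = 80 ms
fakeNow = 150;         // 'old' is now 150 ms old, 'fresh' is 70 ms old

const replayedByTime: string[] = [];
recent$.subscribe((value) => replayedByTime.push(value));
// replayedByTime now holds only 'fresh'.
```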

Let’s create a new ReplaySubject 

  • Step 1:  Create a service using the command ng generate service or ng g s and declare a ReplaySubject there
import { Injectable } from '@angular/core';
import { ReplaySubject } from 'rxjs';

@Injectable({
  providedIn: 'root',
})
export class ApiService {
  // Shared subject; with no arguments the replay buffer is unbounded.
  replaySubject$ = new ReplaySubject<string>();
}
  • Step 2: Emit values to the subscribers with the next method. Call complete or error to send a completion or error notification. Both are terminal, so only the first one called takes effect.
import { Component, OnInit } from '@angular/core';
import { ApiService } from '../../api.service';

@Component({
  selector: 'app-comp1',
  templateUrl: './comp1.component.html',
  styleUrls: ['./comp1.component.css'],
})
export class Comp1Component implements OnInit {
  constructor(private api: ApiService) {}

  ngOnInit() {
    // ReplaySubject replays old values to new subscribers
    // when they first subscribe.
    this.api.replaySubject$.next('Welcome, This is ReplaySubject');

    // error() and complete() are both terminal, so call only one of them.
    // To signal a failure instead of a normal completion, use:
    // this.api.replaySubject$.error('error');
    this.api.replaySubject$.complete();
  }
}
  • Step 3: We subscribe to it just like any other observable and get the value in any component.
import { Component, OnInit } from '@angular/core';
import { ApiService } from './api.service';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  // Initialized so the property is never undefined in the template.
  replaySubject = '';

  constructor(private api: ApiService) {}

  ngOnInit() {
    this.api.replaySubject$.subscribe({
      next: (val: string) => {
        this.replaySubject = val;
      },
      // Handle the error notification in case the subject errors out.
      error: (err) => console.error(err),
    });
  }
}

Because ApiService is provided in the root injector, any component, even an unrelated one, can inject it and subscribe to the same ReplaySubject.
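
To see what a late subscriber receives when error and complete are both called, consider this standalone sketch (message$ and log are illustrative names, not part of the service above). It suggests that the buffered value is replayed first, then the stored error, and that the later complete call has no effect:

```typescript
import { ReplaySubject } from 'rxjs';

const message$ = new ReplaySubject<string>();
message$.next('Welcome, This is ReplaySubject');
message$.error('error'); // terminal: the subject is now stopped
message$.complete();     // ignored, because error() already ended the stream

const log: string[] = [];
// A late subscriber still gets the buffered value, then the stored error.
message$.subscribe({
  next: (value) => log.push(`next: ${value}`),
  error: (err) => log.push(`error: ${err}`),
  complete: () => log.push('complete'),
});

console.log(log);
```

The log contains the next entry followed by the error entry, and never a complete entry.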

What is the AsyncSubject?

AsyncSubject emits only its latest value, and only when it completes. If it errors out, subscribers receive the error notification and no value.
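
A minimal standalone sketch of this behaviour (result$, received, beforeComplete and afterComplete are illustrative names): the subscriber sees nothing while values are pushed, and gets only the last one once complete is called.

```typescript
import { AsyncSubject } from 'rxjs';

const result$ = new AsyncSubject<number>();
const received: number[] = [];
result$.subscribe((value) => received.push(value));

result$.next(1);
result$.next(2);
result$.next(3);
// Snapshot taken before completion: nothing has been delivered yet.
const beforeComplete = [...received];

// Completion triggers delivery of the last value only.
result$.complete();
const afterComplete = [...received];

console.log(beforeComplete, afterComplete);
```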

Let’s create a new AsyncSubject 

  • Step 1: Declare an AsyncSubject in the service.
import { Injectable } from '@angular/core';
import { AsyncSubject } from 'rxjs';

@Injectable({
  providedIn: 'root',
})
export class ApiService {
  asyncSubject$ = new AsyncSubject();
}
  • Step 2: Emit values with the next method, then call complete.
import { Component, OnInit } from '@angular/core';
import { ApiService } from '../../api.service';

@Component({
  selector: 'app-comp1',
  templateUrl: './comp1.component.html',
  styleUrls: ['./comp1.component.css'],
})
export class Comp1Component implements OnInit {
  constructor(private api: ApiService) {}

  ngOnInit() {
    // AsyncSubject emits only the latest value, and only when it completes.
    // If it errors out, it emits the error but no value.
    this.api.asyncSubject$.next(1);
    this.api.asyncSubject$.next(2);
    this.api.asyncSubject$.next(3);
    this.api.asyncSubject$.complete();
  }
}
  • Step 3: Subscribe to it in any component.
import { Component, OnInit } from '@angular/core';
import { ApiService } from './api.service';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  asyncSubject = '';

  constructor(private api: ApiService) {}

  ngOnInit() {
    this.api.asyncSubject$.subscribe({
      // The subject emits numbers here; convert the value for display.
      next: (value) => {
        this.asyncSubject = String(value);
      },
      // Runs once, right after the final value has been delivered.
      complete: () => console.log('Sub1 Complete'),
    });
  }
}

Result:

In the above example, all the subscribers will receive the value 3 including those who subscribe after the complete event.

But if the AsyncSubject errors out, then all subscribers will receive an error notification and no value.
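
Both outcomes can be checked with a short standalone sketch. The subject names, arrays and the error message are made up for illustration:

```typescript
import { AsyncSubject } from 'rxjs';

// Case 1: subscribing after complete() still yields the last value.
const done$ = new AsyncSubject<number>();
done$.next(1);
done$.next(2);
done$.next(3);
done$.complete();

const lateValues: number[] = [];
done$.subscribe((value) => lateValues.push(value));

// Case 2: an error discards the pending value; only the error is delivered.
const failed$ = new AsyncSubject<number>();
const failedValues: number[] = [];
let failure = '';
failed$.subscribe({
  next: (value) => failedValues.push(value),
  error: (err: Error) => {
    failure = err.message;
  },
});
failed$.next(1);
failed$.error(new Error('request failed'));

console.log(lateValues, failedValues, failure);
```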

Conclusion

In this blog, we introduced the RxJS ReplaySubject and AsyncSubject and walked through how to create and use each of them in Angular. To learn more about RxJS, you can go through the official documentation here.

For more updates, please follow our LinkedIn page- FrontEnd Studio.

Thank you for sticking to the end. If you like the blog, please don’t forget to give a thumbs up and share it. Feel free to share your thoughts about this blog in the comments.

Written by 

Neha Kumari is a Software Consultant at Knoldus Software. Her practice area is Front End (Angular). She is always charged up for new things and learning. She has robust design and integration skills along with intuitive problem-solving skills. She is proficient in TypeScript, JavaScript, HTML, and CSS/SCSS. Her hobbies include writing blogs and cooking.