Add queueing to Angular's HttpClient
Asked Answered
P

5

8

I have exact same requirement as mentioned in Add queueing to angulars $http service but need implementation in Angular 4.3 or 5 using the HttpInterceptor from @angular/common/http.

I have a very quirky API that can only handle a single request at a time for a particular browser session. Therefore, I need to ensure that every time a request is made in same session, it goes into a queue, and that queue is executed one request at a time, until it is empty.

Parsee answered 29/12, 2017 at 11:14 Comment(2)
So what have you tried and what exactly is the problem with it? SO isn't a code-writing service.Strachan
So... what happens?Strachan
P
11

Solution


@Zlatko has suggested correct approach, although there are few logical and syntax issues in it but I have corrected it pasting below a working code:

import { Injectable } from '@angular/core';
import { Response } from '@angular/http';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject'

export class PendingRequest {
  url: string;
  method: string;
  options: any;
  subscription: Subject<any>;

  constructor(url: string, method: string, options: any, subscription: Subject<any>) {
    this.url = url;
    this.method = method;
    this.options = options;
    this.subscription = subscription;
  }
}

@Injectable()
export class BackendService {
  private requests$ = new Subject<any>();
  private queue: PendingRequest[] = [];

  constructor(private httpClient: HttpClient) {
    this.requests$.subscribe(request => this.execute(request));
  }

  /** Call this method to add your http request to queue */
  invoke(url, method, params, options) {
    return this.addRequestToQueue(url, method, params, options);
  }

  private execute(requestData) {
    //One can enhance below method to fire post/put as well. (somehow .finally is not working for me)
    const req = this.httpClient.get(requestData.url)
      .subscribe(res => {
        const sub = requestData.subscription;
        sub.next(res);
        this.queue.shift();
        this.startNextRequest();
      });
  }

  private addRequestToQueue(url, method, params, options) {
    const sub = new Subject<any>();
    const request = new PendingRequest(url, method, options, sub);

    this.queue.push(request);
    if (this.queue.length === 1) {
      this.startNextRequest();
    }
    return sub;
  }

  private startNextRequest() {
    // get next request, if any.
    if (this.queue.length > 0) {
      this.execute(this.queue[0]);
    }
  }
}

One can use/call above service following way to make HTTP calls (from any other component or service) (Obviously you need to inject BackendService in the component e.g. Mention in provider of component and define in constructor):

    this.backendService.invoke('https://jsonplaceholder.typicode.com/posts', 'Get', null, null)
    .subscribe(
      result => {
        this.data = result;
      }
    );

In case of someone wants to look at working plunker then here is the working plunker.

Parsee answered 5/1, 2018 at 11:17 Comment(0)
E
5

I have the exact same requirement as you. The other answers work perfectly fine, just that it requires developer to create requests with another a custom service instead of native HttpClient. You could try the following interceptor to apply queuing as well.

This solution requires you to add 2 services, a HttpInterceptor and a service (RequestQueueService) to manage the queue.

HttpInterceptor:

@Injectable()
export class QueueInterceptorService implements HttpInterceptor {
  constructor(private queueService: RequestQueueService) { }

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return this.queueService.intercept(request, next);
  }
}

RequestQueueService:

@Injectable({
  providedIn: 'root'
})
export class RequestQueueService {
  private queue: ReplaySubject<any>[] = [];

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const requestQueueItem$ = new ReplaySubject<any>();
    const result$ = requestQueueItem$.pipe(
      switchMap(() => next.handle(request).pipe(
        tap(req => {
          if (req.type == HttpEventType.Response) {
            this.processNextRequest();
          }
        }),
        catchError(err => {
          this.processNextRequest();
          throw err;
        })
      ))
    );
    this.queue.push(requestQueueItem$);

    if (this.queue.length <= 1) {
      this.dispatchRequest();
    }

    return result$;
  }

  private processNextRequest(): void {
    if (this.queue && this.queue.length > 0) {
      this.queue.shift();
    }
    this.dispatchRequest();
  }

  private dispatchRequest(): void {
    if (this.queue.length > 0) {
      const nextSub$ = this.queue[0];
      nextSub$.next();
      nextSub$.complete();
    }
  }
}

Lastly, in AppModule:

@NgModule({
  declarations: [
    AppComponent
  ],
  imports: [
    BrowserModule,
    HttpClientModule
  ],
  providers: [
    RequestQueueService,
    { provide: HTTP_INTERCEPTORS, useClass: QueueInterceptorService, multi: true }
  ],
  bootstrap: [AppComponent]
})
export class AppModule { }

BTW, I am using Angular 8 with rxjs 6.4.

Expellee answered 5/12, 2019 at 4:17 Comment(2)
Could u provide a demo? I have been using this solution for months with no problems.Expellee
I think its connected to my extra logic, i will check again. And ping you here if its not working. Thank you very much.Juryrig
W
4

The Answer from Ryan Teh works like a charm if you want to generalize all of your http requests in a cue, as it is a Catch All solution.

Here I expand it with the adequate imports for Angular 8.2.4 and rxjs 6.4.0 as there will be many users looking for cut n' paste

My apologies for the creating a new answer instead of a comment, i still don't have enough cred for that.

queue-interceptor.service.ts

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { HttpInterceptor, HttpRequest, HttpHandler, HttpEvent } from '@angular/common/http';
import { RequestQueueService } from './request-queue.service';

@Injectable()
export class QueueInterceptorService implements HttpInterceptor {
  constructor(private queueService: RequestQueueService) { }

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    return this.queueService.intercept(request, next);
  }
}

request-queue.service.ts

import { Injectable } from '@angular/core';
import { HttpRequest, HttpHandler, HttpEvent, HttpEventType } from '@angular/common/http';
import { Observable, ReplaySubject } from 'rxjs';
import { tap, catchError, switchMap } from 'rxjs/operators'; 

@Injectable({
  providedIn: 'root'
})
export class RequestQueueService {
  private queue: ReplaySubject<any>[] = [];

  intercept(request: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
    const requestQueueItem$ = new ReplaySubject<any>();
    const result$ = requestQueueItem$.pipe(
      switchMap(() => next.handle(request).pipe(
        tap(req => {
          if (req.type == HttpEventType.Response) {
            this.processNextRequest();
          }
        }),
        catchError(err => {
          this.processNextRequest();
          throw err;
        })
      ))
    );
    this.queue.push(requestQueueItem$);

    if (this.queue.length <= 1) {
      this.dispatchRequest();
    }

    return result$;
  }

  private processNextRequest(): void {
    if (this.queue && this.queue.length > 0) {
      this.queue.shift();
    }
    this.dispatchRequest();
  }

  private dispatchRequest(): void {
    if (this.queue.length > 0) {
      const nextSub$ = this.queue[0];
      nextSub$.next();
      nextSub$.complete();
    }
  }
}

and app.module.ts

import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { AppComponent } from './app.component';
import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http';
import { RequestQueueService } from './YOUR_SERVICES_DIRECTORY/request-queue.service';
import { QueueInterceptorService } from './YOUR_SERVICES_DIRECTORY/queue-interceptor.service';


@NgModule({
  declarations: [
    AppComponent
  ],
  imports: [
    BrowserModule,
    HttpClientModule
  ],
  providers: [
    RequestQueueService,
    { provide: HTTP_INTERCEPTORS, useClass: QueueInterceptorService, multi: true }
  ],
  bootstrap: [AppComponent]
})
export class AppModule { }
Wreath answered 12/7, 2020 at 14:43 Comment(0)
O
3

You can do this relatively easily. A naive example follows bellow.

It lacks typing etc, it's not elaborate, it has a few weak points, and it would be better to extract the queueing part and http-requesting part into different services or classes, but this should get you started.

interface PendingRequest {
  url: string;
  method: string;
  options: any;
  subscription: Observable<any>;
}

@Injectable()
export class BackendService {
  // This is your private requests queue emitter.
  private requests$: Subject = new Subject();
  private queue: PendingRequest[] = [];

  constructor(private http: HttpClient) {
    // subscribe to that queue up there
    this.requests$.subscribe(request => this.execute(request));
  }

  // This is your public API - you can extend it to get/post/put or specific
  // endpoints like 'getUserProfile()' etc.
  invoke(url, method, options) {
      return this.addRequestToQueue(url, method, params, options);
  }

  private execute(requestData) {
    const req = this.httpClient.request(requestData.method, requestData.url, requestData.options)
      // as a last step, invoke next request if any
      .finally(() => this.startNextRequest());

    const sub = requestData.subscription;
    sub.switchMap(req);

  }

  private addRequestToQueue(url, method, options) {
    const sub = new Subject<any>();
    const request = new PendingRequest(url, method, options, sub)
    // if there are no pending req's, execute immediately.
    if (this.queue.length === 0) {
      this.requests$.next(request);
    } else {
      // otherwise put it to queue.
      this.queue.push(request);
    }
    return sub;
  }

  private startNextRequest() {
    // get next request, if any.
    if (this.queue.length) {
      this.execute(this.queue.shift());
    }
  }
}
Osswald answered 29/12, 2017 at 11:36 Comment(5)
Don't you think here "invoke" method also should return Observable<any>, otherwise the how caller will get the response data? If I try to return Observable<any> to invoke method then ultimately it needs to returned from addRequestToQueue method and I don't know how addRequestToQueue will return Observable<any>Parsee
Yes, forgot about that part. You can add an observable in the queue, then switchMap or something once it gets off the queue and into execution.Osswald
Also, thanks for the downvote, whoever. I've thought the practice is to mention in the comments as to why you're downvoting a particular thing.Osswald
@HiteshShekhada there, subscription and error handling added.Osswald
Zlatko: thanks for directing me to correct approach.Parsee
T
1

I have implemented a simple observable queue class which only subscribes to the next observable once the previous one is done. Using this with Angular's http client will only start the next request once the previous one has completed.

You can use this class in your interceptor.


import { last, Observable, of } from "rxjs";
import { catchError, concatMap, shareReplay } from "rxjs/operators";

export class ObservableQueue {
  private jobQueue: Observable<any> = of(undefined);

  add<T>(obs: Observable<T>): Observable<T> {
    const newJob = this.jobQueue.pipe(
      concatMap(() => obs),
      shareReplay()
    );
    this.jobQueue = newJob.pipe(
      last(),
      catchError(() => of(undefined))
    );
    return newJob;
  }
}

usage:

const queue = new ObservableQueue();
queue(of(1)).subscribe(() => /** process result*/);
queue(of(2)).subscribe(() => /** process result*/);
Tomlinson answered 30/11, 2022 at 11:16 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.