Using RxJs and Angular 2 in order to deal with server-sent events
F

3

12

I am trying to display values emitted through server-sent events in an Angular 2/RxJS app.

The backend regularly sends individual strings to the client through server-sent events.

I am not sure how to deal with the retrieved values on the Angular 2/RxJS side.

Here is my client (an Angular component):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

The backend method is as follows (and uses RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

I just noticed that the app hangs on the request and that nothing is displayed on the page.

I suspect there is an issue with my use of the map method i.e. .map(this.extractData).

I would just like to add the incoming strings to the array and display that array in the template which would update as the strings come in.

Can anyone please help?

edit: Here is a working solution (thanks to Thierry's answer below):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}
Footworn answered 23/4, 2016 at 15:30 Comment(1)
Can you please let me know whether you were able to get this working with Angular 4? I use import { Component, OnInit } from '@angular/core'; and it does not find the EventSource class. Did you install any other custom package? I would appreciate it if you could share the complete code on GitHub or similar, for reference. Thanks. - Bucentaur
P
11

You can't use the Http class of Angular 2 to handle server-sent events since it's based on the XHR object.

You could leverage the EventSource object:

var source = new EventSource('/...');
source.addEventListener('message', (event) => {
  (...)
});
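
To illustrate why the Http approach from the question does not fit: an SSE response is a long-lived text/event-stream body, not a single JSON document, which is presumably why res.json() never yields anything there. The sketch below shows roughly what EventSource does with that stream on your behalf. parseSseChunk and ParsedSseEvent are hypothetical names made up for this example, it assumes the input contains only complete events, and it ignores the id: and retry: fields.

```typescript
// Hypothetical helper, not part of Angular, RxJS or the browser API:
// parses text/event-stream data into events using the basic SSE field rules.
interface ParsedSseEvent {
    type: string; // value of the "event:" field, "message" when absent
    data: string; // all "data:" lines of the event joined with "\n"
}

function parseSseChunk(chunk: string): ParsedSseEvent[] {
    const events: ParsedSseEvent[] = [];
    // Events are separated from each other by a blank line.
    for (const block of chunk.split(/\r?\n\r?\n/)) {
        let type = '';
        const dataLines: string[] = [];
        for (const line of block.split(/\r?\n/)) {
            if (line === '' || line.charAt(0) === ':') {
                continue; // empty line, or a comment such as a keep-alive
            }
            const colon = line.indexOf(':');
            const field = colon === -1 ? line : line.slice(0, colon);
            let value = colon === -1 ? '' : line.slice(colon + 1);
            if (value.charAt(0) === ' ') {
                value = value.slice(1); // one leading space is not part of the value
            }
            if (field === 'event') {
                type = value;
            } else if (field === 'data') {
                dataLines.push(value);
            }
        }
        // A block without any data line does not produce an event.
        if (dataLines.length > 0) {
            events.push({ type: type || 'message', data: dataLines.join('\n') });
        }
    }
    return events;
}

// Made-up sample stream: one default event, one comment, one named event.
const sample =
    'data: 3f2b\n\n' +
    ': keep-alive comment\n\n' +
    'event: log\ndata: first line\ndata: second line\n\n';

console.log(parseSseChunk(sample));
```

With EventSource none of this parsing is needed in application code: the browser does it and hands each event to the listener, with the payload in event.data.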

See these articles:

Paraffinic answered 23/4, 2016 at 19:10 Comment(1)
Hi Thierry, this is a slightly off-topic question: is it encouraged to use SSE today? Is SSE going to be superseded by HTTP/2? Are there better alternatives to SSE? I noticed that some browsers don't support SSE at all... - Footworn
W
28

Here is a working example:

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

AppComponent

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            this.messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}
Welltimed answered 29/12, 2017 at 16:2 Comment(4)
A bit of explanation would've been nice, but I can't stop myself from upvoting this anyway :) - Florina
Very sexy! Self-explanatory, very nicely done abahet. Perfect, thank you! - Cassilda
How would you add this into NgRx Effects? - Benavidez
Any example of how to use the returned close()? - Claviform
A
1

To add to Thierry's answer: by default the event type is 'message'. However, the event type can be anything ('chat', 'log', etc.), depending on the server-side implementation. In my case, the first two events from the server were 'message' and the rest were 'log'. My code looks like this:

var source = new EventSource('/...');
source.addEventListener('message', message => {
  (...)
});
source.addEventListener('log', log => {
  (...)
});
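
If several event types have to be handled the same way, the registration can be wrapped in a small helper. This is only a sketch: listenToTypes, EventSourceLike, SseMessage and FakeEventSource are hypothetical names made up for this example, and the in-memory fake stands in for a real EventSource so that the code runs without a browser or a server.

```typescript
// Minimal structural types standing in for the browser's EventSource and
// MessageEvent (an assumption made so the sketch runs outside a browser).
interface SseMessage {
    data: string;
}

type SseListener = (evt: SseMessage) => void;

interface EventSourceLike {
    addEventListener(type: string, listener: SseListener): void;
    removeEventListener(type: string, listener: SseListener): void;
}

// Hypothetical helper: registers one listener per event type and returns a
// function that removes all of them again.
function listenToTypes(
    source: EventSourceLike,
    types: string[],
    handler: (type: string, data: string) => void
): () => void {
    const registered = types.map(type => {
        const listener: SseListener = evt => handler(type, evt.data);
        source.addEventListener(type, listener);
        return { type, listener };
    });
    return () => registered.forEach(r => source.removeEventListener(r.type, r.listener));
}

// In-memory fake, used only to exercise the helper without a server.
class FakeEventSource implements EventSourceLike {
    private listeners: { [type: string]: SseListener[] } = {};

    addEventListener(type: string, listener: SseListener): void {
        (this.listeners[type] = this.listeners[type] || []).push(listener);
    }

    removeEventListener(type: string, listener: SseListener): void {
        this.listeners[type] = (this.listeners[type] || []).filter(l => l !== listener);
    }

    // Simulates the server sending an event of the given type.
    emit(type: string, data: string): void {
        (this.listeners[type] || []).forEach(l => l({ data }));
    }
}

const fake = new FakeEventSource();
const received: string[] = [];
const stopListening = listenToTypes(fake, ['message', 'log'], (type, data) => {
    received.push(type + ': ' + data);
});

fake.emit('message', 'hello');
fake.emit('log', 'disk almost full');
fake.emit('chat', 'dropped, no listener is registered for this type');
stopListening();
fake.emit('log', 'dropped, the listeners were removed');

console.log(received); // ['message: hello', 'log: disk almost full']
```

Note that an event whose type has no registered listener is silently dropped, which matches what I saw before adding the 'log' listener.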
Automate answered 26/1, 2018 at 7:48 Comment(0)
