Creating an RxJS Observable from a (server sent) EventSource
Asked Answered
F

2

9

I would like to create a RxJs Observable from an EventSource (server sent events).

I tried the following:

import {Component, OnInit} from 'angular2/core';
import {Subject, Observable}  from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let eventSource = new EventSource('/interval-sse-observable');
        let observable = Observable.create(eventSource);
        observable.subscribe({
            next: aString => this.someStrings.push(aString.data),
            error: err => console.error('something wrong occurred: ' + err)
        });
    }
}

But I get the following exception:

EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
ORIGINAL STACKTRACE:
TypeError: this._subscribe is not a function
    at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
    at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
    at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
    at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
    at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
    at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
    at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
    at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
    at Array.forEach (native)
    at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)

For completeness' sake, here is the contents of my index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse demo</title>
    <!-- 1. Load libraries -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
    <script src="https://code.angularjs.org/tools/system.js"></script>
    <script src="https://code.angularjs.org/tools/typescript.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
    <script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>

    <!-- 2. Configure SystemJS -->
    <script>
        System.config({
            transpiler: 'typescript',
            typescriptOptions: { emitDecoratorMetadata: true }
        });
        System.import('./scripts/app.ts')
                .then(null, console.error.bind(console));
    </script>
</head>
<body>

<my-app>Loading...</my-app>

</body>
</html>

Can someone please help?

edit 1: Following Yurzui's advice, I modified my code as follows:

ngOnInit() {
    let observable = Observable.create(observer => {
        const eventSource = new EventSource('/interval-sse-observable');
        eventSource.onmessage = x => observer.next(console.log(x));
        eventSource.onerror = x => observer.error(console.log('EventSource failed'));

        return () => {
            eventSource.close();
        };
    });
    observable.subscribe({
        next: aString => this.someStrings.push(aString.data),
        error: err => console.error('something wrong occurred: ' + err)
    });
}

It does log the first message in the console as follows:

MessageEvent {isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…}bubbles: falsecancelBubble: falsecancelable: falsecurrentTarget: EventSourcedata: "c374a15b-b37d-498e-8ab0-49643b79c1bb"defaultPrevented: falseeventPhase: 0isTrusted: trueisTrusted: truelastEventId: ""origin: "http://localhost:8080"path: Array[0]ports: nullreturnValue: truesource: nullsrcElement: EventSourcetarget: EventSourcetimeStamp: 6257.125type: "message"__proto__: MessageEvent
Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefinedSystem.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29SafeSubscriber.__tryOrUnsub @ Rx.js:10979SafeSubscriber.next @ Rx.js:10934Subscriber._next @ Rx.js:10894Subscriber.next @ Rx.js:10871System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21

Now if instead of logging the x variable in the console I just pass it to the next method as follows:

eventSource.onmessage = x => observer.next(x);

The server sent events are retrieved by the client (I see them in the chrome dev tools) but nothing is displayed in the template indicating the array of strings is not populated...

By the way I had to remove the JSON.parse(x.data) as it was causing an error.

Fordham answered 24/4, 2016 at 18:36 Comment(1)
github.com/ReactiveX/rxjs/issues/1644 (by you?)Royceroyd
C
17

You could use the following code to manually create Observable for EventSource stream:

export class AppComponent implements OnInit {
  someStrings:string[] = [];

  constructor(private zone: NgZone) {}

  ngOnInit(){
    const observable = Observable.create(observer => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(x.data);
      eventSource.onerror = x => observer.error(x);

      return () => {
        eventSource.close();
      };
    });

    this.subscription = observable.subscribe({
      next: guid => {
        this.zone.run(() => this.someStrings.push(guid));
      },
      error: err => console.error('something wrong occurred: ' + err)
    });
  }
}

// somewhere
// this.subscription.unsubscribe()

Don't forget to import the NgZone class:

import {Component, OnInit, NgZone} from '@angular/core';

See also Angular2 View Not Changing After Data Is Updated

Christhood answered 24/4, 2016 at 19:30 Comment(7)
Thanks @yurzui, I have edited my post to take into account your reply.Fordham
Strange, i've tried this solution and it works. What you get an error?Christhood
No error and no output. Do you have a working plunker/fiddle?Fordham
I've added plunker exampleChristhood
I'll look into that. Thanks a lot Yurzui. FYI, the creation of observable will soon be available in RxJs 5 see: github.com/ReactiveX/rxjs/issues/1644Fordham
Thanks a lot Yurzui.Fordham
How do you close the eventSource connection using observable instance in this case?Educatee
C
3

I should complete Yurzui's answer:

In my case, working with Angular 6 I had some weird behavior when assigning a function to onmessage. I therefore added an event listener instead and it worked like a charm:

const observable = Observable.create(observer => {
  const eventSource = new EventSource('/interval-sse-observable');
  eventSource.addEventListener("message", (event: MessageEvent) => observer.next(event.data));
  eventSource.addEventListener("error", (event: MessageEvent) => observer.error(event));

  return () => {
    eventSource.close();
  };
});
Crump answered 31/7, 2019 at 13:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.