Angular2/Websocket: how to return an observable for incoming websocket messages
I'm going to use Angular2 to receive incoming websocket messages and update a webpage based on them. Right now I'm using a dummy echo websocket service, which I will replace later.

From my understanding, the function that receives websocket messages has to return an observable, and a handler that subscribes to it will update the webpage. But I can't figure out how to return an observable.

A code snippet is attached below. The MonitorService creates a websocket connection and returns an observable containing the received messages.

@Injectable()
export class MonitorService {

    private actionUrl: string;
    private headers: Headers;
    private websocket: any;
    private receivedMsg: any;
    constructor(private http: Http, private configuration: AppConfiguration) {

        this.actionUrl = configuration.BaseUrl + 'monitor/';
        this.headers = new Headers();
        this.headers.append('Content-Type', 'application/json');
        this.headers.append('Accept', 'application/json');
    }

    public GetInstanceStatus = (): Observable<Response> => {
        this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
        this.websocket.onopen =  (evt) => {
            this.websocket.send("Hello World");
        };

        this.websocket.onmessage = (evt) => { 
            this.receivedMsg = evt;
        };

        return new Observable(this.receivedMsg).share();
    }

}

Below is a component that subscribes to the observable returned above and updates the page accordingly.

export class InstanceListComponent {
  private instanceStatus: boolean
  private instanceName: string
  private instanceIcon: string
  constructor(private monitor: MonitorService) { 
    this.monitor.GetInstanceStatus().subscribe((result) => {
        this.setInstanceProperties(result);
    });
  }

  setInstanceProperties(res:any) {
    this.instanceName = res.Instance.toUpperCase();
    this.instanceStatus = res.Status;
    if (res.Status == true)
    {
      this.instanceIcon = "images/icon/healthy.svg#Layer_1";
    } else {
      this.instanceIcon = "images/icon/cancel.svg#cancel";
    }
  }
}

Now I'm running into this error in the browser console: TypeError: this._subscribe is not a function

Henn answered 25/4, 2016 at 21:24 Comment(0)
A
11

I put it on a plunker and added a function for sending messages to the websocket endpoint. Here is the important edit:

public GetInstanceStatus(): Observable<any>{
    this.websocket = new WebSocket("ws://echo.websocket.org/"); //dummy echo websocket service
    this.websocket.onopen =  (evt) => {
        this.websocket.send("Hello World");
    };
    return Observable.create(observer=>{
        this.websocket.onmessage = (evt) => { 
            observer.next(evt);
        };
    })
    .share();
}
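
One thing the snippet above leaves open is cleanup: the function passed to Observable.create may return a teardown function that runs when the subscriber unsubscribes. The following is only a minimal sketch of that idea. MessageLike, MessageSource, ObserverLike and messagesFrom are hypothetical names introduced for illustration (modeled on the browser WebSocket and on the observer RxJS passes in), not part of RxJS or of the plunker.

```typescript
// Hypothetical minimal shapes, modeled on the browser WebSocket and on the
// observer object that RxJS hands to a subscribe function.
interface MessageLike {
  data: unknown;
}

interface MessageSource {
  addEventListener(type: "message", listener: (evt: MessageLike) => void): void;
  removeEventListener(type: "message", listener: (evt: MessageLike) => void): void;
}

interface ObserverLike<T> {
  next(value: T): void;
}

// Builds a subscribe function: when called it attaches a message listener,
// and it returns a teardown function that detaches that listener again.
function messagesFrom(source: MessageSource) {
  return (observer: ObserverLike<MessageLike>): (() => void) => {
    const listener = (evt: MessageLike) => observer.next(evt);
    source.addEventListener("message", listener);
    return () => source.removeEventListener("message", listener);
  };
}
```

The returned function has the shape Observable.create expects (an observer in, a teardown function out), so under these assumptions it could stand in for the inline arrow function above. Whether the socket itself should also be closed in the teardown depends on who owns the connection.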

Update
As you mentioned in your comment, a better alternative is to use Observable.fromEvent():

websocket = new WebSocket("ws://echo.websocket.org/");
public GetInstanceStatus(): Observable<Event>{
    return Observable.fromEvent(this.websocket,'message');
}

plunker example for Observable.fromEvent();
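
Both versions emit the raw message event, while the component in the question reads res.Instance and res.Status directly, so the payload would still have to be extracted from evt.data somewhere. Here is a rough sketch of such a mapping step. It assumes the real server sends JSON text with exactly those two fields, which the question does not confirm, and toInstanceStatus is a hypothetical helper name.

```typescript
// Shape the question's component reads; the field names come from
// setInstanceProperties (res.Instance, res.Status).
interface InstanceStatus {
  Instance: string;
  Status: boolean;
}

// Assumption: the server sends JSON text such as
// {"Instance":"db01","Status":true}. Anything else makes this throw.
function toInstanceStatus(evt: { data: unknown }): InstanceStatus {
  if (typeof evt.data !== "string") {
    throw new Error("expected a text message");
  }
  const parsed: unknown = JSON.parse(evt.data);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("expected a JSON object");
  }
  const candidate = parsed as { Instance?: unknown; Status?: unknown };
  if (typeof candidate.Instance !== "string" || typeof candidate.Status !== "boolean") {
    throw new Error("unexpected message shape");
  }
  return { Instance: candidate.Instance, Status: candidate.Status };
}
```

It could be chained as Observable.fromEvent(this.websocket, 'message').map(toInstanceStatus). Note that the echo service sends back the plain text "Hello World", which is not JSON, so JSON.parse would throw until a real endpoint is in place.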

You can also do it using WebSocketSubject, although it doesn't look like it's ready yet (as of rc.4):

constructor(){
  this.websocket = WebSocketSubject.create("ws://echo.websocket.org/");
}

public sendMessage(text:string){
  let msg = {msg:text};
  this.websocket.next(JSON.stringify(msg));
} 

plunker example

Allomorphism answered 26/4, 2016 at 4:0 Comment(4)
Thanks! It turns out that returning the observable like this also works: return Observable.fromEvent(websocket, 'message'); Do you have any idea how the two differ? - Henn
@BingLu They are almost the same. The fromEvent way is just a wrapper for the event, which is what I did by hand in my answer. I doubt there is any difference, but it's a better way than my answer nonetheless :D. I will update my answer to include it. Thanks - Allomorphism
@Abdulrahman I know it was a long time ago, but I wonder if you could update the Plunker to use fromEvent. Btw, shouldn't WebSocketSubject be used for streaming data? (I'm just wondering, I'm new to websockets.) - Ruffi
@drbishop I updated my answer and added two new plunker examples, for fromEvent and WebSocketSubject. As for WebSocketSubject, I only learned about it from you; it seems to be new and not ready yet. Once it is ready, it will be my first choice. - Allomorphism
R
0

Get onMessage data from socket.

import { Injectable } from '@angular/core';
import {Observable} from 'rxjs/Rx';


@Injectable()

export class HpmaDashboardService {
private socketUrl: any = 'ws://127.0.0.0/util/test/dataserver/ws';
private websocket: any;      

public GetAllInstanceStatus(objStr): Observable<any> {
      this.websocket = new WebSocket(this.socketUrl);
      this.websocket.onopen =  (evt) => {
        this.websocket.send(JSON.stringify(objStr));
      };
      return Observable.create(observer => {
        this.websocket.onmessage = (evt) => {
          observer.next(evt);
        };
      }).map(res => res.data).share();
 }

    // Get only a single message from the socket.

    public GetSingleInstanceStatus(objStr): Observable<any> {
      this.websocket = new WebSocket(this.socketUrl);
      this.websocket.onopen =  (evt) => {
        this.websocket.send(JSON.stringify(objStr));
      };
      return Observable.create(observer => {
        this.websocket.onmessage = (evt) => {
          observer.next(evt);
          this.websocket.close();
          observer.complete(); // no further messages will arrive
        };
      }).map(res => res.data).share();
  }

}
Regularly answered 6/7, 2017 at 4:42 Comment(0)
A
0

A different approach I used is with a Subject:

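import { Subject } from 'rxjs';

export class WebSocketClient {
   private client: WebSocket | undefined;
   private subject = new Subject<string>();

   ...
   // Public, because callers invoke connect() and watch() from outside the class.
   public connect() {
      this.client = new WebSocket(fakeUrl); // fakeUrl is a placeholder
      this.client.onmessage = (event) => {
          // Push every incoming payload to all current subscribers.
          this.subject.next(event.data);
      };
   }

   public watch() { return this.subject.asObservable(); } // can be mapped
}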

Using it is, in my opinion, clearer:

const client = new WebSocketClient(); // can also be injected
client.connect();
client.watch().subscribe(x => ...);

Happy coding!

Arhna answered 8/3, 2022 at 10:4 Comment(0)
