Using RxJs and Angular 2 in order to deal with ser

Posted 2020-02-26 09:59

Question:

I am trying to display values emitted through server-sent events in an Angular 2/RxJS app.

The backend regularly sends individual strings to the client through server-sent events.

I am not sure how to deal with the retrieved values on the angular 2/RxJs side.

Here is my client (an Angular component):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

The backend method is as follows (and uses RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

I just noticed that the app hangs on the request and that nothing is displayed on the page.

I suspect there is an issue with my use of the map method i.e. .map(this.extractData).

I would just like to add the incoming strings to the array and display that array in the template which would update as the strings come in.

Can anyone please help?

edit: Here is a working solution (thanks to Thierry's answer below):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}

Answer 1:

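You can't use the Http class of Angular 2 to handle server-sent events, since it's based on the XHR object: it delivers the response only once the request has completed, whereas a server-sent events stream stays open.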

You could leverage the EventSource object:

var source = new EventSource('/...');
// EventSource exposes the standard addEventListener method
source.addEventListener('message', (event) => {
  (...)
});

See these articles:

  • https://dzone.com/articles/html5-server-sent-events
  • http://www.nurkiewicz.com/2015/07/server-sent-events-with-rxjava-and.html?m=1
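
For background on why a one-shot XHR response is a poor fit: a server-sent events response is a long-lived `text/event-stream` body in which each event is a group of `field: value` lines ended by a blank line. The sketch below parses such text to show the framing. `parseSseText` and `ParsedSseEvent` are hypothetical names made up for illustration, and the code is a simplified reading of the format (it ignores `id:` and `retry:`), not what the browser's `EventSource` actually runs.

```typescript
// Hypothetical shape for a parsed event; this is not a browser type.
interface ParsedSseEvent {
  type: string; // value of the `event:` field, or "message" when absent
  data: string; // all `data:` lines of the event, joined with "\n"
}

// Parse a piece of SSE text into events (simplified: `id:` and `retry:` are ignored).
function parseSseText(text: string): ParsedSseEvent[] {
  const events: ParsedSseEvent[] = [];
  let type = "message";
  let dataLines: string[] = [];

  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === "") {
      // A blank line ends the current event; events without data are dropped.
      if (dataLines.length > 0) {
        events.push({ type: type, data: dataLines.join("\n") });
      }
      type = "message";
      dataLines = [];
      continue;
    }
    if (line.charAt(0) === ":") {
      continue; // comment line, often used as a keep-alive
    }
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.charAt(0) === " ") {
      value = value.slice(1); // a single leading space after the colon is stripped
    }
    if (field === "event") {
      type = value || "message";
    } else if (field === "data") {
      dataLines.push(value);
    }
  }
  return events;
}
```

For example, `parseSseText("data: first\n\nevent: log\ndata: second\n\n")` yields one 'message' event followed by one 'log' event. Since the server never ends the response, a client has to process events as they arrive, which is what `EventSource` does.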


Answer 2:

Here is a working example:

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

AppComponent

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            this.messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}


Answer 3:

To add to Thierry's answer: by default the event type is 'message'. However, the event type can be anything ('chat', 'log', etc.) depending on the server-side implementation. In my case, the first two events from the server were 'message' events and the rest were 'log' events. My code looks like this:

var source = new EventSource('/...');
source.addEventListener('message', message => { (...) });
source.addEventListener('log', log => { (...) });
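
When the server uses several event types like this, the registration can be wrapped in a small helper. The following is only a sketch: `listenToEventTypes` and `SseSourceLike` are hypothetical names, and `SseSourceLike` is a minimal stand-in for the two `EventSource` methods the helper needs, so that it can be exercised without a browser.

```typescript
// Hypothetical minimal view of the EventSource methods used below.
interface SseSourceLike {
  addEventListener(type: string, listener: (event: { data: string }) => void): void;
  removeEventListener(type: string, listener: (event: { data: string }) => void): void;
}

// Register one callback for several event types.
// Returns a function that removes every listener that was added.
function listenToEventTypes(
  source: SseSourceLike,
  types: string[],
  onEvent: (type: string, data: string) => void
): () => void {
  const registered = types.map(type => {
    const listener = (event: { data: string }) => {
      onEvent(type, event.data);
    };
    source.addEventListener(type, listener);
    return { type: type, listener: listener };
  });

  return () => {
    registered.forEach(entry => {
      source.removeEventListener(entry.type, entry.listener);
    });
  };
}
```

In a browser, the idea would be to pass `new EventSource('/...')` as `source` (a type cast may be needed depending on compiler settings) together with `['message', 'log']`, and to call the returned function when the component is destroyed.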