I would like to create an RxJS Observable from an EventSource (server-sent events).
I tried the following:
import {Component, OnInit} from 'angular2/core';
import {Subject, Observable} from 'rxjs/Rx';
@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
            a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {
    someStrings:string[] = [];
    ngOnInit() {
        let eventSource = new EventSource('/interval-sse-observable');
        let observable = Observable.create(eventSource);
        observable.subscribe({
            next: aString => this.someStrings.push(aString.data),
            error: err => console.error('something wrong occurred: ' + err)
        });
    }
}
But I get the following exception:
EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
ORIGINAL STACKTRACE:
TypeError: this._subscribe is not a function
at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
at Array.forEach (native)
at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)
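A note on this exception: it is consistent with Observable.create being handed an object (the EventSource) where a subscribe function is expected. The sketch below reproduces that failure mode with a simplified stand-in class. MiniObservable, MiniObserver and notAFunction are hypothetical names made up for this illustration; this is not the RxJS source, only an assumption about the general shape of the problem.

```typescript
// Simplified stand-in for an observable. NOT the RxJS implementation.
interface MiniObserver<T> {
  next: (value: T) => void;
  error?: (err: unknown) => void;
}

type Teardown = () => void;
type SubscribeFn<T> = (observer: MiniObserver<T>) => Teardown;

class MiniObservable<T> {
  private _subscribe: SubscribeFn<T>;

  private constructor(subscribe: SubscribeFn<T>) {
    this._subscribe = subscribe;
  }

  // Stores whatever it is given as the subscribe function, without checking it.
  static create<T>(subscribe: SubscribeFn<T>): MiniObservable<T> {
    return new MiniObservable<T>(subscribe);
  }

  subscribe(observer: MiniObserver<T>): Teardown {
    // Throws a TypeError if a non-function was stored in _subscribe.
    return this._subscribe(observer);
  }
}

// Hypothetical plain object standing in for an EventSource instance.
const notAFunction = { url: '/interval-sse-observable' };

let misuseError = '';
try {
  // The cast mimics untyped code that passes an object instead of a function.
  MiniObservable.create<string>(notAFunction as unknown as SubscribeFn<string>)
    .subscribe({ next: () => {} });
} catch (e) {
  misuseError = e instanceof TypeError ? 'TypeError' : 'other';
}

// Passing a function that receives the observer works as intended.
const received: string[] = [];
const unsubscribe = MiniObservable.create<string>(observer => {
  observer.next('hello');
  return () => {};
}).subscribe({ next: value => { received.push(value); } });
unsubscribe();
```

In this stand-in, the object only fails once subscribe is called, which matches the stack trace pointing at Observable.subscribe instead of Observable.create.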
For completeness' sake, here are the contents of my index.html:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse demo</title>
    <!-- 1. Load libraries -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
    <script src="https://code.angularjs.org/tools/system.js"></script>
    <script src="https://code.angularjs.org/tools/typescript.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
    <script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>
    <!-- 2. Configure SystemJS -->
    <script>
        System.config({
            transpiler: 'typescript',
            typescriptOptions: { emitDecoratorMetadata: true }
        });
        System.import('./scripts/app.ts')
            .then(null, console.error.bind(console));
    </script>
</head>
<body>
    <my-app>Loading...</my-app>
</body>
</html>
Can someone please help?
Edit 1: Following Yurzui's advice, I modified my code as follows:
ngOnInit() {
    let observable = Observable.create(observer => {
        const eventSource = new EventSource('/interval-sse-observable');
        eventSource.onmessage = x => observer.next(console.log(x));
        eventSource.onerror = x => observer.error(console.log('EventSource failed'));
        return () => {
            eventSource.close();
        };
    });
    observable.subscribe({
        next: aString => this.someStrings.push(aString.data),
        error: err => console.error('something wrong occurred: ' + err)
    });
}
It does log the first message in the console as follows:
MessageEvent {isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…}
    bubbles: false
    cancelBubble: false
    cancelable: false
    currentTarget: EventSource
    data: "c374a15b-b37d-498e-8ab0-49643b79c1bb"
    defaultPrevented: false
    eventPhase: 0
    isTrusted: true
    isTrusted: true
    lastEventId: ""
    origin: "http://localhost:8080"
    path: Array[0]
    ports: null
    returnValue: true
    source: null
    srcElement: EventSource
    target: EventSource
    timeStamp: 6257.125
    type: "message"
    __proto__: MessageEvent
Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefined
    System.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29
    SafeSubscriber.__tryOrUnsub @ Rx.js:10979
    SafeSubscriber.next @ Rx.js:10934
    Subscriber._next @ Rx.js:10894
    Subscriber.next @ Rx.js:10871
    System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21
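A note on the TypeError above: console.log returns undefined, so observer.next(console.log(x)) forwards undefined instead of the event, and reading .data on it fails. A minimal sketch of the difference follows. MessageLike, forwarded, next and logAndForward are hypothetical names used only for this illustration, with next standing in for observer.next.

```typescript
// Hypothetical event shape, reduced to the one field used in the question.
interface MessageLike {
  data: string;
}

// Stand-in for observer.next: records whatever value it is handed.
const forwarded: unknown[] = [];
const next = (value: unknown): void => {
  forwarded.push(value);
};

const event: MessageLike = { data: 'c374a15b-b37d-498e-8ab0-49643b79c1bb' };

// console.log returns undefined, so this forwards undefined, not the event.
next(console.log(event));

// Logging first and then forwarding the event itself keeps both behaviours.
const logAndForward = (e: MessageLike): void => {
  console.log(e);
  next(e);
};
logAndForward(event);
```

After this runs, the first recorded value is undefined and the second is the event object, which is the one a subscriber can safely read .data from.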
Now, if instead of logging the x variable to the console I just pass it to the next method as follows:
eventSource.onmessage = x => observer.next(x);
The server-sent events are received by the client (I can see them in the Chrome dev tools), but nothing is displayed in the template, which suggests that the array of strings is not being populated.
By the way, I had to remove the JSON.parse(x.data) call, as it was causing an error.
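Regarding the JSON.parse error: the data value shown in the console log is a bare UUID-like string, which is not valid JSON, so JSON.parse would throw a SyntaxError on it. Whether the server is ever meant to send JSON is not stated here, so the sketch below is only one possible way to handle both cases. parseEventData is a hypothetical helper name.

```typescript
// Hypothetical helper: return parsed JSON when the payload is valid JSON,
// otherwise fall back to the raw string.
function parseEventData(data: string): unknown {
  try {
    return JSON.parse(data);
  } catch (_err) {
    // Not valid JSON (for example a bare UUID string), so keep it as-is.
    return data;
  }
}

// Payload taken from the console output in the question.
const rawPayload = 'c374a15b-b37d-498e-8ab0-49643b79c1bb';
const plainResult = parseEventData(rawPayload);

// Assumed example of a JSON payload, not something the question's server sends.
const jsonResult = parseEventData('{"id": 1}') as { id: number };

// Calling JSON.parse directly on the raw payload throws.
let directParseFailed = false;
try {
  JSON.parse(rawPayload);
} catch (_err) {
  directParseFailed = true;
}
```

Note that a payload such as "123" is valid JSON and would come back as a number, so this fallback only suits servers whose plain-text payloads cannot be mistaken for JSON.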