I have an Angular 2 service that fetches data from an API.
This service has three subscribers (defined in components), each doing something different with the data (different graphs).
I'm noticing that I'm making three GET requests to the API, while what I want is a single request whose data all the subscribers share.
I've looked into hot and cold observables and tried .share() on the observable, but I'm still making three individual calls.
Update, adding code
Service
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import {Observable} from 'rxjs/Rx';
// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import { StationCompliance } from './model/StationCompliance';
@Injectable()
export class StationComplianceService {
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
getStationCompliance() : Observable<StationCompliance []> {
return this.http.get(this.url)
.map((res:Response) => res.json())
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
}
}
Component 1
import { Component, OnInit } from '@angular/core';
import { CHART_DIRECTIVES } from 'angular2-highcharts';
import { StationComplianceService } from '../station-compliance.service';
@Component({
selector: 'app-up-down-graph',
templateUrl: './up-down-graph.component.html',
styleUrls: ['./up-down-graph.component.css']
})
export class UpDownGraphComponent implements OnInit {
graphData;
errorMessage: string;
options;
constructor(private stationService : StationComplianceService) { }
ngOnInit() {
this.getStationTypes();
}
getStationTypes(){
this.stationService.getStationCompliance()
.subscribe(
graphData => {
this.graphData = graphData;
this.options = {
chart : {type: 'pie',
plotShadow: true
},
plotOptions : {
showInLegend: true
},
title : {text: 'Up and Down devices'},
series: [{
data: this.processStationType(this.graphData)
}]
}
},
error => this.errorMessage = <any>error
);
}
// processStationType() omitted
}
The other two components are almost the same; they just show different graphs.
I encountered a similar problem and solved it by following Aran's suggestion to read Cory Rylan's Angular 2 Observable Data Services blog post. The key for me was using BehaviorSubject. Here are the snippets of code that ultimately worked for me.
Data Service:
The data service creates an internal BehaviorSubject to cache the data once when the service is initialized. Consumers use the subscribeToDataService() method to access the data.
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
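import { Observable } from 'rxjs/Observable';
// Operators used below must be patched onto Observable
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';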
import { Data } from './data';
import { properties } from '../../properties';
@Injectable()
export class DataService {
allData: Data[] = new Array<Data>();
allData$: BehaviorSubject<Data[]>;
constructor(private http: Http) {
this.initializeDataService();
}
initializeDataService() {
if (!this.allData$) {
this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());
this.http.get(properties.DATA_API)
.map(this.extractData)
.catch(this.handleError)
.subscribe(
allData => {
this.allData = allData;
this.allData$.next(allData);
},
error => console.log("Error subscribing to DataService: " + error)
);
}
}
subscribeToDataService(): Observable<Data[]> {
return this.allData$.asObservable();
}
// other methods have been omitted
}
Component:
Components can subscribe to the data service upon initialization.
export class TestComponent implements OnInit {
allData$: Observable<Data[]>;
constructor(private dataService: DataService) {
}
ngOnInit() {
this.allData$ = this.dataService.subscribeToDataService();
}
}
Component Template:
The template can then iterate over the observable as necessary using the async pipe.
*ngFor="let data of allData$ | async"
Subscribers are updated each time the next() method is called on the BehaviorSubject in the data service.
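To illustrate why BehaviorSubject works for this, here is a small dependency-free sketch of its semantics. It is a model written for illustration, not the RxJS implementation: MiniBehaviorSubject, seenByA and seenByB are hypothetical names, and the call to next() stands in for the HTTP response arriving.

```typescript
type ValueListener<T> = (value: T) => void;

// Simplified model of BehaviorSubject semantics (assumption: no completion,
// no errors, no unsubscribe).
class MiniBehaviorSubject<T> {
  private listeners: ValueListener<T>[] = [];

  constructor(private current: T) {}

  subscribe(listener: ValueListener<T>): void {
    this.listeners.push(listener);
    listener(this.current); // every new subscriber immediately gets the current value
  }

  next(value: T): void {
    this.current = value; // remember the latest value for late subscribers
    this.listeners.forEach(l => l(value));
  }
}

const allData$ = new MiniBehaviorSubject<string[]>([]);
const seenByA: string[][] = [];
const seenByB: string[][] = [];

allData$.subscribe(v => seenByA.push(v)); // subscribes before the data arrives
allData$.next(['x', 'y']);                // stands in for the HTTP response
allData$.subscribe(v => seenByB.push(v)); // subscribes after the data arrived
```

The early subscriber first sees the initial empty array and then the data; the late subscriber sees the data straight away, and no second request is needed.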
The issue in your code is that you return a new observable each time your function is called, because http.get creates a new Observable on every call. One way to solve this is to store the observable in the service, which ensures that all of the subscribers subscribe to the same observable. This isn't perfect code, but I had a similar issue and this solved my problem for the time being.
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import {Observable} from 'rxjs/Rx';
// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import { StationCompliance } from './model/StationCompliance';
@Injectable()
export class StationComplianceService {
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
private stationComplianceObservable: Observable<StationCompliance[]>;
getStationCompliance() : Observable<StationCompliance []> {
if(this.stationComplianceObservable){
return this.stationComplianceObservable;
}
this.stationComplianceObservable = this.http.get(this.url)
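.debounceTime(1000) // debounceTime takes milliseconds; debounce expects a function
.share()
.map((res:Response) => res.json())
.finally(() => { this.stationComplianceObservable = null }) // arrow function keeps `this` bound to the service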
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
return this.stationComplianceObservable;
}
}
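To make the effect concrete, here is a dependency-free sketch of the idea. It models the behaviour and is not RxJS: fakeGet, flush and SharedRequest are hypothetical names invented for illustration, and fakeGet stands in for http.get while counting how many requests are started.

```typescript
type Callback<T> = (value: T) => void;

let requestCount = 0;
const pending: Array<() => void> = [];

// Hypothetical stand-in for http.get(): each call starts a new "request"
// that only completes when flush() is called.
function fakeGet(onResult: Callback<string[]>): void {
  requestCount++;
  pending.push(() => onResult(['station-a', 'station-b']));
}

// Deliver all outstanding "responses".
function flush(): void {
  while (pending.length > 0) {
    pending.shift()!();
  }
}

// Lets several consumers join one in-flight request.
class SharedRequest<T> {
  private waiting: Callback<T>[] = [];
  private inFlight = false;

  constructor(private start: (onResult: Callback<T>) => void) {}

  subscribe(cb: Callback<T>): void {
    this.waiting.push(cb);
    if (this.inFlight) return; // join the request that is already running
    this.inFlight = true;
    this.start(value => {
      const listeners = this.waiting;
      this.waiting = [];
      this.inFlight = false; // like resetting the stored observable in finally()
      listeners.forEach(l => l(value));
    });
  }
}

// Unshared: three consumers each start their own request.
const unsharedResults: string[][] = [];
for (let i = 0; i < 3; i++) {
  fakeGet(data => unsharedResults.push(data));
}
flush();
const unsharedRequests = requestCount;

// Shared: three consumers, one request.
requestCount = 0;
const shared = new SharedRequest<string[]>(fakeGet);
const sharedResults: string[][] = [];
for (let i = 0; i < 3; i++) {
  shared.subscribe(data => sharedResults.push(data));
}
flush();
const sharedRequests = requestCount;
```

In this model the unshared version starts three requests and the shared version starts one, while all three consumers still receive the data.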
You can create a reactive data service that holds a local Observable which is updated internally, so that subscribers update themselves.
This article explains it properly: data services
The solution is to save the observable once it has been created and make it shareable (by default it is not). Your service will then look like this:
@Injectable()
export class StationComplianceService {
private stationCompliance: StationCompliance[];
private stream: Observable<StationCompliance []>;
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
getStationCompliance() : Observable<StationCompliance []> {
/** if the remote value has already been fetched, just return it as an Observable */
if (this.stationCompliance) {
return Observable.of(this.stationCompliance);
}
/** otherwise, if the stream has already been created, reuse it instead of creating another (exactly your question) */
if (this.stream) {
return this.stream;
}
/** otherwise fetch the remote data */
this.stream = this.http.get(this.url)
.map((res:Response) => {
this.stationCompliance = res.json(); /** cache the value for later calls */
return this.stationCompliance;
})
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
.share(); /** and make the stream shareable (by default it is not) */
return this.stream;
}
}
shareReplay should do it now: "(...) valuable in situations where you know you will have late subscribers to a stream that need access to previously emitted values."
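As a rough illustration of what "late subscribers" means here, the following dependency-free sketch models a source that is fetched once and replays its last value. It is not the RxJS shareReplay implementation; ReplayLast, deliver and stations$ are hypothetical names, and calling deliver() stands in for the server response arriving.

```typescript
type Sink<T> = (value: T) => void;

// Simplified model of "share the fetch and replay the last value"
// (assumption: a single value, no errors, no unsubscribe).
class ReplayLast<T> {
  private sinks: Sink<T>[] = [];
  private started = false;
  private hasValue = false;
  private last!: T;

  constructor(private fetch: (emit: Sink<T>) => void) {}

  subscribe(sink: Sink<T>): void {
    this.sinks.push(sink);
    if (this.hasValue) {
      sink(this.last); // late subscriber: replay the cached value
    }
    if (!this.started) {
      this.started = true; // only the first subscriber triggers the fetch
      this.fetch(value => {
        this.last = value;
        this.hasValue = true;
        this.sinks.forEach(s => s(value));
      });
    }
  }
}

let fetchCount = 0;
let deliver: Sink<number[]> = () => {};
const stations$ = new ReplayLast<number[]>(emit => {
  fetchCount++;
  deliver = emit; // the "response" arrives when deliver() is called
});

const early: number[][] = [];
const late: number[][] = [];
stations$.subscribe(v => early.push(v)); // subscribes before the response
deliver([1, 2, 3]);                      // the response arrives
stations$.subscribe(v => late.push(v));  // subscribes after the response
```

Both subscribers end up with the same data from a single fetch, which is the behaviour the quoted description refers to.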
I know this thread is old, but the accepted answer helped me a lot, and I would like to add some possible improvements using debounce, switchMap, and a hacky global notification system (to do this properly, ngrx should be used: https://ngrx.io/).
The premise is that a notification service can be used to push changes to all other services, telling them to fetch their data:
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
@Injectable()
export class NotifyerService {
constructor() { }
notifyer: Subject<any> = new Subject<any>();
notifyAll() {
console.log("ALL NOTIFIED")
this.notifyer.next("GO")
}
}
A Subject is used because calling .next(val) on a Subject pushes the data to all listeners.
In the service for a particular component (in your case "DataService"), you can manage the data acquisition and caching:
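import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, Subject } from 'rxjs';
import { debounceTime, filter, switchMap } from 'rxjs/operators';
@Injectable()
export class GetDataService {
// cache the incoming data
cache: Subject<any> = new Subject<any>();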
constructor(private http: HttpClient,
private notifyerService: NotifyerService) {
// subscribe to any notification from central message broker
this.notifyerService.notifyer.pipe(
// filtering can be used to perform different actions based on different notifications
filter(val => val == "GO"),
// prevent notification spam by debouncing
debounceTime(300),
// SUBSCRIBE to the output of getData, cancelling previous subscription
switchMap(() => this.getData())
).subscribe(res => {
console.log("expensive server request")
// save data in cache, notify listeners
this.cache.next(res)
})
}
// expensive function which gets data
getData(): Observable<any> {
return this.http.get<any>(BASE_URL);
}
}
The key concept in the above code is that you set up a cache object and update it whenever there is a notification. In the constructor, we pipe all future notifications through a series of operators:
- filter can be used if you only want to update the cache when the notification service emits certain values (in this case "GO"). If you start doing this, you will almost certainly want to use ngrx instead.
- debounceTime prevents your server from being spammed with requests (if, say, the notifications are driven by user input).
- switchMap is a very important operator for caching and state changes. It runs the function inside (in this case, getData) and subscribes to its output, so the output of switchMap is the server data itself.
- Another important feature of switchMap is that it cancels previous subscriptions. If your server request has not returned before another notification arrives, switchMap cancels the old request and pings the server again.
Now after all of that, the server data will be placed in the cache with the .next(res) method.
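The cancelling behaviour described above can be sketched without any library. The code below is a simplified model of the "latest request wins" idea, not the RxJS switchMap implementation; switchLatest, fakeRequest, resolvers and cancelled are hypothetical names, and fakeRequest stands in for getData().

```typescript
type Emit<T> = (value: T) => void;
type Cancel = () => void;
// An inner source starts some work and returns a function that cancels it.
type Inner<T> = (emit: Emit<T>) => Cancel;

// Returns a function to call on every notification; each call cancels the
// previous inner source before starting the new one.
function switchLatest<T>(output: Emit<T>): (inner: Inner<T>) => void {
  let cancelPrevious: Cancel | null = null;
  return (inner: Inner<T>): void => {
    if (cancelPrevious) cancelPrevious(); // drop the request still in flight
    cancelPrevious = inner(output);
  };
}

const cancelled: number[] = [];
const resolvers: Array<() => void> = [];

// Hypothetical stand-in for getData(): request `id` responds when its
// resolver is called, unless it was cancelled first.
function fakeRequest(id: number): Inner<string> {
  return emit => {
    let active = true;
    resolvers.push(() => { if (active) emit(`response ${id}`); });
    return () => { active = false; cancelled.push(id); };
  };
}

const cache: string[] = [];
const onNotification = switchLatest<string>(value => cache.push(value));

onNotification(fakeRequest(1)); // first notification
onNotification(fakeRequest(2)); // second arrives before request 1 has returned
resolvers.forEach(resolve => resolve()); // both "responses" come back
```

Only the response to the second request reaches the cache; the first request was cancelled when the second notification arrived.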
Now all the final component needs to do is listen to the cache for updates and handle them appropriately:
export class ButtonclickerComponent implements OnInit {
value: any;
constructor(private getDataService: GetDataService,
private notifyerService: NotifyerService) { }
ngOnInit() {
// listen to cache for updates
this.getDataService.cache.pipe(
// can do something specific to this component if there are multiple subscriptions off the same cache
map(x => x)
// subscribe to receive the cached values
).subscribe(x => { console.log(x); this.value = x.value })
}
onClick() {
// notify all components of state changes
this.notifyerService.notifyAll()
}
}
The concept in action:
Angular App on button click
Server response