I'm having a hard time wrapping my brain around observables in Angular. I'm coming from the world of PHP, where things are definitely not async.
I have a component that simply displays a list of messages for a general topic. For now, I have one topic that all messages belong to. If the topic doesn't exist, then it should be created. The message and topic calls are all done through a REST api.
In a non-async world, I would program it in order. The message service would see if the topic exists. If it doesn't, then it has the topic service create it. After it has the topic, it then fetches all of the messages within that topic.
I understand that you subscribe to an observable, but what happens when there needs to be a series of things that need to happen in order? The angular 2 http documentation goes through a very simple example of updating the hero list when one call is made.
Component
export class NotificationListComponent implements OnInit {
constructor(private _notificationService:NotificationService) {
}
***
ngOnInit() {
this.getNotifications();
}
getNotifications() {
this._notificationService.getNotifications()
.subscribe(
notifications => this.notifications = notifications,
error => this.errorMessage = <any>error);
}
Notification Service
...
getNotifications() {
// call the topic service here for general topic??
return this.http.get('/messages?order[datesent]=DESC')
.map((res) => {
return res.json()["hydra:member"];
})
.map((notifications:Array<any>) => {
let result:Array<Notification> = [];
notifications.forEach((jsonNotification) => {
var Notification:Notification = {
message: jsonNotification.message,
topic: jsonNotification.topic,
datesent: new Date(jsonNotification.datesent),
};
result.push(Notification);
});
return result;
})
.catch(this.handleError);
}
...
Topic Service
...
getGeneralTopic() {
var generalTopic;
this.http.get('/topics?name=general')
.map((res) => {
return res.json()["hydra:member"][0];
})
.do(data => console.log(data))
.subscribe(generalTopic => res);
}
...
How to reason about Observables?
Observables deal with streams. Streams can be pretty much anything, but you can think of them as abstract array of asynchronous events. This is useful because you can now reason about them more clearly:
- abstract because Observable can produce a value of any type:
String, Boolean, Object
- array because Observable has
Operators
that work similar to JavaScript's array methods: map(), filter(), reduce()
- of because Observable is a wrapper for value(s)
- asynchronous because Observable may or may not execute
- events because Observable needs to be triggered
When to use Observables?
You want to use Observables typically when you need to perform a task or an action that involves several steps. This can be your starting point, you can later simplify or increase complexity as needed.
You should have a "plan" or at least a vague idea what those steps should be. Sounds obvious, but many problems/issues occur because you don't know what you want (;
Once you have planned an action (as an array of steps) you can start from either end, but I think it's better to start from the end. At least until you learn more.
I have a component that simply displays a list of messages for a general topic. For now, I have one topic that all messages belong to. If the topic doesn't exist, then it should be created. The message and topic calls are all done through a REST api.
In a non-async world, I would program it in order. The message service would see if the topic exists. If it doesn't, then it has the topic service create it. After it has the topic, it then fetches all of the messages within that topic.
For your use case The Plan would be: ["(create topic)", "select topic", "show messages"]
. messages
are abstract array, select
and create
are asynchronous events.
How to use an Observable?
As I said above, let's start from the end - "show messages"
.
<div *ngFor="#message of messages"></div>
We know we're dealing with Observable.of(messages)
(this is how you would manually create it). Next, you need to 'fill' the stream of messages, and you can do it with Http
service that returns Observable
. Since messages you get from the server are wrapped in several "layers" by Http
service, we can leverage ability of Observable to chain operators (operators return Observable
) and get to the messages we need:
getMessages(topic) {
this.http.get("/messages?topic=" + topic)
.map(response => response.json())
// chain other operators here...
.filter(message => !message.private)
}
You can use whatever Operators you need here... which leads to the next big thing about Observables:
“Hot” and “Cold” Observables
Observables are cold by default. This means that when you create an observable you just describe what it should do. It won't execute these actions immediately (like Promises
do) it needs to be triggered.
You trigger it by subscribing to it, either manually with subscribe()
method, or you can let Angular make it hot with async
pipe (which does subscribing for you).
getMessages(topic) {
this.http.get("/messages?topic=" + topic)
.map(response => response.json())
.subscribe(messages => this.messages = messages);
}
Watching for changes
Next thing to do (or previous since we're going backwards in The Plan) is to "select topic"
. It would be nice to watch the value of selected topic and respond to it's change by loading new messages. This can be done with a Subject
.
A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
We can setup topic$
to do this like so:
class ListComponent {
public messages;
public topic$ = new Subject();
constructor(private http: Http) {
this.getMessages('posts');
this.topic$
.subscribe(topic => this.getMessages(topic));
}
getMessages(topic: string) {....}
selectTopic(topic: string) {
this.topic$.next(topic)
}
}
Wrap up
Last step is to "(create topic)"
if one doesn't exist. Let's assume server would return an error if topic doesn't exist:
getMessages(topic: string) {
this.http.get(API_URL + topic)
.map(response => response.json())
.subscribe(
messages => this.messages = messages,
error => this.createTopic(topic)
);
}
createTopic(topic: string) {
this.http.post(API_URL + topic, { body: JSON.stringify(topic) })
.map(response => response.json())
.subscribe();
}
Here's the working plunker with this example. As you can see it's not hard to do (50-ish lines of code...). You can easily move things around and create services where you need.
In fact, Reactive Programming allows to create asynchronous data streams. This means that you can leverage
The HTTP support of Angular2 leverages operators (flatMap, ...) to link streams together and implement complex processing chains. This means that you can use all concepts of Reactive Programming / RxJS to make an HTTP request part of an asynchronous data stream.
Here a simple sample that allows to execute a request based on the value of a form input (when updates occur):
this.inputControl.valueChanges.debounceTime(500).flatMap(
val => {
// Execute ab
return this.http.get(`http://...?filter=${val}`);
}).subscribe((data) => {
this.places = data;
});
Operators can also allow to implement easily several issues in the context of HTTP:
Ensure to receive the latest when requests are executed in sequence (for example based on user input). Sample of the switchMap
operator that cancels the previous in-progress requests when a new one is triggered
this.ctrl.valueChanges.switchMap(
val => {
return this.http.get(`http://...?filter=${val}`);
});
Buffer events and trigger the last one after an amount of time. Sample that waits for an inactivity of 500ms before executing the request based on the latest value of the input:
this.ctrl.valueChanges.debounceTime(500).flatMap(
val => {
return this.http.get(`http://...?filter=${val}`);
});
Retry when requests fail. Sample of retry each 500ms and within 2s
var params = new URLSearchParams();
params.set('placename_startsWith', searchText);
params.set('username', 'XXX');
return this.http.get('http://api.geonames.org/postalCodeSearchJSON',
{ search: params })
.retryWhen(error => error.delay(500))
.timeout(2000, return new Error('delay exceeded'))
.map(res => res.json().postalCodes);
Here are some articles regarding Reactive Programming that could help you:
- The introduction to Reactive Programming you've been missing by André Staltz - https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
- Everything is a stream by Rob Wormald - http://slides.com/robwormald/everything-is-a-stream
Though I'm not sure to understand your question, possibly Observable.selectMany
and Observable.and/thenDo/when
can help you.
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/when.md
In your case, I think it makes you to create a transaction of fetching and inserting.