RxJS: takeUntil() Angular component's ngOnDest

2019-01-09 01:27发布

tl;dr: Basically I want to marry Angular's ngOnDestroy with the Rxjs takeUntil() operator. -- is that possible?

I have an Angular component that opens several Rxjs subscriptions. These need to be closed when the component is destroyed.

A simple solution for this would be:

class myComponent {

  private subscriptionA;
  private subscriptionB;
  private subscriptionC;

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.subscriptionA = this.serviceA.subscribe(...);
    this.subscriptionB = this.serviceB.subscribe(...);
    this.subscriptionC = this.serviceC.subscribe(...);
  }

  ngOnDestroy() {
    this.subscriptionA.unsubscribe();
    this.subscriptionB.unsubscribe();
    this.subscriptionC.unsubscribe();
  }

}

This works, but it's a bit redundant. I especially don't like that - The unsubscribe() is somewhere else, so you gotta remember that these are linked. - The component state is polluted with the subscription.

I would much prefer using the takeUntil() operator or something similar, to make it look like this:

class myComponent {

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    const destroy = Observable.fromEvent(???).first();
    this.subscriptionA = this.serviceA.subscribe(...).takeUntil(destroy);
    this.subscriptionB = this.serviceB.subscribe(...).takeUntil(destroy);
    this.subscriptionC = this.serviceC.subscribe(...).takeUntil(destroy);
  }

}

Is there a destroy event or something similar that would let me use takeUntil() or another way to simplify the component architecture like that? I realize I could create an event myself in the constructor or something that gets triggered within ngOnDestroy() but that would in the end not make things that much simpler to read.

4条回答
该账号已被封号
2楼-- · 2019-01-09 01:53

Well, this comes down to what you mean by closing a subscription. There're basically two ways to do this:

  1. Using an operator that completes the chain (such as takeWhile()).
  2. Unsubscribe from the source Observable.

It's good to know that these two aren't the same.

When using for example takeWhile() you make the operator send complete notification which is propagated to your observers. So if you define:

...
.subscribe(..., ..., () => doWhatever());

Then when you complete the chain with eg. takeWhile() the doWhatever() function will be called.

For example it could look like this:

const Observable = Rx.Observable;
const Subject = Rx.Subject;

let source = Observable.timer(0, 1000);
let subject = new Subject();

source.takeUntil(subject).subscribe(null, null, () => console.log('complete 1'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 2'));
source.takeUntil(subject).subscribe(null, null, () => console.log('complete 3'));

setTimeout(() => {
  subject.next();
}, 3000);

After 3s all the complete callbacks will be called.

On the other hand when you unsubscribe you're saying that you're no longer interested in the items produced by the source Observable. However this doesn't mean the source has to complete. You just don't care any more.

This means that you can collect all Subscriptions from .subscribe(...) calls and unsubscribe all of them at once:

let subscriptions = new Rx.Subscription();
let source = Observable.timer(0, 1000);

subscriptions.add(source.subscribe(null, null, () => console.log('complete 1')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 2')));
subscriptions.add(source.subscribe(null, null, () => console.log('complete 3')));

setTimeout(() => {
  subscriptions.unsubscribe();
}, 3000);

Now after 3s delay nothing will be printed to console because we unsubscribed and no complete callback was invoked.

So what you want to use is up to you and your use-case. Just be aware that unsubscribing is not the same as completing even though I guess in your situation it doesn't really matter.

查看更多
戒情不戒烟
3楼-- · 2019-01-09 01:53

Using the componentDestroyed() function from the npm package ng2-rx-componentdestroyed is by far the easiest way to use takeUntil:

@Component({
  selector: 'foo',
  templateUrl: './foo.component.html'
})
export class FooComponent implements OnInit, OnDestroy {
  ngOnInit() {
    Observable.interval(1000)
      .takeUntil(componentDestroyed(this)) // <--- magic is here!
      .subscribe(console.log);
  }

  ngOnDestroy() {}
}

Here's a version of componentDestroyed() to include directly in your code:

// Based on https://www.npmjs.com/package/ng2-rx-componentdestroyed
import { OnDestroy } from '@angular/core';
import { ReplaySubject } from 'rxjs/ReplaySubject';

export function componentDestroyed(component: OnDestroy) {
  const oldNgOnDestroy = component.ngOnDestroy;
  const destroyed$ = new ReplaySubject<void>(1);
  component.ngOnDestroy = () => {
    oldNgOnDestroy.apply(component);
    destroyed$.next(undefined);
    destroyed$.complete();
  };
  return destroyed$;
}
查看更多
祖国的老花朵
4楼-- · 2019-01-09 02:04

If your component is directly tied to a route, you can avoid adding state by leveraging Router events with takeUntil(). That way, as soon as you navigate away from the component, it will clean up its subscriptions automatically for you.

import { Component, OnInit } from '@angular/core';
import { ActivatedRoute, Router } from '@angular/router';
import { MyService } from './my.service';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/takeUntil';

@Component({
    ...
})
export class ExampleComopnent implements OnInit {

    constructor(private router: Router, private myService: MyService) {
    }

    ngOnInit() {
        this.myService.methodA()
            .takeUntil(this.router.events)
            .subscribe(dataA => {
                ...
            });

        this.myService.methodB()
            .takeUntil(this.router.events)
            .subscribe(dataB => {
                ...
            });
    }
}

Note: This simple example does not take into account guarded routes or canceled route navigation. If there is a chance that one of the router events could be triggered but route navigation gets cancelled, you'll need to filter on the router events so it gets triggered at the appropriate point - for example, after the Route Guard check or once navigation is complete.

this.myService.methodA()
    .takeUntil(this.router.events.filter(e => e instanceOf NavigationEnd))
    .subscribe(dataA => {
        ...
    });
查看更多
ら.Afraid
5楼-- · 2019-01-09 02:10

You could leverage a ReplaySubject for that:

class myComponent {
  private destroyed$: ReplaySubject<boolean> = new ReplaySubject(1);

  constructor(
    private serviceA: ServiceA,
    private serviceB: ServiceB,
    private serviceC: ServiceC) {}

  ngOnInit() {
    this.serviceA
      .takeUntil(this.destroyed$)
      .subscribe(...);
    this.serviceB
      .takeUntil(this.destroyed$)
      .subscribe(...);
    this.serviceC
      .takeUntil(this.destroyed$)
      .subscribe(...);
  }

  ngOnDestroy() {
    this.destroyed$.next(true);
    this.destroyed$.complete();
  }
}
查看更多
登录 后发表回答