Angular 2 Http, Observables and recursive requests

Posted 2020-02-01 02:17

Question:

I have a REST endpoint that returns a list of items, max 1000 items at a time. If there are more than 1000 items, the response has HTTP status 206 and there's a Next-Range header that I can use in my next request for getting more items.

I'm working on an Angular 2 application and trying to implement this with Http and Observable. My problem is that I don't know how to merge multiple Observables depending on how many pages of items there are and finally return one Observable that my component can subscribe to.

Here's where I've got with my current TypeScript implementation:

// NOTE: Non-working example!

getAllItems(): Observable<any[]> {
  // array of all items, possibly received with multiple requests
  const allItems: any[] = [];

  // inner function for getting a range of items
  const getRange = (range?: string) => {
    const headers: Headers = new Headers();
    if (range) {
      headers.set('Range', range);
    }

    return this.http.get('http://api/endpoint', { headers })
      .map((res: Response) => {
        // add all to received items
        // (maybe not needed if the responses can be merged some other way?)
        allItems.push.apply(allItems, res.json());

        // partial content
        if (res.status === 206) {
          const nextRange = res.headers.get('Next-Range');

          // get next range of items
          return getRange(nextRange);
        }

        return allItems;
      });
  };

  // get first range
  return getRange();
}

However, this doesn't work. If I understood it correctly, an Observable is returned as the value of the initial Observable and not the array of items.

Answer 1:

You can implement this using the expand operator. What you actually want is a recursive flatMap, and that is exactly what expand was created for.

Here is a code snippet showing how this works:

let times = true;
// This is a mock method for your http.get call
const httpMock = () => {
  if(times) {
    times = false;
    return Rx.Observable.of({items: ["1", "2", "3"], next: true});
  } else {
    return Rx.Observable.of({items: ["4", "5", "6"], next: false});
  }
}

httpMock()
  .expand(obj => {
    // In your case, the obj will be the response
    // implement your logic here if the 206 http header is found
    if(obj.next) {
      // If you have next values, just call the http.get method again
      // In my example it's the httpMock
      return httpMock();
    } else {
      return Rx.Observable.empty();
    }
  })
  .map(obj => obj.items.flatMap(array => array))
  .reduce((acc, x) => acc.concat(x), [])
  .subscribe((val) => console.log(val));

What it does is mock a first HTTP request whose 'next' property is true; this corresponds to your 206 status. It then makes a second call whose 'next' property is false, which ends the recursion.

The result is a single array containing the items from both requests. Thanks to the expand operator, the same code works for any number of requests.

Working jsbin example can be found here: http://jsbin.com/wowituluqu/edit?js,console

EDIT: updated to work with an HTTP call that returns an array of arrays, so that the end result is a single array containing all the elements from those arrays.

If you want the result to be an array that still contains the separate arrays from each request, just remove the flatMap and return the items directly. Updated codepen here: http://codepen.io/anon/pen/xRZyaZ?editors=0010#0
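
To make the "recursive flatMap" idea concrete, here is a rough model of what expand does, written in plain TypeScript with arrays standing in for Observables. It is only a sketch of the semantics, not the RxJS implementation: the names expandModel, Page and pages are made up for illustration, and asynchrony and emission order are ignored.

```typescript
// Plain-TypeScript model of expand: arrays stand in for Observables.
// Every emitted value is passed to `project`; whatever `project` returns
// is emitted too and fed back in, until `project` returns nothing.
function expandModel<T>(source: T[], project: (value: T) => T[]): T[] {
  const emitted: T[] = [];
  const pending: T[] = [...source];
  while (pending.length > 0) {
    const value = pending.shift() as T;
    emitted.push(value);
    pending.push(...project(value));
  }
  return emitted;
}

// Hypothetical paged data: each page stores the index of the next page.
interface Page {
  items: string[];
  next?: number;
}

const pages: Page[] = [
  { items: ['1', '2', '3'], next: 1 },
  { items: ['4', '5', '6'], next: 2 },
  { items: ['7', '8', '9'] }, // last page: no `next`
];

// Start with the first page and keep following `next` until it is missing.
const allPages = expandModel([pages[0]], page =>
  page.next !== undefined ? [pages[page.next]] : []
);

// Flatten the pages into one list, as map + reduce do in the snippet above.
const allItems = allPages.reduce<string[]>((acc, page) => acc.concat(page.items), []);
console.log(allItems);
```

Returning an empty array from the callback plays the same role as returning Rx.Observable.empty() in the real operator: it is what stops the recursion.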



Answer 2:

I got it working with minor tweaks to KwintenP's example:

// service.ts

getAllItems(): Observable<any[]> {
  const getRange = (range?: string): Observable<any> => {
    const headers: Headers = new Headers();
    if (range) {
      headers.set('Range', range);
    }

    return this.http.get('http://api/endpoint', { headers });
  };

  return getRange().expand((res: Response) => {
    if (res.status === 206) {
      const nextRange = res.headers.get('Next-Range');

      return getRange(nextRange);
    } else {
      return Observable.empty();
    }
  }).map((res: Response) => res.json());
}

In the component that subscribes to the Observable, I had to add a completed handler:

// component.ts

const temp = [];

service.getAllItems().subscribe(
  items => {
    // page received, push items to temp
    temp.push.apply(temp, items);
  },
  err => {
    // handle error
  },
  () => {
    // completed, expose temp to component
    this.items = temp;
  }
);


Answer 3:

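With recent versions, i.e. Angular 6+ (HttpClient parses the JSON body itself) and RxJS 6+ (pipeable operators), pass observe: 'response' so that the status and headers are still available:

getAllItems(): Observable<any[]> {
  const getRange = (range?: string): Observable<HttpResponse<any[]>> => {
    // HttpHeaders is immutable: set() returns a new instance
    let headers = new HttpHeaders();
    if (range) {
      headers = headers.set('Range', range);
    }
    // observe: 'response' exposes status and headers, not only the JSON body
    return this.http.get<any[]>('http://api/endpoint', { headers, observe: 'response' });
  };

  return getRange().pipe(
    expand((res: HttpResponse<any[]>) => {
      if (res.status === 206) {
        const nextRange = res.headers.get('Next-Range');
        return getRange(nextRange);
      } else {
        return EMPTY;
      }
    }),
    // emit the parsed JSON body of each page
    map((res: HttpResponse<any[]>) => res.body)
  );
}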
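
The approach above emits one value per page, so the subscriber still has to stitch the pages together. As a minimal sketch of how this could be tried outside Angular and collapsed into a single emission, the following replaces the HTTP call with an in-memory fake. FakeResponse, DATA, PAGE_SIZE and the nextRange field are assumptions made up for the example (nextRange stands in for the Next-Range header); only the rxjs imports are real API.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, map, reduce } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP response: only what the paging logic needs.
interface FakeResponse {
  status: number;      // 206 = more pages follow, 200 = last page
  nextRange?: string;  // plays the role of the Next-Range header
  body: number[];
}

// Hypothetical in-memory "server": 5 items, served 2 at a time.
const DATA = [1, 2, 3, 4, 5];
const PAGE_SIZE = 2;

function getRange(range?: string): Observable<FakeResponse> {
  const start = range ? Number(range) : 0;
  const end = Math.min(start + PAGE_SIZE, DATA.length);
  const more = end < DATA.length;
  return of({
    status: more ? 206 : 200,
    nextRange: more ? String(end) : undefined,
    body: DATA.slice(start, end),
  });
}

function getAllItems(): Observable<number[]> {
  return getRange().pipe(
    // Request the next range for as long as the server answers 206.
    expand(res => (res.status === 206 ? getRange(res.nextRange) : EMPTY)),
    // Keep only the items of each page.
    map(res => res.body),
    // Concatenate all pages; emits once, after the last page has arrived.
    reduce((all, page) => all.concat(page), [] as number[])
  );
}

let result: number[] = [];
getAllItems().subscribe(items => {
  result = items;
  console.log(items);
});
```

With reduce at the end, the component receives the full list in its next handler and does not need a separate completed handler or a temporary array.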


Answer 4:

Just in case someone else runs into this: the pattern I'm using relies on the same concept of expand. However, this is a more 'complete' example for when you need to transform the responses from the server into a different kind of Observable, like Visa Kopu's example above.

I broke out each 'step' so the flow is captured in methods (instead of writing the most compact version of it). I think it is a bit easier to learn this way.

import {Injectable} from '@angular/core';
import {HttpClient, HttpParams, HttpResponse} from '@angular/common/http';
import {EMPTY, Observable} from 'rxjs';
import {expand, map} from 'rxjs/operators';

// this service is consuming a backend api that is calling/proxying a Salesforce query that is paginated
@Injectable({providedIn: 'root'})
export class ExampleAccountService {

    constructor(protected http: HttpClient) {
    }

    // this method maps the 'pages' of AccountsResponse objects to a single Observable array of Account objects
    allAccounts(): Observable<Account[]> {
        const accounts: Account[] = [];
        return this.aPageOfAccounts(null).pipe(
            map((ret: HttpResponse<AccountsResponse>) => {
                for (const account of ret.body.accounts) {
                    accounts.push(account);
                }
                return accounts;
            })
        );
    }

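    // fetch pages of accounts until there are no more pages; expand re-applies
    // the callback to every emitted page, so the callback must not recurse itself
    private aPageOfAccounts(page): Observable<HttpResponse<AccountsResponse>> {
        return this.fetchAccountsFromServer(page).pipe(
            expand((res: HttpResponse<AccountsResponse>) => {
                if (res.body.nextRecordsUrl) {
                    // fetch one page only: calling aPageOfAccounts here would nest
                    // a second expand and request later pages more than once
                    return this.fetchAccountsFromServer(res.body.nextRecordsUrl);
                } else {
                    return EMPTY;
                }
            }));
    }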

    // this one does the actual fetch to the server
    private fetchAccountsFromServer(page: string): Observable<HttpResponse<AccountsResponse>> {
        const options = createRequestOption({page});
        return this.http.get<AccountsResponse>(`https://wherever.com/accounts/page`,
            {params: options, observe: 'response'});
    }
}

export class AccountsResponse {
    constructor(public totalSize?: number,
                public done?: boolean,
                public nextRecordsUrl?: string,
                public accounts?: Account[]) {
    }
}

export class Account {
    constructor(public id?: string,
                public name?: string
    ) {

    }
}

export const createRequestOption = (req?: any): HttpParams => {
    let options: HttpParams = new HttpParams();
    if (req) {
        Object.keys(req).forEach((key) => {
            if (key !== 'sort') {
                options = options.set(key, req[key]);
            }
        });
        if (req.sort) {
            req.sort.forEach((val) => {
                options = options.append('sort', val);
            });
        }
    }
    return options;
};


Answer 5:

The answers above are useful. I had to fetch data from a paging API in a recursive manner, and created a code snippet that computes a factorial.