Read buffered response with Angular2/RxJS

2019-06-23 19:23发布

问题:

I'm building a website that reads data from a backend. That data is computed on-the-fly and sent back to the client in a buffered manner. I.e. as soon as the first chunk is computed it is sent to the client, then it computes the next chunk and sends that to the client. This whole process happens in the same HTTP request. The client should not wait for the complete response to finish but handle each chunk by its own as soon as has been sent. Such responses can usually be consumed using the XHR progress handler (e.g. How to get progress from XMLHttpRequest).

How can I consume such a response with the HttpModule in Angular2 using RxJS and Observables?


Edit: peeskillet gave an excellent and detailed answer below. In addition, I did some further digging and found a feature request for the HttpModule of Angular and a StackOverflow question with another approach on how to solve it.

回答1:

Note: The following answer is only a POC. It is meant to educate on the architecture of Http, and also provide a simple working POC implementation. One should take a look at the source for XHRConnection for ideas on what else you should consider when implementing this.

When trying to implement this, I don't see any way to tap directly into the XHR. It seems maybe we need to just provide custom implementations of some of the components involved with using Http. The three main components that we should consider are

  • Connection
  • ConnectionBackend
  • Http

Http takes a ConnectionBackend as a argument to its constructor. When a request is made, say with get, Http creates a connection with ConnectionBackend.createConnection, and returns the Observable property of Connection (that's returned from createConnection). In the most stripped down (simplified) view, it looks like this

class XHRConnection implements Connection {
  response: Observable<Response>;
  constructor( request, browserXhr) {
    this.response = new Observable((observer: Observer<Response>) => {
      let xhr = browserXhr.create();
      let onLoad = (..) => {
        observer.next(new Response(...));
      };
      xhr.addEventListener('load', onLoad);
    })
  }
}

class XHRBackend implements ConnectionBackend {
  constructor(private browserXhr) {}
  createConnection(request): XHRConnection {
    return new XHRConnection(request, this.broswerXhr).response;
  }
}

class Http {
  constructor(private backend: ConnectionBackend) {}

  get(url, options): Observable<Response> {
    return this.backend.createConnection(createRequest(url, options)).response;
  }
}

So knowing this architecture, we can try to implement something similar.

For the Connection, here is the POC. Imports left out for brevity, but for the most part, everything can be imported from @angular/http, and the Observable/Observer can be imported from rxjs/{Type}.

export class Chunk {
  data: string;
}

export class ChunkedXHRConnection implements Connection {
  request: Request;
  response: Observable<Response>;
  readyState: ReadyState;

  chunks: Observable<Chunk>;

  constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
    this.request = req;
    this.chunks = new Observable<Chunk>((chunkObserver: Observer<Chunk>) => {
      let _xhr: XMLHttpRequest = browserXHR.build();
      let previousLen = 0;
      let onProgress = (progress: ProgressEvent) => {
        let text = _xhr.responseText;
        text = text.substring(previousLen);
        chunkObserver.next({ data: text });
        previousLen += text.length;

        console.log(`chunk data: ${text}`);
      };
      _xhr.addEventListener('progress', onProgress);
      _xhr.open(RequestMethod[req.method].toUpperCase(), req.url);
      _xhr.send(this.request.getBody());
      return () => {
        _xhr.removeEventListener('progress', onProgress);
        _xhr.abort();
      };
    });
  }
}

Here's we are just subscribing to the XHR progress event. Since the XHR.responseText spews out the entire concatenated text, we just substring to get chunks, and emit each chuck through the Observer.

For the XHRBackend, we have the following (nothing spectacular). Again, everything can be imported from @angular/http;

@Injectable()
export class ChunkedXHRBackend implements ConnectionBackend {
  constructor(
      private _browserXHR: BrowserXhr, private _baseResponseOptions: ResponseOptions,
      private _xsrfStrategy: XSRFStrategy) {}

  createConnection(request: Request): ChunkedXHRConnection {
    this._xsrfStrategy.configureRequest(request);
    return new ChunkedXHRConnection(request, this._browserXHR, this._baseResponseOptions);
  }
}

For Http, we will extend it, adding a getChunks method. You can add more methods if you want.

@Injectable()
export class ChunkedHttp extends Http {
  constructor(protected backend: ChunkedXHRBackend, protected defaultOptions: RequestOptions) {
    super(backend, defaultOptions);
  }

  getChunks(url, options?: RequestOptionsArgs): Observable<Chunk> {
    return this.backend.createConnection(
       new Request(mergeOptions(this.defaultOptions, options, RequestMethod.Get, url))).chunks;
  }
}

The mergeOptions method can be found in the Http source.

Now we can create a module for it. Users should directly use ChunkedHttp instead of Http. But because to don't attempt to override the Http token, you can still use Http if you need to.

@NgModule({
  imports: [ HttpModule ],
  providers: [
    {
      provide: ChunkedHttp,
      useFactory: (backend: ChunkedXHRBackend, options: RequestOptions) => {
        return new ChunkedHttp(backend, options);
      },
      deps: [ ChunkedXHRBackend, RequestOptions ]
    },
    ChunkedXHRBackend
  ]
})
export class ChunkedHttpModule {
}

We import the HttpModule because it provides other services that we need to be injected, but we don't want to have to reimplement those if we don't need to.

To test just import the ChunkedHttpModule into the AppModule. Also to test I used the following component

@Component({
  selector: 'app',
  encapsulation: ViewEncapsulation.None,
  template: `
    <button (click)="onClick()">Click Me!</button>
    <h4 *ngFor="let chunk of chunks">{{ chunk }}</h4>
  `,
  styleUrls: ['./app.style.css']
})
export class App {
  chunks: string[] = [];

  constructor(private http: ChunkedHttp) {}

  onClick() {
    this.http.getChunks('http://localhost:8080/api/resource')
      .subscribe(chunk => this.chunks.push(chunk.data));
  }
}

I have a backend endpoint set up where it just spits out "Message #x" in 10 chunks every half a second. And this is the result

There seems to be a bug somewhere. There's only nine :-). I think it's server side related.