How to merge an iterator of streams?

Posted 2019-07-25 09:33

I'm writing a program that scrapes links from oxfordlearnersdictionaries.com using Rust. I'm using hyper and futures.

I have a collection of links, one per section, and I use stream::unfold to walk through the pages of each section:

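```rust
// Returns the links scraped, and possibly the Uri of the next page.
// It receives a Chunk, because that is what concat2() resolves to.
fn process_body_and_return_next(body: Chunk) -> (Vec<String>, Option<Uri>) { ... }

// In main()
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let uris = ...

// One stream per section: the state is the Uri of the next page to fetch,
// and the stream ends once a page has no next page (state None).
let jobs = uris.map(|uri| {
    stream::unfold(Some(uri), |uri| {
        uri.map(|uri| {
            client
                .get(uri)
                .and_then(|res| res.body().concat2())
                .map(process_body_and_return_next)
        })
    })
});
```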
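The pagination pattern above can be modeled without any networking as a synchronous analogue. This is only a sketch under stated assumptions: fetch_page is a hypothetical stand-in for the HTTP request plus process_body_and_return_next, URIs are plain strings, and std::iter::from_fn plays the role of stream::unfold. The state is the URI of the next page, and iteration stops once a page reports no successor.

```rust
// Hypothetical stand-in for "fetch the page and scrape it": maps a page URI
// to (links on that page, URI of the next page if there is one).
fn fetch_page(uri: &str) -> (Vec<String>, Option<String>) {
    match uri {
        "/a/1" => (vec!["apple".to_string(), "arm".to_string()], Some("/a/2".to_string())),
        "/a/2" => (vec!["axe".to_string()], None),
        _ => (Vec::new(), None),
    }
}

// Synchronous analogue of stream::unfold(Some(uri), ...): the state is the
// URI of the next page, and a None state ends the sequence.
fn pages(start: &str) -> impl Iterator<Item = Vec<String>> {
    let mut next: Option<String> = Some(start.to_string());
    std::iter::from_fn(move || {
        let uri = next.take()?; // no next page: iteration ends here
        let (links, following) = fetch_page(&uri);
        next = following; // carry the state forward to the next call
        Some(links)
    })
}

fn main() {
    for links in pages("/a/1") {
        println!("{:?}", links);
    }
}
```

Each call yields one page's links, just as each step of the unfold stream resolves to one Vec<String>.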

Now I have an impl Iterator<Item = impl Stream<Item = Vec<String>>>. How can I merge it into a single Stream of Vecs, the way stream::futures_unordered merges Futures?


Edit: I tried combining stream::iter_ok and stream::Stream::flatten:

```rust
let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();
```

But that's not efficient, since I want to send several web requests concurrently: the combined Stream should produce a value whenever any inner Stream is ready.

Tags: rust
2 answers
爷、活的狠高调
Answer #2 · 2019-07-25 09:44

An iterator can be turned into a stream using futures::stream::iter_ok; this turns your iterator of streams into a stream of streams:

```rust
::futures::stream::iter_ok(jobs)
```

You can then flatten this stream of streams into a single stream of all items using Stream::flatten():

```rust
let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();
```
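This is also why the asker finds it inefficient: the futures 0.1 documentation for Stream::flatten states that each inner stream is exhausted before moving on to the next, so requests for the second section are not sent until the first section's stream has finished. The ordering can be illustrated with plain std iterators; the data below is made up and only stands in for two per-section streams.

```rust
// Made-up link batches standing in for two per-section streams.
fn flatten_order() -> Vec<&'static str> {
    let section_a = vec!["a1", "a2", "a3"];
    let section_b = vec!["b1", "b2"];
    // Like iter_ok(jobs).flatten(): the inner sequences are drained one after
    // another, so nothing from section_b appears until section_a is exhausted.
    vec![section_a, section_b].into_iter().flatten().collect()
}

fn main() {
    println!("{:?}", flatten_order());
}
```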
Root(大扎)
Answer #3 · 2019-07-25 10:05

The select combinator takes two Streams and yields an item whenever either of them is ready.

To select from more than two streams, you can chain calls to select. However, since you don't know in advance how many streams you have to select on, you have to box the intermediate streams to erase their concrete Stream types (each select call wraps the previous type in a new Select<...>), so that the program type-checks.

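```rust
extern crate futures;

use futures::Stream;

// Merges any number of streams by chaining select calls. Each call wraps the
// accumulated stream in a new Select<...> type, so the accumulator is boxed as
// a trait object to keep a single type across loop iterations.
fn select_all<'a, I, T, E>(seq: I) -> Box<dyn Stream<Item = T, Error = E> + 'a>
where
    I: IntoIterator,
    I::Item: Stream<Item = T, Error = E> + 'a,
    T: 'a,
    E: 'a,
{
    let mut iter = seq.into_iter();
    // Panics on an empty input, since there is no stream to start from.
    let mut result: Box<dyn Stream<Item = T, Error = E> + 'a> =
        Box::new(iter.next().expect("got an empty list of streams"));
    for next in iter {
        result = Box::new(result.select(next));
    }

    result
}
```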
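The need for boxing is not specific to futures. As a std-only illustration of the typing issue alone, the sketch below uses Iterator::chain in place of select (chain is sequential, so this shows nothing about concurrency): every chain call produces a deeper Chain<Chain<...>, ...> type, and storing the accumulator as a Box<dyn Iterator> gives the loop variable one fixed type. chain_all is a hypothetical helper name. Starting from an empty iterator instead of the first element also avoids panicking on an empty input.

```rust
// Hypothetical helper: concatenates any number of iterators. Each chain()
// call would produce a new nested type, so the accumulator is boxed as a
// trait object to keep the same type on every loop iteration.
fn chain_all<I>(seqs: Vec<I>) -> Box<dyn Iterator<Item = i32>>
where
    I: Iterator<Item = i32> + 'static,
{
    // Starting from an empty iterator means an empty input needs no special case.
    let mut acc: Box<dyn Iterator<Item = i32>> = Box::new(std::iter::empty());
    for s in seqs {
        acc = Box::new(acc.chain(s));
    }
    acc
}

fn main() {
    let merged: Vec<i32> = chain_all(vec![vec![1, 2].into_iter(), vec![3, 4].into_iter()]).collect();
    println!("{:?}", merged);
}
```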

There's certainly a more efficient way to implement this, though: with n streams, every poll has to descend through up to n nested, boxed Select layers. There is a select_all combinator for futures, but at the time of writing there isn't one for streams. Perhaps you could implement it yourself and submit it as a pull request!
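A flatter design can be modeled with std alone. This is a simplified sketch, not the futures API: PollStream, Scripted, scripted and this SelectAll are hypothetical names, and PollStream::poll_next omits the Context/waker argument that real streams receive, so the demo just polls in a loop where an executor would wait for a wake-up. All inner streams live in one Vec and are polled round-robin, starting after the stream that last produced an item; finished streams are dropped, and Pending is returned only after every remaining stream has reported Pending in the same pass. (Newer releases of the futures crate, 0.3, do provide futures::stream::select_all.)

```rust
use std::collections::VecDeque;
use std::task::Poll;

// Hypothetical, simplified stream model: a non-blocking poll with no waker.
trait PollStream {
    type Item;
    fn poll_next(&mut self) -> Poll<Option<Self::Item>>;
}

// Hypothetical test stream that replays a fixed script of poll results.
struct Scripted<T> {
    script: VecDeque<Poll<Option<T>>>,
}

impl<T> PollStream for Scripted<T> {
    type Item = T;
    fn poll_next(&mut self) -> Poll<Option<T>> {
        // Once the script runs out, the stream reports that it has finished.
        self.script.pop_front().unwrap_or(Poll::Ready(None))
    }
}

fn scripted<T: 'static>(script: Vec<Poll<Option<T>>>) -> Box<dyn PollStream<Item = T>> {
    Box::new(Scripted { script: VecDeque::from(script) })
}

// Flat merge: one Vec of streams instead of a chain of nested selects.
struct SelectAll<T> {
    streams: Vec<Box<dyn PollStream<Item = T>>>,
    next: usize, // index polled first on the next call (round-robin)
}

impl<T> PollStream for SelectAll<T> {
    type Item = T;
    fn poll_next(&mut self) -> Poll<Option<T>> {
        let mut pending = 0; // streams that said Pending during this pass
        while pending < self.streams.len() {
            let i = self.next % self.streams.len();
            match self.streams[i].poll_next() {
                Poll::Ready(Some(item)) => {
                    self.next = i + 1; // start after this stream next time
                    return Poll::Ready(Some(item));
                }
                Poll::Ready(None) => {
                    // Finished: drop it. The following stream shifts into
                    // slot i, so it is the one polled next.
                    self.streams.remove(i);
                    self.next = i;
                }
                Poll::Pending => {
                    self.next = i + 1;
                    pending += 1;
                }
            }
        }
        // No stream was ready: done if none remain, otherwise wait.
        if self.streams.is_empty() { Poll::Ready(None) } else { Poll::Pending }
    }
}

fn main() {
    use std::task::Poll::{Pending, Ready};
    let mut merged = SelectAll {
        streams: vec![
            scripted(vec![Pending, Ready(Some("a1")), Ready(Some("a2"))]),
            scripted(vec![Ready(Some("b1")), Pending, Ready(Some("b2"))]),
        ],
        next: 0,
    };
    let mut out = Vec::new();
    loop {
        match merged.poll_next() {
            Ready(Some(x)) => out.push(x),
            Ready(None) => break,
            Pending => continue, // a real executor would wait for a wake-up here
        }
    }
    println!("{:?}", out);
}
```

With these scripted inputs, main prints ["b1", "a1", "a2", "b2"]: b1 comes out first because the first stream is still pending at that moment, whereas an in-order flatten would keep waiting on the first stream.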
