How do I iterate over a Vec of functions returning

Posted 2020-02-12 14:25

Is it possible to loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer? Whether to execute the later Futures would depend on the outcome of the earlier Futures in the Vec.

To clarify:

I'm working on an application that can fetch data from an arbitrary set of upstream sources.

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried, and so on.

Each source should be tried exactly once, and no source should be tried until all of the sources before have returned their results. Errors are logged, but otherwise ignored, passing the query to the next upstream data source.

I have some working code that does this for fetching metadata:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(&self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
        Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Aside from my unhappiness with all of the Box and Rc/RefCell nonsense, my real concern is with the executor::block_on() call. It blocks, waiting for each Future to return a result, before continuing to the next.

Given that it's possible to call fn_returning_future().or_else(|_| other_fn()) and so on, is it possible to build up a dynamic chain like this? Or is it a requirement to fully evaluate each Future in the iterator before moving to the next?

1 answer
Reply #2 · 2020-02-12 14:38

You can use stream::unfold to convert a single value into a stream. In this case, we can use the IntoIter iterator as that single value.

extern crate futures; // 0.2.1

use futures::{executor, future, stream, Future, FutureExt, Stream, StreamExt};

type Error = ();

fn network_request(val: i32) -> impl Future<Item = i32, Error = Error> {
    // Just for demonstration, don't do this in a real program
    use std::{thread, time::{Duration, Instant}};
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    future::ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = i32, Error = Error> {
    stream::unfold(vals.into_iter(), |mut vals| {
        match vals.next() {
            Some(v) => network_request(v).map(|v| Some((v, vals))).left_future(),
            None => future::ok(None).right_future(),
        }
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    let f = s.for_each(|v| {
        println!("-> {}", v);
        Ok(())
    });
    executor::block_on(f).unwrap();
}

Output:

Resolving 1 at Instant { tv_sec: 3416957, tv_nsec: 29270595 }
-> 100
Resolving 2 at Instant { tv_sec: 3416958, tv_nsec: 29450854 }
-> 200
Resolving 3 at Instant { tv_sec: 3416959, tv_nsec: 29624479 }
-> 300
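
As a rough illustration of what the unfold closure does, here is a synchronous, std-only analogue. It is not the futures API itself: the helper `unfold` and the stand-in `request` are hypothetical names made up for this sketch. The closure receives the current state (the IntoIter) and returns either the next item together with the state to use afterwards, or None to end the sequence.

```rust
use std::iter;

/// Synchronous analogue of an unfold (hypothetical helper, not part of std):
/// `step` consumes the current state and returns `Some((item, next_state))`,
/// or `None` to end the sequence.
fn unfold<S, T, F>(init: S, mut step: F) -> impl Iterator<Item = T>
where
    F: FnMut(S) -> Option<(T, S)>,
{
    let mut state = Some(init);
    iter::from_fn(move || {
        // Take the state out; once `step` returns `None` it is never put back,
        // so every later call ends immediately.
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Hypothetical stand-in for `network_request`, without the delay.
fn request(val: i32) -> i32 {
    val * 100
}

/// Same shape as the stream version: the iterator over `vals` is the state
/// that gets threaded from one step to the next.
fn requests_in_sequence(vals: Vec<i32>) -> impl Iterator<Item = i32> {
    unfold(vals.into_iter(), |mut vals| {
        vals.next().map(|v| (request(v), vals))
    })
}
```

Collecting `requests_in_sequence(vec![1, 2, 3])` gives 100, 200, 300 in order. The stream version differs mainly in that each step returns a future, so the next step cannot start until the previous one has resolved.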

To ignore Err and None, you have to shuttle the Error over to the Item, making the Item type a Result<Option<T>, Error>:

extern crate futures; // 0.2.1

use futures::{executor, future, stream, Future, FutureExt, Never, Stream, StreamExt};

struct Error;

fn network_request(val: i32) -> impl Future<Item = Option<i32>, Error = Error> {
    // Just for demonstration, don't do this in a real program
    use std::{thread, time::{Duration, Instant}};
    thread::sleep(Duration::from_millis(100));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => future::err(Error),          // An error
        2 => future::ok(None),            // No data
        _ => future::ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence<I>(vals: I) -> impl Stream<Item = Result<Option<i32>, Error>, Error = Never>
where
    I: IntoIterator<Item = i32>,
{
    stream::unfold(vals.into_iter(), |mut vals| {
        match vals.next() {
            Some(v) => {
                network_request(v)
                    .then(|v| future::ok(Some((v, vals)))) // Convert `Item` into `Result<Option<i32>, Error>`
                    .left_future()
            }
            None => future::ok(None).right_future(),
        }
    })
}

fn main() {
    let reqs = requests_in_sequence(vec![1, 2, 3, 4, 5]);

    let success = reqs
        .filter_map(|x| future::ok(x.ok())) // Ignore all `Result::Err`
        .filter_map(|x| future::ok(x))      // Ignore all `Option::None`
        .next();                            // Get first value

    match executor::block_on(success) {
        Ok((Some(v), _s)) => println!("First success: {}", v),
        Ok((None, _s)) => println!("No successful requests"),
        Err(_) => unreachable!("Impossible to fail"),
    }
}

Output:

Resolving 1 at Instant { tv_sec: 3428278, tv_nsec: 513758474 }
Resolving 2 at Instant { tv_sec: 3428278, tv_nsec: 614059691 }
Resolving 3 at Instant { tv_sec: 3428278, tv_nsec: 714256066 }
First success: 300
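
Requests 4 and 5 never appear in the output above because the adapters are lazy: `.next()` pulls items only until the first value gets through both filters. A synchronous, std-only sketch of the same shape follows, with iterators in place of streams. The names `request`, `first_success`, and the `tried` counter are hypothetical and exist only for this illustration.

```rust
#[derive(Debug)]
struct Error;

/// Hypothetical stand-in for one upstream request:
/// 1 fails, 2 has no data, anything else succeeds.
fn request(val: i32) -> Result<Option<i32>, Error> {
    match val {
        1 => Err(Error),
        2 => Ok(None),
        _ => Ok(Some(val * 100)),
    }
}

/// Returns the first available value and how many sources were actually tried.
fn first_success(vals: Vec<i32>) -> (Option<i32>, usize) {
    let mut tried = 0;
    let found = vals
        .into_iter()
        .map(|v| {
            tried += 1;
            request(v) // the error travels along as an ordinary item
        })
        .filter_map(|r| r.ok()) // ignore all `Result::Err`
        .filter_map(|o| o)      // ignore all `Option::None`
        .next();                // stop at the first value
    (found, tried)
}
```

Here `first_success(vec![1, 2, 3, 4, 5])` returns `(Some(300), 3)`: the third source succeeds, so the fourth and fifth are never called.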

"is it possible to build up a dynamic chain like this"

Yes, but it involves a good amount of extra allocation and indirection, as well as requiring that there be at least one value to access:

fn requests_in_sequence(vals: Vec<i32>) -> Box<Future<Item = i32, Error = Error>> {
    let mut vals = vals.into_iter();

    let val1 = vals.next().expect("Need at least one value to start from");

    vals.fold(Box::new(network_request(val1)), |acc, val| {
        Box::new(acc.or_else(move |_| network_request(val)))
    })
}

"is it a requirement to fully evaluate each Future in the iterator before moving to the next"

Isn't that part of your own requirements? Emphasis mine:

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried
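
With async/await syntax (stable since Rust 1.39, so not available to the futures 0.2.1 code above), this one-after-another requirement can be written as an ordinary loop. The following is a minimal sketch using only the standard library. `first_success`, the stand-in `network_request`, and the tiny polling `block_on` are assumptions made for illustration; a real program would use a proper executor instead of this `block_on`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug)]
struct Error;

/// Hypothetical source: 1 fails, 2 has no data, anything else succeeds.
async fn network_request(val: i32) -> Result<Option<i32>, Error> {
    match val {
        1 => Err(Error),
        2 => Ok(None),
        _ => Ok(Some(val * 100)),
    }
}

/// Tries each source in turn. The next request is not even created until
/// the previous one has resolved to an error or to `None`.
async fn first_success(vals: Vec<i32>) -> Option<i32> {
    for val in vals {
        match network_request(val).await {
            Ok(Some(v)) => return Some(v),
            Ok(None) => {} // no data here, try the next source
            Err(err) => eprintln!("Error on request {}: {:?}", val, err),
        }
    }
    None
}

/// Waker that does nothing; enough for this demonstration only.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Demonstration-only executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

Calling `block_on(first_success(vec![1, 2, 3, 4, 5]))` yields `Some(300)`: the error from the first source is logged, the second has no data, and the fourth and fifth are never requested.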
