tokio with multiqueue sometimes hangs, sometimes w

Posted 2019-08-16 03:13

Question:

I'm trying to benchmark the multiqueue crate with tokio to implement something along the lines of publisher/subscriber, by creating Streams that can be iterated. I'm not convinced of the efficiency of this approach: I may need dozens or hundreds of listeners that filter the items, and the single publisher will publish roughly 10 messages per millisecond. I'd therefore like to benchmark the approach before I commit to it. However, right now I'm hitting a strange bug where the tokio::timer::Interval sometimes doesn't seem to fire at all.
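To make the intended fan-out concrete, here is a minimal sketch of one publisher feeding several filtering listeners. It is not the benchmarked code: it deliberately uses OS threads and std::sync::mpsc instead of tokio and multiqueue, it assumes every listener should see every message, and the names fan_out, is_even, is_multiple_of_three and accept_all are hypothetical, chosen only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical listener filters, used only for this illustration.
fn is_even(n: &u32) -> bool {
    n % 2 == 0
}

fn is_multiple_of_three(n: &u32) -> bool {
    n % 3 == 0
}

fn accept_all(_: &u32) -> bool {
    true
}

/// Sends every item to every listener over that listener's own channel.
/// Each listener keeps only the items accepted by its filter.
/// Returns the kept items, one Vec per listener, in filter order.
fn fan_out(items: Vec<u32>, filters: Vec<fn(&u32) -> bool>) -> Vec<Vec<u32>> {
    let mut senders = Vec::new();
    let mut handles = Vec::new();

    for keep in filters {
        let (tx, rx) = mpsc::channel::<u32>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            // The iterator ends once the matching sender has been dropped.
            rx.into_iter().filter(|n| keep(n)).collect::<Vec<u32>>()
        }));
    }

    // The single publisher: copy each item to every listener.
    for item in items {
        for tx in &senders {
            tx.send(item).expect("listener hung up");
        }
    }
    // Dropping the senders closes the channels so the listeners can finish.
    drop(senders);

    handles
        .into_iter()
        .map(|h| h.join().expect("listener panicked"))
        .collect()
}

fn main() {
    let filters = vec![is_even as fn(&u32) -> bool, is_multiple_of_three, accept_all];
    let kept = fan_out((1..=6).collect(), filters);
    for (i, items) in kept.iter().enumerate() {
        println!("listener {} kept {:?}", i, items);
    }
}
```

With one channel per listener, the publisher's cost grows linearly with the number of listeners, which is the overhead I want to measure against a shared queue.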

The full benchmark code is below:

#![feature(test)]

extern crate futures;
extern crate multiqueue;
extern crate test;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use futures::future::lazy;
    use futures::sync::mpsc::{channel, Receiver, Sender};
    use futures::{Async, Poll, Stream};
    use futures::{Future, Sink};
    use test::Bencher;
    use tokio::timer::Interval;

    #[bench]
    fn bench_many(b: &mut Bencher) {
        tokio::run(lazy(|| {
            let (tx, rx) = multiqueue::mpmc_fut_queue(1000);
            tokio::spawn(
                Interval::new_interval(std::time::Duration::from_micros(100))
                    .take(100)
                    .map(|_| 100)
                    .map_err(|e| {
                        eprintln!("Got interval error = {:?}", e);
                    })
                    .fold(tx, |tx, num| {
                        println!("Sending {}", num);
                        tx.send(num).map_err(|e| println!("send err = {:?}", e))
                    })
                    .map(|_| ()),
            );

            for i in 0..3 {
                println!("Starting");
                let rx = rx.clone();
                tokio::spawn(rx.for_each(move |num| {
                    println!("{} Got a num! {}", i, num);
                    Ok(())
                }));
            }

            Ok(())
        }));
    }
}

I'm running it with cargo bench. futures is on version "0.1", tokio is on version "0.1", and multiqueue is on version "0.3".

Sometimes the whole test completes, printing many "[0-2] Got a num! 100" and "Sending 100" messages. Other times it hangs, either partway through (after several "Sending" and "Got a num!" messages) or right after printing only the 3 "Starting" messages.

I suspect this may be related to the number of tasks tokio can run at the same time, but I don't understand why I would be hitting such a limit, since both kinds of tasks I'm spawning yield to the executor frequently.

How can I make this more reliable?