I'm trying to build a simple pipeline-like functionality that executes each stage of the pipeline in separate threads and glues them all together with channel passing.
Pipe::source(buffer)
.pipe(|input, output| {...})
.pipe(|input, output| {...})
.sink(writer)
I cannot for the life of me figure out the function signature for the pipe()
function. Here's my code:
use std::sync::mpsc::channel;
use std::io::{ChanReader,ChanWriter};
use std::thread::Thread;
/// One stage boundary of the pipeline: holds the reading end of the channel
/// that the previous stage writes into.
struct Pipe {
// Reader side of the channel feeding this stage.
incoming: ChanReader
}
// Constructors and combinators for chaining pipeline stages over channels.
impl Pipe {
/// Starts a pipeline by copying every character from `source` into a fresh
/// channel and returning a `Pipe` that reads from that channel.
///
/// The copy loop runs inside this function, before it returns; no thread is
/// spawned here.
fn source(source: &mut Buffer) -> Pipe {
let (tx, rx) = channel();
let reader = ChanReader::new(rx);
let mut writer = ChanWriter::new(tx);
// Forward characters one at a time until `read_char` reports an error.
loop {
match source.read_char() {
// The result of `write_char` is discarded by the trailing `;` below.
Ok(c) => writer.write_char(c),
Err(_) => break
};
};
Pipe { incoming: reader }
}
/// Ends a pipeline by copying every character from this stage's incoming
/// channel into `sink`, stopping at the first read error.
fn sink(&mut self, sink: &mut Writer) {
loop {
match self.incoming.read_char() {
// The result of `write_char` is discarded by the trailing `;` below.
Ok(c) => sink.write_char(c),
Err(_) => break
};
};
}
/// Adds a stage: creates a new channel, spawns a thread that calls
/// `transform` with this stage's reader and the new channel's writer, and
/// returns a `Pipe` reading from the new channel.
///
/// NOTE(review): as written this does not compile. The spawned closure
/// reaches `self.incoming` through the borrowed `&self`, which the compiler
/// rejects with a lifetime error because the borrow does not satisfy the
/// static lifetime that `Thread::spawn` demands.
fn pipe(&self, transform: Box<Fn(&mut ChanReader, &mut ChanWriter)+Send>) -> Pipe {
let (tx, rx) = channel();
let reader = ChanReader::new(rx);
let mut writer = ChanWriter::new(tx);
Thread::spawn(move || {
// NOTE(review): `transform` is declared to take `&mut` arguments, but
// shared `&` references are passed here -- confirm intended mutability.
transform(&self.incoming, &writer);
});
Pipe { incoming: reader }
}
}
And the compiler error:
src/lib.rs:39:28: 41:10 error: cannot infer an appropriate lifetime due to conflicting requirements
src/lib.rs:39 Thread::spawn(move || {
src/lib.rs:40 transform(&self.incoming, &writer);
src/lib.rs:41 });
src/lib.rs:39:9: 39:22 note: first, the lifetime cannot outlive the expression at 39:8...
src/lib.rs:39 Thread::spawn(move || {
^~~~~~~~~~~~~
src/lib.rs:39:9: 39:22 note: ...so that the declared lifetime parameter bounds are satisfied
src/lib.rs:39 Thread::spawn(move || {
^~~~~~~~~~~~~
src/lib.rs:39:9: 41:11 note: but, the lifetime must be valid for the call at 39:8...
src/lib.rs:39 Thread::spawn(move || {
src/lib.rs:40 transform(&self.incoming, &writer);
src/lib.rs:41 });
src/lib.rs:39:28: 41:10 note: ...so that argument is valid for the call
src/lib.rs:39 Thread::spawn(move || {
src/lib.rs:40 transform(&self.incoming, &writer);
src/lib.rs:41 });
src/lib.rs:39:9: 39:22 error: declared lifetime bound not satisfied
src/lib.rs:39 Thread::spawn(move || {
^~~~~~~~~~~~~
src/lib.rs:34:87: 44:6 note: lifetime parameter instantiated with the anonymous lifetime #1 defined on the block at 34:86
src/lib.rs:34 fn pipe(&self, transform: Box<Fn(&mut ChanReader, &mut ChanWriter)+Send>) -> Pipe {
src/lib.rs:35 let (tx, rx) = channel();
src/lib.rs:36 let reader = ChanReader::new(rx);
src/lib.rs:37 let mut writer = ChanWriter::new(tx);
src/lib.rs:38
src/lib.rs:39 Thread::spawn(move || {
...
note: but lifetime parameter must outlive the static lifetime
error: aborting due to 2 previous errors
Could not compile `pipes`.
I am using Rust 1.0.0-dev.
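`Thread::spawn` requires the function that is given to it to be `'static`; the closure must thus consume all things that come into it. You have gone taking a reference to `self.incoming`—a non-static reference. This won’t work; you must move the reader into it. The way to do this is probably to be consuming `self` in the method `pipe` rather than taking it by reference:

    fn pipe<F>(mut self, transform: F) -> Pipe
        where F: FnOnce(&mut ChanReader, &mut ChanWriter) + Send
    {
        let (tx, rx) = channel();
        let reader = ChanReader::new(rx);
        let mut writer = ChanWriter::new(tx);
        Thread::spawn(move || {
            transform(&mut self.incoming, &mut writer);
        });
        Pipe { incoming: reader }
    }

Now, `self` and `writer` are moved into the spawned thread and everything is dandy. (For good measure I unboxed your closure as there’s no reason for it to be boxed and changed it to `FnOnce` for maximal expressiveness.)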