I'm using futures, tokio, hyper, and serde_json to request and deserialize some data that I need to hold until my next request. My initial thought was to make a struct containing the `hyper::Chunk` and the deserialized data that borrows from the `Chunk`, but I couldn't get the lifetimes right. I tried using the rental crate, but I can't get this to work either. Perhaps I'm using the `'buffer` lifetime before declaring the buffer `Vec`, but maybe I've messed something else up:
#[rental]
pub struct ChunkJson<T: serde::de::Deserialize<'buffer>> {
    buffer: Vec<u8>,
    json: T
}
Is there some way to make the lifetimes right or should I just use `DeserializeOwned` and give up on zero-copy?
For more context, the following code works (periodically deserializing JSON from two URLs, retaining the results so we can do something with them both). I'd like to change my `X` and `Y` types to use `Cow<'a, str>` for their fields, changing from `DeserializeOwned` to `Deserialize<'a>`. For this to work, I need to store the slice that has been deserialized for each, but I don't know how to do this. I'm looking for examples that use Serde's zero-copy deserialization and retain the result, or some idea for restructuring my code that would work.
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate tokio_core;
extern crate tokio_periodic;
extern crate hyper;
use std::collections::HashMap;
use std::error::Error;
use futures::future;
use futures::Future;
use futures::stream::Stream;
use hyper::Client;
fn stream_json<'a, T: serde::de::DeserializeOwned + Send + 'a>
    (handle: &tokio_core::reactor::Handle,
     url: String,
     period: u64)
     -> Box<Stream<Item = T, Error = Box<Error>> + 'a> {
    let client = Client::new(handle);
    let timer = tokio_periodic::PeriodicTimer::new(handle).unwrap();
    timer
        .reset(::std::time::Duration::new(period, 0))
        .unwrap();
    Box::new(futures::Stream::zip(timer.from_err::<Box<Error>>(), futures::stream::unfold( (), move |_| {
        let uri = url.parse::<hyper::Uri>().unwrap();
        let get = client.get(uri).from_err::<Box<Error>>().and_then(|res| {
            res.body().concat().from_err::<Box<Error>>().and_then(|chunks| {
                let p: Result<T, Box<Error>> = serde_json::from_slice::<T>(chunks.as_ref()).map_err(|e| Box::new(e) as Box<Error>);
                match p {
                    Ok(json) => future::ok((json, ())),
                    Err(err) => future::err(err)
                }
            })
        });
        Some(get)
    })).map(|x| { x.1 }))
}
#[derive(Serialize, Deserialize, Debug)]
pub struct X {
    foo: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Y {
    bar: String,
}
fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();
    let x_stream = stream_json::<HashMap<String, X>>(&handle, "http://localhost/X".to_string(), 2);
    let y_stream = stream_json::<HashMap<String, Y>>(&handle, "http://localhost/Y".to_string(), 5);
    let mut xy_stream = x_stream.merge(y_stream);
    let mut last_x = HashMap::new();
    let mut last_y = HashMap::new();
    loop {
        match core.run(futures::Stream::into_future(xy_stream)) {
            Ok((Some(item), stream)) => {
                match item {
                    futures::stream::MergedItem::First(x) => last_x = x,
                    futures::stream::MergedItem::Second(y) => last_y = y,
                    futures::stream::MergedItem::Both(x, y) => {
                        last_x = x;
                        last_y = y;
                    }
                }
                println!("\nx = {:?}", &last_x);
                println!("y = {:?}", &last_y);
                // Do more stuff with &last_x and &last_y
                xy_stream = stream;
            }
            Ok((None, stream)) => xy_stream = stream,
            Err(_) => {
                panic!("error");
            }
        }
    }
}
When trying to solve a complicated programming problem, it's very useful to remove as much as you can. Take your code and remove what you can until the problem goes away. Tweak your code a bit and keep removing until you can't any more. Then, turn the problem around and build from the smallest piece and work back to the error. Doing both of these will show you where the problem lies.
First, let's make sure we deserialize correctly:
Here, I forgot to add the `#[serde(borrow)]` attribute, so I'm glad I did this test!
Next, we can introduce the rental crate:
Next we try to create a rental of `Chunk`:
Unfortunately, this fails:
Oops! Checking the docs for rental, we can add `#[target_ty_hack="[u8]"]` to the `data` field. This leads to:
That's annoying; since we can't implement that trait for `Chunk`, we just need to box `Chunk`, proving that it has a stable address:
I also looked to see if there is a way to get a `Vec<u8>` back out of `Chunk`, but it doesn't appear to exist. That would have been another solution with less allocation and indirection.
At this point, "all" that's left is to integrate this back into the futures code. It's a lot of work for anyone but you to recreate that, but I don't foresee any obvious problems in doing so.
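The "stable address" point about boxing can be illustrated with the standard library alone. This is only a sketch of the idea, not rental code: `InlineBody` is a hypothetical stand-in for a type that stores its bytes inline, so moving the value would move the bytes, while moving a `Box` of it moves only the pointer.

```rust
/// Hypothetical stand-in for a body type whose bytes live inside the value itself.
pub struct InlineBody(pub [u8; 4]);

impl InlineBody {
    /// Borrow the bytes stored inside this value.
    pub fn bytes(&self) -> &[u8] {
        &self.0
    }
}

/// Address of the first byte of a slice, for comparison before and after a move.
fn first_byte_addr(bytes: &[u8]) -> usize {
    bytes.as_ptr() as usize
}

/// Boxes the body, moves the `Box` into another container, and reports
/// whether the bytes are still at the same address afterwards.
pub fn boxed_addr_survives_move(body: InlineBody) -> bool {
    let boxed = Box::new(body);
    let before = first_byte_addr(boxed.bytes());

    // Moving the Box copies only the pointer; the heap allocation stays put.
    let holder = vec![boxed];
    let after = first_byte_addr(holder[0].bytes());

    before == after
}
```

Because the heap allocation behind a `Box` never moves while the `Box` is alive, references into it stay valid when the owning struct is moved, which is the guarantee a self-referential wrapper needs from its owner field. The cost is the extra allocation and indirection noted above.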