pub macro stream($($t:tt)*) {
...
}Available on
nightly only.Expand description
Retrofitted support for Streams with yield, for await syntax.
This macro takes any series of statements and expands them into an
expression of type impl Stream<Item = T>, a stream that yields
elements of type T. It supports any Rust statement syntax with the
following extensions:
-
yield exprYields the result of evaluating
exprto the caller (the stream consumer).exprmust be of typeT. -
for await x in stream { .. }awaits the next element instream, binds it tox, and executes the block with the binding.streammust implementStream<Item = T>; the type ofxisT. -
?short-circuits stream termination onErr
ยงExamples
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
stream! {
for s in &["hi", "there"]{
yield s.to_string();
}
for await n in stream {
yield format!("n: {}", n);
}
}
}
use rocket::futures::stream::{self, StreamExt};
let stream = f(stream::iter(vec![3, 7, 11]));
let strings: Vec<_> = stream.collect().await;
assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);Using ? on an Err short-circuits stream termination:
use std::io;
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
where S: Stream<Item = io::Result<&'static str>>
{
stream! {
for await s in stream {
let num = s?.parse();
let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
yield Ok(num);
}
}
}
use rocket::futures::stream::{self, StreamExt};
let e = io::Error::last_os_error();
let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
let results: Vec<_> = stream.collect().await;
assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));