pub macro stream($($t:tt)*) { ... }
Expand description
Retrofitted support for `Stream`s with `yield`, `for await` syntax.
This macro takes any series of statements and expands them into an
expression of type `impl Stream<Item = T>`, a stream that `yield`s
elements of type `T`. It supports any Rust statement syntax with the
following extensions:
- `yield expr`

  Yields the result of evaluating `expr` to the caller (the stream
  consumer). `expr` must be of type `T`.

- `for await x in stream { .. }`

  `await`s the next element in `stream`, binds it to `x`, and executes the
  block with the binding. `stream` must implement `Stream<Item = T>`; the
  type of `x` is `T`.

- `?` short-circuits stream termination on `Err`
§Examples
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
/// Builds a stream of `String`s: first the two fixed words, then one
/// formatted entry for every `u8` produced by the input `stream`.
fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
    stream! {
        // Plain synchronous iteration is allowed inside the macro body.
        for word in &["hi", "there"] {
            yield word.to_string();
        }

        // `for await` pulls items from the input stream one at a time.
        for await value in stream {
            yield format!("n: {}", value);
        }
    }
}
use rocket::futures::stream::{self, StreamExt};
// Feed `f` a three-element stream and collect everything it yields: the two
// fixed words come first, followed by one formatted entry per input number.
let stream = f(stream::iter(vec![3, 7, 11]));
let strings: Vec<_> = stream.collect().await;
assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
Using `?` on an `Err` short-circuits stream termination:
use std::io;
use rocket::response::stream::stream;
use rocket::futures::stream::Stream;
/// Parses every string produced by `stream` into a `u8` and yields it as
/// `Ok`. Both an `Err` item from the input and a parse failure are
/// propagated with `?`.
fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
    where S: Stream<Item = io::Result<&'static str>>
{
    stream! {
        for await item in stream {
            // `?` propagates an upstream `Err` item.
            let parsed = item?.parse();
            // A parse failure is wrapped in an `io::Error` before propagating.
            let value = parsed.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
            yield Ok(value);
        }
    }
}
use rocket::futures::stream::{self, StreamExt};
// The input holds four items, but only two results are expected: `Ok(3)` for
// "3", then an `Err` once "four" fails to parse. The later `Err(e)` and
// `Ok("2")` items never show up in the collected results.
let e = io::Error::last_os_error();
let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
let results: Vec<_> = stream.collect().await;
assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));