Macro stream 

pub macro stream($($t:tt)*) {
    ...
}
Available on nightly only.

Retrofitted support for Streams with yield, for await syntax.

This macro takes any series of statements and expands them into an expression of type impl Stream<Item = T>, a stream that yields elements of type T. It supports any Rust statement syntax with the following extensions:

  • yield expr

    Yields the result of evaluating expr to the caller (the stream consumer). expr must be of type T.

  • for await x in stream { .. }

    Awaits the next element in stream, binds it to x, and executes the block with the binding. stream must implement Stream<Item = U>; the type of x is U. U is the element type of the consumed stream and need not match the yielded type T.

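  • expr?

    Terminates the stream early when expr evaluates to an Err. As the second example below shows, the error becomes the final element yielded to the consumer.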

Examples

use rocket::response::stream::stream;
use rocket::futures::stream::Stream;

fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
    stream! {
        for s in &["hi", "there"] {
            yield s.to_string();
        }

        for await n in stream {
            yield format!("n: {}", n);
        }
    }
}

use rocket::futures::stream::{self, StreamExt};

let stream = f(stream::iter(vec![3, 7, 11]));
// `.await` requires an async context, such as an async block or function.
let strings: Vec<_> = stream.collect().await;
assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
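
For readers who find iterator code easier to follow, the ordering in the example above can be modeled synchronously with only the standard library. This is an analogy under stated assumptions, not how the macro is implemented: `f_sync` is a hypothetical name, plain iteration stands in for `for await`, and each mapped item stands in for a `yield`.

```rust
/// Synchronous analogy of `f`: first the two fixed strings, then one
/// formatted string per incoming number. `f_sync` is a hypothetical name.
fn f_sync(numbers: impl Iterator<Item = u8>) -> impl Iterator<Item = String> {
    vec!["hi", "there"]
        .into_iter()
        // Stands in for `yield s.to_string()` in the first loop.
        .map(|s| s.to_string())
        // Stands in for the `for await n in stream` loop and its `yield`.
        .chain(numbers.map(|n| format!("n: {}", n)))
}

fn main() {
    let strings: Vec<String> = f_sync(vec![3u8, 7, 11].into_iter()).collect();
    assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
}
```

The model captures only the order of the yielded items; it does not model asynchronous suspension between them.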

Using ? on an Err terminates the stream early; the error is yielded as the final element:

use std::io;

use rocket::response::stream::stream;
use rocket::futures::stream::Stream;

fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
    where S: Stream<Item = io::Result<&'static str>>
{
    stream! {
        for await s in stream {
            let num = s?.parse();
            let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
            yield Ok(num);
        }
    }
}

use rocket::futures::stream::{self, StreamExt};

let e = io::Error::last_os_error();
let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
// `.await` requires an async context, such as an async block or function.
let results: Vec<_> = stream.collect().await;
assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
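
The early-termination behavior asserted above can be sketched as a synchronous, std-only model. It is an illustration of the observable result, not the macro's expansion: `parse_until_error` is a hypothetical helper, a `Vec` stands in for the stream, and `break` stands in for the point where `?` ends the stream.

```rust
use std::io;

/// Synchronous model of `g`: items are produced until the first error,
/// which is emitted as the last item. `parse_until_error` is hypothetical.
fn parse_until_error(inputs: Vec<io::Result<&'static str>>) -> Vec<io::Result<u8>> {
    let mut out = Vec::new();
    for item in inputs {
        // Models `s?`: an upstream error is emitted, then output stops.
        let s = match item {
            Ok(s) => s,
            Err(e) => {
                out.push(Err(e));
                break;
            }
        };
        // Models `num.map_err(..)?`: a parse failure also stops output.
        match s.parse::<u8>() {
            Ok(n) => out.push(Ok(n)),
            Err(e) => {
                out.push(Err(io::Error::new(io::ErrorKind::Other, e)));
                break;
            }
        }
    }
    out
}

fn main() {
    let e = io::Error::new(io::ErrorKind::Other, "upstream failure");
    let results = parse_until_error(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]);
    // "four" fails to parse, so neither the later Err nor "2" is reached.
    assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
}
```

Note that the error in the result comes from parsing "four", not from the explicit Err later in the input: processing stops at the first failure, so the remaining elements are never consumed.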