Module rocket::response::stream

source ·
Expand description

Potentially infinite async Stream response types.

A Stream<Item = T> is the async analog of an Iterator<Item = T>: it generates a sequence of values asynchronously, otherwise known as an async generator. Types in this module allow for returning responses that are streams.

§Raw Streams

Rust does not yet natively support syntax for creating arbitrary generators, and as such, for creating streams. To ameliorate this, Rocket exports stream!, which retrofit generator syntax, allowing raw impl Streams to be defined using yield and for await syntax:

use rocket::futures::stream::Stream;
use rocket::response::stream::stream;

fn make_stream() -> impl Stream<Item = u8> {
    stream! {
        for i in 0..3 {
            yield i;
        }
    }
}

See stream! for full usage details.

§Typed Streams

A raw stream is not a Responder, so it cannot be directly returned from a route handler. Instead, one of three typed streams may be used. Each typed stream places type bounds on the Item of the stream, allowing for Responder implementation on the stream itself.

Each typed stream exists both as a type and as a macro. They are:

Each type implements Responder; each macro can be invoked to generate a typed stream, exactly like stream! above. Additionally, each macro is also a type macro, expanding to a wrapped impl Stream<Item = $T>, where $T is the input to the macro.

As a concrete example, the route below produces an infinite series of "hello"s, one per second:

use rocket::tokio::time::{self, Duration};
use rocket::response::stream::TextStream;

/// Produce an infinite series of `"hello"`s, one per second.
#[get("/infinite-hellos")]
fn hello() -> TextStream![&'static str] {
    TextStream! {
        let mut interval = time::interval(Duration::from_secs(1));
        loop {
            yield "hello";
            interval.tick().await;
        }
    }
}

The TextStream![&'static str] invocation expands to:

TextStream<impl Stream<Item = &'static str>>

While the inner TextStream! { .. } invocation expands to:

TextStream::from(stream! { /* .. */ })

The expansions are identical for ReaderStream and ByteStream, with TextStream replaced with ReaderStream and ByteStream, respectively.

§Borrowing

A stream can yield borrowed values with no extra effort:

use rocket::State;
use rocket::response::stream::TextStream;

/// Produce a single string borrowed from the request.
#[get("/infinite-hellos")]
fn hello(string: &State<String>) -> TextStream![&str] {
    TextStream! {
        yield string.as_str();
    }
}

If the stream contains a borrowed value or uses one internally, Rust requires this fact be explicit with a lifetime annotation:

use rocket::State;
use rocket::response::stream::TextStream;

#[get("/")]
fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
    TextStream! {
        // By using `ctxt` in the stream, the borrow is moved into it. Thus,
        // the stream object contains a borrow, prompting the '_ annotation.
        if *ctxt.inner() {
            yield "hello";
        }
    }
}

// Just as before but yielding an owned yield value.
#[get("/")]
fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
    TextStream! {
        if *ctxt.inner() {
            yield "hello".to_string();
        }
    }
}

// As before but _also_ return a borrowed value. Without it, Rust gives:
// - lifetime `'r` is missing in item created through this procedural macro
#[get("/")]
fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
    TextStream! {
        if *ctxt.inner() {
            yield s.as_str();
        }
    }
}

§Graceful Shutdown

Infinite responders, like the one defined in hello above, will prolong shutdown initiated via Shutdown::notify() for the defined grace period. After the grace period has elapsed, Rocket will abruptly terminate the responder.

To avoid abrupt termination, graceful shutdown can be detected via the Shutdown future, allowing the infinite responder to gracefully shut itself down. The following example modifies the previous hello with shutdown detection:

use rocket::Shutdown;
use rocket::response::stream::TextStream;
use rocket::tokio::select;
use rocket::tokio::time::{self, Duration};

/// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
#[get("/infinite-hellos")]
fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
    TextStream! {
        let mut interval = time::interval(Duration::from_secs(1));
        loop {
            select! {
                _ = interval.tick() => yield "hello",
                _ = &mut shutdown => {
                    yield "goodbye";
                    break;
                }
            };
        }
    }
}

Macros§

Structs§

  • A potentially infinite stream of bytes: any T: AsRef<[u8]>.
  • A Server-Sent Event (SSE) in a Server-Sent EventStream.
  • A potentially infinite stream of Server-Sent Events (SSE).
  • A stream that yields exactly one value.
  • An async reader that reads from a stream of async readers.
  • A potentially infinite stream of text: T: AsRef<str>.