rocket_ws/
lib.rs

1//! WebSocket support for Rocket.
2//!
3//! This crate implements support for WebSockets via Rocket's [connection
4//! upgrade API](rocket::Response#upgrading) and
5//! [tungstenite](tokio_tungstenite).
6//!
7//! # Usage
8//!
9//! Depend on the crate. Here, we rename the dependency to `ws` for convenience:
10//!
11//! ```toml
12//! [dependencies]
13//! ws = { package = "rocket_ws", version = "0.1.1" }
14//! ```
15//!
16//! Then, use [`WebSocket`] as a request guard in any route and either call
17//! [`WebSocket::channel()`] or return a stream via [`Stream!`] or
18//! [`WebSocket::stream()`] in the handler. The examples below are equivalent:
19//!
20//! ```rust
21//! # use rocket::get;
22//! # use rocket_ws as ws;
23//! #
24//! #[get("/echo?channel")]
25//! fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
26//!     use rocket::futures::{SinkExt, StreamExt};
27//!
28//!     ws.channel(move |mut stream| Box::pin(async move {
29//!         while let Some(message) = stream.next().await {
30//!             let _ = stream.send(message?).await;
31//!         }
32//!
33//!         Ok(())
34//!     }))
35//! }
36//!
37//! #[get("/echo?stream")]
38//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
39//!     ws::Stream! { ws =>
40//!         for await message in ws {
41//!             yield message?;
42//!         }
43//!     }
44//! }
45//!
46//! #[get("/echo?compose")]
47//! fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] {
48//!     ws.stream(|io| io)
49//! }
50//! ```
51//!
52//! WebSocket connections are configurable via [`WebSocket::config()`]:
53//!
54//! ```rust
55//! # use rocket::get;
56//! # use rocket_ws as ws;
57//! #
58//! #[get("/echo")]
59//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
60//!     let ws = ws.config(ws::Config {
61//!         max_send_queue: Some(5),
62//!         ..Default::default()
63//!     });
64//!
65//!     ws::Stream! { ws =>
66//!         for await message in ws {
67//!             yield message?;
68//!         }
69//!     }
70//! }
71//! ```
72
73#![doc(html_root_url = "https://api.rocket.rs/v0.5/rocket_ws")]
74#![doc(html_favicon_url = "https://rocket.rs/images/favicon.ico")]
75#![doc(html_logo_url = "https://rocket.rs/images/logo-boxed.png")]
76
77mod tungstenite {
78    #[doc(inline)] pub use tokio_tungstenite::tungstenite::*;
79}
80
81mod duplex;
82mod websocket;
83
84pub use self::websocket::{WebSocket, Channel};
85
86/// A WebSocket message.
87///
88/// A value of this type is typically constructed by calling `.into()` on a
89/// supported message type. This includes strings via `&str` and `String` and
90/// bytes via `&[u8]` and `Vec<u8>`:
91///
92/// ```rust
93/// # use rocket::get;
94/// # use rocket_ws as ws;
95/// #
96/// #[get("/echo")]
97/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
98///     ws::Stream! { ws =>
99///         yield "Hello".into();
100///         yield String::from("Hello").into();
101///         yield (&[1u8, 2, 3][..]).into();
102///         yield vec![1u8, 2, 3].into();
103///     }
104/// }
105/// ```
106///
107/// Other kinds of messages can be constructed directly:
108///
109/// ```rust
110/// # use rocket::get;
111/// # use rocket_ws as ws;
112/// #
113/// #[get("/echo")]
114/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
115///     ws::Stream! { ws =>
116///         yield ws::Message::Ping(vec![b'h', b'i'])
117///     }
118/// }
119/// ```
120pub use self::tungstenite::Message;
121
122/// WebSocket connection configuration.
123///
124/// The default configuration for a [`WebSocket`] can be changed by calling
125/// [`WebSocket::config()`] with a value of this type. The defaults are obtained
126/// via [`Default::default()`]. You don't generally need to reconfigure a
127/// `WebSocket` unless you're certain you need different values. In other words,
128/// this structure should rarely be used.
129///
130/// # Example
131///
132/// ```rust
133/// # use rocket::get;
134/// # use rocket_ws as ws;
135/// use rocket::data::ToByteUnit;
136///
137/// #[get("/echo")]
138/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
139///     let ws = ws.config(ws::Config {
140///         // Enable backpressure with a max send queue size of `5`.
141///         max_send_queue: Some(5),
142///         // Decrease the maximum (complete) message size to 4MiB.
143///         max_message_size: Some(4.mebibytes().as_u64() as usize),
144///         // Decrease the maximum size of _one_ frame (not message) to 1MiB.
145///         max_frame_size: Some(1.mebibytes().as_u64() as usize),
146///         // Use the default values for the rest.
147///         ..Default::default()
148///     });
149///
150///     ws::Stream! { ws =>
151///         for await message in ws {
152///             yield message?;
153///         }
154///     }
155/// }
156/// ```
157///
158/// **Original `tungstenite` Documentation Follows**
159///
160pub use self::tungstenite::protocol::WebSocketConfig as Config;
161
162/// Structures for constructing raw WebSocket frames.
163pub mod frame {
164    #[doc(hidden)] pub use crate::Message;
165    pub use crate::tungstenite::protocol::frame::{CloseFrame, Frame};
166    pub use crate::tungstenite::protocol::frame::coding::CloseCode;
167}
168
169/// Types representing incoming and/or outgoing `async` [`Message`] streams.
170pub mod stream {
171    pub use crate::duplex::DuplexStream;
172    pub use crate::websocket::MessageStream;
173}
174
175/// Library [`Error`](crate::result::Error) and
176/// [`Result`](crate::result::Result) types.
177pub mod result {
178    pub use crate::tungstenite::error::{Result, Error};
179}
180
181/// Type and expression macro for `async` WebSocket [`Message`] streams.
182///
183/// This macro can be used both where types are expected or
184/// where expressions are expected.
185///
186/// # Type Position
187///
/// When used in a type position, the macro invoked as `Stream!['r]` expands to:
///
/// - [`MessageStream`]`<'r, impl `[`Stream`]`<Item = `[`Result`]`<`[`Message`]`>> + 'r>`
191///
/// The lifetime need not be specified as `'r`. For instance, `Stream!['request]`
/// is valid and expands as expected:
///
/// - [`MessageStream`]`<'request, impl `[`Stream`]`<Item = `[`Result`]`<`[`Message`]`>> + 'request>`
196///
197/// As a convenience, when the macro is invoked as `Stream![]`, the lifetime
198/// defaults to `'static`. That is, `Stream![]` is equivalent to
199/// `Stream!['static]`.
200///
201/// [`MessageStream`]: crate::stream::MessageStream
202/// [`Stream`]: rocket::futures::stream::Stream
203/// [`Result`]: crate::result::Result
204/// [`Message`]: crate::Message
205///
206/// # Expression Position
207///
208/// When invoked as an expression, the macro behaves similarly to Rocket's
209/// [`stream!`](rocket::response::stream::stream) macro. Specifically, it
210/// supports `yield` and `for await` syntax. It is invoked as follows:
211///
212/// ```rust
213/// # use rocket::get;
214/// use rocket_ws as ws;
215///
216/// #[get("/")]
217/// fn echo(ws: ws::WebSocket) -> ws::Stream![] {
218///     ws::Stream! { ws =>
219///         for await message in ws {
220///             yield message?;
221///             yield "foo".into();
222///             yield vec![1, 2, 3, 4].into();
223///         }
224///     }
225/// }
226/// ```
227///
228/// It enjoins the following type requirements:
229///
230///   * The type of `ws` _must_ be [`WebSocket`]. `ws` can be any ident.
231///   * The type of yielded expressions (`expr` in `yield expr`) _must_ be [`Message`].
232///   * The `Err` type of expressions short-circuited with `?` _must_ be [`Error`].
233///
234/// [`Error`]: crate::result::Error
235///
236/// The macro takes any series of statements and expands them into an expression
237/// of type `impl Stream<Item = `[`Result`]`<T>>`, a stream that `yield`s elements of
238/// type [`Result`]`<T>`. It automatically converts yielded items of type `T` into
239/// `Ok(T)`. It supports any Rust statement syntax with the following
240/// extensions:
241///
242///   * `?` short-circuits stream termination on `Err`
243///
244///     The type of the error value must be [`Error`].
245///     <br /> <br />
246///
247///   * `yield expr`
248///
249///     Yields the result of evaluating `expr` to the caller (the stream
250///     consumer) wrapped in `Ok`.
251///
252///     `expr` must be of type `T`.
253///     <br /> <br />
254///
255///   * `for await x in stream { .. }`
256///
257///     `await`s the next element in `stream`, binds it to `x`, and executes the
258///     block with the binding.
259///
260///     `stream` must implement `Stream<Item = T>`; the type of `x` is `T`.
261///
262/// ### Examples
263///
264/// Borrow from the request. Send a single message and close:
265///
266/// ```rust
267/// # use rocket::get;
268/// use rocket_ws as ws;
269///
270/// #[get("/hello/<user>")]
271/// fn ws_hello(ws: ws::WebSocket, user: &str) -> ws::Stream!['_] {
272///     ws::Stream! { ws =>
273///         yield user.into();
274///     }
275/// }
276/// ```
277///
278/// Borrow from the request with explicit lifetime:
279///
280/// ```rust
281/// # use rocket::get;
282/// use rocket_ws as ws;
283///
284/// #[get("/hello/<user>")]
285/// fn ws_hello<'r>(ws: ws::WebSocket, user: &'r str) -> ws::Stream!['r] {
286///     ws::Stream! { ws =>
287///         yield user.into();
288///     }
289/// }
290/// ```
291///
292/// Emit several messages and short-circuit if the client sends a bad message:
293///
294/// ```rust
295/// # use rocket::get;
296/// use rocket_ws as ws;
297///
298/// #[get("/")]
299/// fn echo(ws: ws::WebSocket) -> ws::Stream![] {
300///     ws::Stream! { ws =>
301///         for await message in ws {
302///             for i in 0..5u8 {
303///                 yield i.to_string().into();
304///             }
305///
306///             yield message?;
307///         }
308///     }
309/// }
310/// ```
311///
#[macro_export]
macro_rules! Stream {
    // Type position without a lifetime: defer to the lifetime arm with
    // `'static`.
    () => ($crate::Stream!['static]);
    // Type position with an explicit lifetime `$l`.
    ($l:lifetime) => (
        $crate::stream::MessageStream<$l, impl rocket::futures::Stream<
            Item = $crate::result::Result<$crate::Message>
        > + $l>
    );
    // Expression position: `Stream! { ident => statements }`.
    //
    // The expansion is wrapped in a block so that it forms a single
    // expression. Without the block it is a `let` statement followed by an
    // expression, which only parses when the invocation is the tail statement
    // of a block; `let s = Stream! { .. };` or passing the invocation as an
    // argument would be rejected.
    ($channel:ident => $($token:tt)*) => ({
        // The annotated binding makes a non-`WebSocket` `$channel` a type
        // error at this line. The closure parameter then reuses the caller's
        // identifier so `$token`s can refer to the value handed over by
        // `stream()` under the same name.
        let ws: $crate::WebSocket = $channel;
        ws.stream(move |$channel| rocket::async_stream::try_stream! {
            $($token)*
        })
    });
}
326}