// rocket/response/stream/raw_sse.rs

1use std::borrow::Cow;
2use std::io::{self, Cursor};
3use std::task::{Context, Poll};
4use std::pin::Pin;
5use std::cmp::min;
6
7use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
8
9/// Low-level serialization of fields in text/event-stream format.
10///
11/// Based on https://html.spec.whatwg.org/multipage/server-sent-events.html,
12/// reproduced here for quick reference. Retrieved 2021-04-17.
13///
14/// ```text
15/// stream        = [ bom ] *event
16/// event         = *( comment / field ) end-of-line
17/// comment       = colon *any-char end-of-line
18/// field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
19/// end-of-line   = ( cr lf / cr / lf )
20///
21/// ; characters
22/// lf            = %x000A ; U+000A LINE FEED (LF)
23/// cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
24/// space         = %x0020 ; U+0020 SPACE
25/// colon         = %x003A ; U+003A COLON (:)
26/// bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
27/// name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
28///                 ; a scalar value other than:
29///                 ; U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
30/// any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
31///                 ; a scalar value other than:
32///                 ; U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)/
33/// ```
34///
35/// Notice that Multiple encodings are possible for the same data, especially in
36/// the choice of newline. This implementation always uses only "\n" (LF).
37///
38/// Serializes (via `AsyncRead`) as a series of "${name}:${value}\n" events.
39/// Either or both `name` and `value` may be empty. When the name is empty, this
40/// is a comment. Otherwise, this is a field.
41#[derive(Debug)]
42pub struct RawLinedEvent {
43    name: Cursor<Cow<'static, [u8]>>,
44    value: Take<Cursor<Cow<'static, [u8]>>>,
45    state: State,
46}
47
/// Reinterprets a `Cow<str>` as a `Cow<[u8]>` without copying any data.
///
/// A borrowed `&str` becomes a borrowed byte slice. An owned `String` hands
/// over its buffer as a `Vec<u8>`.
fn farm(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
    match cow {
        Cow::Borrowed(text) => text.as_bytes().into(),
        Cow::Owned(text) => text.into_bytes().into(),
    }
}
55
56/// Farms `cow`, replacing `\r`, `\n`, and `:` with ` ` in the process.
57///
58/// This converts any string into a valid event `name`.
59fn farm_name(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
60    let mut i = 0;
61    let mut cow = farm(cow);
62    while i < cow.len() {
63        if let Some(k) = memchr::memchr3(b'\r', b'\n', b':', &cow[i..]) {
64            cow.to_mut()[i + k] = b' ';
65            // This can't overflow as i + k + 1 <= len, since we found a char.
66            i += k + 1;
67        } else {
68            break;
69        }
70    }
71
72    cow
73}
74
75/// Farms `cow`, replacing `\r` and `\n` with ` ` in the process.
76///
77/// This converts any string into a valid event `value`.
78fn farm_value(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
79    let mut i = 0;
80    let mut cow = farm(cow);
81    while i < cow.len() {
82        if let Some(k) = memchr::memchr2(b'\r', b'\n', &cow[i..]) {
83            cow.to_mut()[i + k] = b' ';
84            // This can't overflow as i + k + 1 <= len, since we found a char.
85            i += k + 1;
86        } else {
87            break;
88        }
89    }
90
91    cow
92}
93
94impl RawLinedEvent {
95    /// Create a `RawLinedEvent` from a valid, prefarmed `name` and `value`.
96    fn prefarmed(name: Cow<'static, [u8]>, value: Cow<'static, [u8]>) -> RawLinedEvent {
97        let name = Cursor::new(name);
98        let mut value = Cursor::new(value).take(0);
99        advance(&mut value);
100        RawLinedEvent { name, value, state: State::Name }
101    }
102
103    /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
104    /// where `value` is not allowed to be multiple lines.
105    ///
106    /// Characters `\n`, `\r`, and ':' in `name` and characters `\r` \`n` in
107    /// `value` `are replaced with a space ` `.
108    pub fn one<N, V>(name: N, value: V) -> RawLinedEvent
109        where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
110    {
111        RawLinedEvent::prefarmed(farm_name(name.into()), farm_value(value.into()))
112    }
113
114    /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
115    /// where `value` is allowed to be multiple lines.
116    ///
117    /// Characters `\n`, `\r`, and ':' in `name` are replaced with a space ` `.
118    /// `value` is allowed to contain any character. New lines (`\r\n` or `\n`)
119    /// and carriage returns `\r` result in a new event being emitted.
120    pub fn many<N, V>(name: N, value: V) -> RawLinedEvent
121        where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
122    {
123        RawLinedEvent::prefarmed(farm_name(name.into()), farm(value.into()))
124    }
125
126    /// Create a `RawLinedEvent` from known value `value`. The value is emitted
127    /// directly with _no_ name and suffixed with a `\n`.
128    pub fn raw<V: Into<Cow<'static, str>>>(value: V) -> RawLinedEvent {
129        let value = value.into();
130        let len = value.len();
131        RawLinedEvent {
132            name: Cursor::new(Cow::Borrowed(&[])),
133            value: Cursor::new(farm(value)).take(len as u64),
134            state: State::Value
135        }
136    }
137}
138
/// Progress of `RawLinedEvent`'s `AsyncRead` implementation.
///
/// Each line of the value cycles through `Name` -> `Colon` -> `Value` ->
/// `NewLine`, and serialization ends in `Done`. Events built with `raw` start
/// directly at `Value`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    /// Writing the name (empty for comments).
    Name,
    /// Writing the `:` between name and value.
    Colon,
    /// Writing the current line of the value.
    Value,
    /// Writing the `\n` that terminates the current line.
    NewLine,
    /// Everything has been written.
    Done,
}
148
149/// Find the next new-line (`\n` or `\r`) character in `buf` beginning at the
150/// current cursor position and sets the limit to be at that position.
151fn advance<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
152    // Technically, the position need not be <= len, so we right it.
153    let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
154    let inner = buf.get_ref().get_ref().as_ref();
155    let next = memchr::memchr2(b'\n', b'\r', &inner[(pos as usize)..])
156        .map(|i| pos + i as u64)
157        .unwrap_or_else(|| inner.len() as u64);
158
159    let limit = next - pos;
160    buf.set_limit(limit);
161}
162
163/// If the cursor in `buf` is currently at an `\r`, `\r\n` or `\n`, sets the
164/// cursor position to be _after_ the characters.
165fn skip<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
166    let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
167    match buf.get_ref().get_ref().as_ref().get(pos as usize) {
168        // This cannot overflow as clearly `buf.len() >= pos + 1`.
169        Some(b'\n') => buf.get_mut().set_position(pos + 1),
170        Some(b'\r') => {
171            let next = (pos as usize).saturating_add(1);
172            if buf.get_ref().get_ref().as_ref().get(next) == Some(&b'\n') {
173                // This cannot overflow as clearly `buf.len() >= pos + 2`.
174                buf.get_mut().set_position(pos + 2);
175            } else {
176                // This cannot overflow as clearly `buf.len() >= pos + 1`.
177                buf.get_mut().set_position(pos + 1);
178            }
179        }
180        _ => return,
181    }
182}
183
184
/// Unwraps a `Poll` that is expected to always be `Ready`.
///
/// In debug builds, a `Pending` value trips a `debug_assert!`. Otherwise
/// `Pending` is propagated by returning `Poll::Pending` from the enclosing
/// function, which is the same expansion `futures::ready!` uses.
macro_rules! dbg_assert_ready {
    ($e:expr) => {{
        let polled = $e;
        debug_assert!(polled.is_ready());
        match polled {
            ::std::task::Poll::Ready(value) => value,
            ::std::task::Poll::Pending => return ::std::task::Poll::Pending,
        }
    }};
}
192
193// NOTE: The correctness of this implementation depends on the types of `name`
194// and `value` having `AsyncRead` implementations that always return `Ready`.
195// Otherwise, we may return `Pending` after having written data to `buf` which
196// violates the contract. This can happen because even after a successful
197// partial or full read of `name`, we loop back to a `ready!(name.poll())` if
198// `buf` was not completely filled. So, we return `Pending` if that poll does.
199impl AsyncRead for RawLinedEvent {
200    fn poll_read(
201        mut self: Pin<&mut Self>,
202        cx: &mut Context<'_>,
203        buf: &mut ReadBuf<'_>,
204    ) -> Poll<io::Result<()>> {
205        use bytes::Buf;
206
207        loop {
208            if buf.remaining() == 0 {
209                return Poll::Ready(Ok(()));
210            }
211
212            match self.state {
213                State::Name => {
214                    dbg_assert_ready!(Pin::new(&mut self.name).poll_read(cx, buf))?;
215                    if !self.name.has_remaining() {
216                        self.name.set_position(0);
217                        self.state = State::Colon;
218                    }
219                }
220                State::Colon => {
221                    // Note that we've checked `buf.remaining() != 0`.
222                    buf.put_slice(&[b':']);
223                    self.state = State::Value;
224                }
225                State::Value => {
226                    dbg_assert_ready!(Pin::new(&mut self.value).poll_read(cx, buf))?;
227                    if self.value.limit() == 0 {
228                        self.state = State::NewLine;
229                    }
230                }
231                State::NewLine => {
232                    // Note that we've checked `buf.remaining() != 0`.
233                    buf.put_slice(&[b'\n']);
234                    if self.value.get_ref().has_remaining() {
235                        skip(&mut self.value);
236                        advance(&mut self.value);
237                        self.state = State::Name;
238                    } else {
239                        self.state = State::Done;
240                    }
241                }
242                State::Done => return Poll::Ready(Ok(()))
243            }
244        }
245    }
246}