rocket/response/stream/raw_sse.rs
1use std::borrow::Cow;
2use std::io::{self, Cursor};
3use std::task::{Context, Poll};
4use std::pin::Pin;
5use std::cmp::min;
6
7use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};
8
9/// Low-level serialization of fields in text/event-stream format.
10///
11/// Based on https://html.spec.whatwg.org/multipage/server-sent-events.html,
12/// reproduced here for quick reference. Retrieved 2021-04-17.
13///
14/// ```text
15/// stream = [ bom ] *event
16/// event = *( comment / field ) end-of-line
17/// comment = colon *any-char end-of-line
18/// field = 1*name-char [ colon [ space ] *any-char ] end-of-line
19/// end-of-line = ( cr lf / cr / lf )
20///
21/// ; characters
22/// lf = %x000A ; U+000A LINE FEED (LF)
23/// cr = %x000D ; U+000D CARRIAGE RETURN (CR)
24/// space = %x0020 ; U+0020 SPACE
25/// colon = %x003A ; U+003A COLON (:)
26/// bom = %xFEFF ; U+FEFF BYTE ORDER MARK
27/// name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
28/// ; a scalar value other than:
29/// ; U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
30/// any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
31/// ; a scalar value other than:
32/// ; U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)/
33/// ```
34///
35/// Notice that Multiple encodings are possible for the same data, especially in
36/// the choice of newline. This implementation always uses only "\n" (LF).
37///
38/// Serializes (via `AsyncRead`) as a series of "${name}:${value}\n" events.
39/// Either or both `name` and `value` may be empty. When the name is empty, this
40/// is a comment. Otherwise, this is a field.
41#[derive(Debug)]
42pub struct RawLinedEvent {
43 name: Cursor<Cow<'static, [u8]>>,
44 value: Take<Cursor<Cow<'static, [u8]>>>,
45 state: State,
46}
47
48/// Converts a `Cow<str>` to a `Cow<[u8]>`.
49fn farm(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
50 match cow {
51 Cow::Borrowed(slice) => Cow::Borrowed(slice.as_bytes()),
52 Cow::Owned(vec) => Cow::Owned(vec.into_bytes())
53 }
54}
55
56/// Farms `cow`, replacing `\r`, `\n`, and `:` with ` ` in the process.
57///
58/// This converts any string into a valid event `name`.
59fn farm_name(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
60 let mut i = 0;
61 let mut cow = farm(cow);
62 while i < cow.len() {
63 if let Some(k) = memchr::memchr3(b'\r', b'\n', b':', &cow[i..]) {
64 cow.to_mut()[i + k] = b' ';
65 // This can't overflow as i + k + 1 <= len, since we found a char.
66 i += k + 1;
67 } else {
68 break;
69 }
70 }
71
72 cow
73}
74
75/// Farms `cow`, replacing `\r` and `\n` with ` ` in the process.
76///
77/// This converts any string into a valid event `value`.
78fn farm_value(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
79 let mut i = 0;
80 let mut cow = farm(cow);
81 while i < cow.len() {
82 if let Some(k) = memchr::memchr2(b'\r', b'\n', &cow[i..]) {
83 cow.to_mut()[i + k] = b' ';
84 // This can't overflow as i + k + 1 <= len, since we found a char.
85 i += k + 1;
86 } else {
87 break;
88 }
89 }
90
91 cow
92}
93
94impl RawLinedEvent {
95 /// Create a `RawLinedEvent` from a valid, prefarmed `name` and `value`.
96 fn prefarmed(name: Cow<'static, [u8]>, value: Cow<'static, [u8]>) -> RawLinedEvent {
97 let name = Cursor::new(name);
98 let mut value = Cursor::new(value).take(0);
99 advance(&mut value);
100 RawLinedEvent { name, value, state: State::Name }
101 }
102
103 /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
104 /// where `value` is not allowed to be multiple lines.
105 ///
106 /// Characters `\n`, `\r`, and ':' in `name` and characters `\r` \`n` in
107 /// `value` `are replaced with a space ` `.
108 pub fn one<N, V>(name: N, value: V) -> RawLinedEvent
109 where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
110 {
111 RawLinedEvent::prefarmed(farm_name(name.into()), farm_value(value.into()))
112 }
113
114 /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
115 /// where `value` is allowed to be multiple lines.
116 ///
117 /// Characters `\n`, `\r`, and ':' in `name` are replaced with a space ` `.
118 /// `value` is allowed to contain any character. New lines (`\r\n` or `\n`)
119 /// and carriage returns `\r` result in a new event being emitted.
120 pub fn many<N, V>(name: N, value: V) -> RawLinedEvent
121 where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
122 {
123 RawLinedEvent::prefarmed(farm_name(name.into()), farm(value.into()))
124 }
125
126 /// Create a `RawLinedEvent` from known value `value`. The value is emitted
127 /// directly with _no_ name and suffixed with a `\n`.
128 pub fn raw<V: Into<Cow<'static, str>>>(value: V) -> RawLinedEvent {
129 let value = value.into();
130 let len = value.len();
131 RawLinedEvent {
132 name: Cursor::new(Cow::Borrowed(&[])),
133 value: Cursor::new(farm(value)).take(len as u64),
134 state: State::Value
135 }
136 }
137}
138
139/// The `AsyncRead`er state.
140#[derive(Debug, PartialEq, Copy, Clone)]
141enum State {
142 Name,
143 Colon,
144 Value,
145 NewLine,
146 Done
147}
148
149/// Find the next new-line (`\n` or `\r`) character in `buf` beginning at the
150/// current cursor position and sets the limit to be at that position.
151fn advance<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
152 // Technically, the position need not be <= len, so we right it.
153 let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
154 let inner = buf.get_ref().get_ref().as_ref();
155 let next = memchr::memchr2(b'\n', b'\r', &inner[(pos as usize)..])
156 .map(|i| pos + i as u64)
157 .unwrap_or_else(|| inner.len() as u64);
158
159 let limit = next - pos;
160 buf.set_limit(limit);
161}
162
163/// If the cursor in `buf` is currently at an `\r`, `\r\n` or `\n`, sets the
164/// cursor position to be _after_ the characters.
165fn skip<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
166 let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
167 match buf.get_ref().get_ref().as_ref().get(pos as usize) {
168 // This cannot overflow as clearly `buf.len() >= pos + 1`.
169 Some(b'\n') => buf.get_mut().set_position(pos + 1),
170 Some(b'\r') => {
171 let next = (pos as usize).saturating_add(1);
172 if buf.get_ref().get_ref().as_ref().get(next) == Some(&b'\n') {
173 // This cannot overflow as clearly `buf.len() >= pos + 2`.
174 buf.get_mut().set_position(pos + 2);
175 } else {
176 // This cannot overflow as clearly `buf.len() >= pos + 1`.
177 buf.get_mut().set_position(pos + 1);
178 }
179 }
180 _ => return,
181 }
182}
183
184
185macro_rules! dbg_assert_ready {
186 ($e:expr) => ({
187 let poll = $e;
188 debug_assert!(poll.is_ready());
189 ::futures::ready!(poll)
190 })
191}
192
193// NOTE: The correctness of this implementation depends on the types of `name`
194// and `value` having `AsyncRead` implementations that always return `Ready`.
195// Otherwise, we may return `Pending` after having written data to `buf` which
196// violates the contract. This can happen because even after a successful
197// partial or full read of `name`, we loop back to a `ready!(name.poll())` if
198// `buf` was not completely filled. So, we return `Pending` if that poll does.
199impl AsyncRead for RawLinedEvent {
200 fn poll_read(
201 mut self: Pin<&mut Self>,
202 cx: &mut Context<'_>,
203 buf: &mut ReadBuf<'_>,
204 ) -> Poll<io::Result<()>> {
205 use bytes::Buf;
206
207 loop {
208 if buf.remaining() == 0 {
209 return Poll::Ready(Ok(()));
210 }
211
212 match self.state {
213 State::Name => {
214 dbg_assert_ready!(Pin::new(&mut self.name).poll_read(cx, buf))?;
215 if !self.name.has_remaining() {
216 self.name.set_position(0);
217 self.state = State::Colon;
218 }
219 }
220 State::Colon => {
221 // Note that we've checked `buf.remaining() != 0`.
222 buf.put_slice(&[b':']);
223 self.state = State::Value;
224 }
225 State::Value => {
226 dbg_assert_ready!(Pin::new(&mut self.value).poll_read(cx, buf))?;
227 if self.value.limit() == 0 {
228 self.state = State::NewLine;
229 }
230 }
231 State::NewLine => {
232 // Note that we've checked `buf.remaining() != 0`.
233 buf.put_slice(&[b'\n']);
234 if self.value.get_ref().has_remaining() {
235 skip(&mut self.value);
236 advance(&mut self.value);
237 self.state = State::Name;
238 } else {
239 self.state = State::Done;
240 }
241 }
242 State::Done => return Poll::Ready(Ok(()))
243 }
244 }
245 }
246}