1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
use std::borrow::Cow;
use std::io::{self, Cursor};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::cmp::min;

use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf, Take};

/// Low-level serialization of fields in text/event-stream format.
///
/// Based on https://html.spec.whatwg.org/multipage/server-sent-events.html,
/// reproduced here for quick reference. Retrieved 2021-04-17.
///
/// ```text
/// stream        = [ bom ] *event
/// event         = *( comment / field ) end-of-line
/// comment       = colon *any-char end-of-line
/// field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
/// end-of-line   = ( cr lf / cr / lf )
///
/// ; characters
/// lf            = %x000A ; U+000A LINE FEED (LF)
/// cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
/// space         = %x0020 ; U+0020 SPACE
/// colon         = %x003A ; U+003A COLON (:)
/// bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
/// name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
///                 ; a scalar value other than:
///                 ; U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
/// any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
///                 ; a scalar value other than:
///                 ; U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)/
/// ```
///
/// Notice that Multiple encodings are possible for the same data, especially in
/// the choice of newline. This implementation always uses only "\n" (LF).
///
/// Serializes (via `AsyncRead`) as a series of "${name}:${value}\n" events.
/// Either or both `name` and `value` may be empty. When the name is empty, this
/// is a comment. Otherwise, this is a field.
#[derive(Debug)]
pub struct RawLinedEvent {
    name: Cursor<Cow<'static, [u8]>>,
    value: Take<Cursor<Cow<'static, [u8]>>>,
    state: State,
}

/// Converts a `Cow<str>` to a `Cow<[u8]>`.
fn farm(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
    match cow {
        Cow::Borrowed(slice) => Cow::Borrowed(slice.as_bytes()),
        Cow::Owned(vec) => Cow::Owned(vec.into_bytes())
    }
}

/// Farms `cow`, replacing `\r`, `\n`, and `:` with ` ` in the process.
///
/// This converts any string into a valid event `name`.
fn farm_name(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
    let mut i = 0;
    let mut cow = farm(cow);
    while i < cow.len() {
        if let Some(k) = memchr::memchr3(b'\r', b'\n', b':', &cow[i..]) {
            cow.to_mut()[i + k] = b' ';
            // This can't overflow as i + k + 1 <= len, since we found a char.
            i += k + 1;
        } else {
            break;
        }
    }

    cow
}

/// Farms `cow`, replacing `\r` and `\n` with ` ` in the process.
///
/// This converts any string into a valid event `value`.
fn farm_value(cow: Cow<'_, str>) -> Cow<'_, [u8]> {
    let mut i = 0;
    let mut cow = farm(cow);
    while i < cow.len() {
        if let Some(k) = memchr::memchr2(b'\r', b'\n', &cow[i..]) {
            cow.to_mut()[i + k] = b' ';
            // This can't overflow as i + k + 1 <= len, since we found a char.
            i += k + 1;
        } else {
            break;
        }
    }

    cow
}

impl RawLinedEvent {
    /// Create a `RawLinedEvent` from a valid, prefarmed `name` and `value`.
    fn prefarmed(name: Cow<'static, [u8]>, value: Cow<'static, [u8]>) -> RawLinedEvent {
        let name = Cursor::new(name);
        let mut value = Cursor::new(value).take(0);
        advance(&mut value);
        RawLinedEvent { name, value, state: State::Name }
    }

    /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
    /// where `value` is not allowed to be multiple lines.
    ///
    /// Characters `\n`, `\r`, and ':' in `name` and characters `\r` \`n` in
    /// `value` `are replaced with a space ` `.
    pub fn one<N, V>(name: N, value: V) -> RawLinedEvent
        where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
    {
        RawLinedEvent::prefarmed(farm_name(name.into()), farm_value(value.into()))
    }

    /// Create a `RawLinedEvent` from potentially invalid `name` and `value`
    /// where `value` is allowed to be multiple lines.
    ///
    /// Characters `\n`, `\r`, and ':' in `name` are replaced with a space ` `.
    /// `value` is allowed to contain any character. New lines (`\r\n` or `\n`)
    /// and carriage returns `\r` result in a new event being emitted.
    pub fn many<N, V>(name: N, value: V) -> RawLinedEvent
        where N: Into<Cow<'static, str>>, V: Into<Cow<'static, str>>
    {
        RawLinedEvent::prefarmed(farm_name(name.into()), farm(value.into()))
    }

    /// Create a `RawLinedEvent` from known value `value`. The value is emitted
    /// directly with _no_ name and suffixed with a `\n`.
    pub fn raw<V: Into<Cow<'static, str>>>(value: V) -> RawLinedEvent {
        let value = value.into();
        let len = value.len();
        RawLinedEvent {
            name: Cursor::new(Cow::Borrowed(&[])),
            value: Cursor::new(farm(value)).take(len as u64),
            state: State::Value
        }
    }
}

/// The `AsyncRead`er state.
#[derive(Debug, PartialEq, Copy, Clone)]
enum State {
    Name,
    Colon,
    Value,
    NewLine,
    Done
}

/// Find the next new-line (`\n` or `\r`) character in `buf` beginning at the
/// current cursor position and sets the limit to be at that position.
fn advance<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
    // Technically, the position need not be <= len, so we right it.
    let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
    let inner = buf.get_ref().get_ref().as_ref();
    let next = memchr::memchr2(b'\n', b'\r', &inner[(pos as usize)..])
        .map(|i| pos + i as u64)
        .unwrap_or_else(|| inner.len() as u64);

    let limit = next - pos;
    buf.set_limit(limit);
}

/// If the cursor in `buf` is currently at an `\r`, `\r\n` or `\n`, sets the
/// cursor position to be _after_ the characters.
fn skip<T: AsRef<[u8]> + Unpin>(buf: &mut Take<Cursor<T>>) {
    let pos = min(buf.get_ref().get_ref().as_ref().len() as u64, buf.get_ref().position());
    match buf.get_ref().get_ref().as_ref().get(pos as usize) {
        // This cannot overflow as clearly `buf.len() >= pos + 1`.
        Some(b'\n') => buf.get_mut().set_position(pos + 1),
        Some(b'\r') => {
            let next = (pos as usize).saturating_add(1);
            if buf.get_ref().get_ref().as_ref().get(next) == Some(&b'\n') {
                // This cannot overflow as clearly `buf.len() >= pos + 2`.
                buf.get_mut().set_position(pos + 2);
            } else {
                // This cannot overflow as clearly `buf.len() >= pos + 1`.
                buf.get_mut().set_position(pos + 1);
            }
        }
        _ => (),
    }
}

macro_rules! dbg_assert_ready {
    ($e:expr) => ({
        let poll = $e;
        debug_assert!(poll.is_ready());
        ::futures::ready!(poll)
    })
}

// NOTE: The correctness of this implementation depends on the types of `name`
// and `value` having `AsyncRead` implementations that always return `Ready`.
// Otherwise, we may return `Pending` after having written data to `buf` which
// violates the contract. This can happen because even after a successful
// partial or full read of `name`, we loop back to a `ready!(name.poll())` if
// `buf` was not completely filled. So, we return `Pending` if that poll does.
impl AsyncRead for RawLinedEvent {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        use bytes::Buf;

        loop {
            if buf.remaining() == 0 {
                return Poll::Ready(Ok(()));
            }

            match self.state {
                State::Name => {
                    dbg_assert_ready!(Pin::new(&mut self.name).poll_read(cx, buf))?;
                    if !self.name.has_remaining() {
                        self.name.set_position(0);
                        self.state = State::Colon;
                    }
                }
                State::Colon => {
                    // Note that we've checked `buf.remaining() != 0`.
                    buf.put_slice(&[b':']);
                    self.state = State::Value;
                }
                State::Value => {
                    dbg_assert_ready!(Pin::new(&mut self.value).poll_read(cx, buf))?;
                    if self.value.limit() == 0 {
                        self.state = State::NewLine;
                    }
                }
                State::NewLine => {
                    // Note that we've checked `buf.remaining() != 0`.
                    buf.put_slice(&[b'\n']);
                    if self.value.get_ref().has_remaining() {
                        skip(&mut self.value);
                        advance(&mut self.value);
                        self.state = State::Name;
                    } else {
                        self.state = State::Done;
                    }
                }
                State::Done => return Poll::Ready(Ok(()))
            }
        }
    }
}