rocket/response/stream/
sse.rs

1use std::borrow::Cow;
2
3use tokio::io::AsyncRead;
4use tokio::time::Duration;
5use futures::stream::{self, Stream, StreamExt};
6use futures::future::ready;
7
8use crate::request::Request;
9use crate::response::{self, Response, Responder, stream::{ReaderStream, RawLinedEvent}};
10use crate::http::ContentType;
11
12/// A Server-Sent `Event` (SSE) in a Server-Sent [`struct@EventStream`].
13///
14/// A server-sent event is either a _field_ or a _comment_. Comments can be
15/// constructed via [`Event::comment()`] while fields can be constructed via
16/// [`Event::data()`], [`Event::json()`], and [`Event::retry()`].
17///
18/// ```rust
19/// use rocket::tokio::time::Duration;
20/// use rocket::response::stream::Event;
21///
22/// // A `data` event with message "Hello, SSE!".
23/// let event = Event::data("Hello, SSE!");
24///
25/// // The same event but with event name of `hello`.
26/// let event = Event::data("Hello, SSE!").event("hello");
27///
28/// // A `retry` event to set the client-side reconnection time.
29/// let event = Event::retry(Duration::from_secs(5));
30///
31/// // An event with an attached comment, event name, and ID.
32/// let event = Event::data("Hello, SSE!")
33///     .with_comment("just a hello message")
34///     .event("hello")
35///     .id("1");
36/// ```
37///
38/// We largely defer to [MDN's using server-sent events] documentation for
39/// client-side details but reproduce, in our words, relevant server-side
40/// documentation here.
41///
42/// [MDN's using server-sent events]:
43/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
44///
45/// # Pitfalls
46///
47/// Server-Sent Events suffer from certain pitfalls. We encourage readers to
48/// read through [pitfalls](struct@EventStream#pitfalls) before making use of
49/// Rocket's SSE support.
50///
51/// # Comments
52///
53/// A server-sent _comment_, created via [`Event::comment()`], is an event that
54/// appears only in the raw server-sent event data stream and is inaccessible by
55/// most clients. This includes JavaScript's `EventSource`. As such, they serve
56/// little utility beyond debugging a raw data stream and keeping a connection
57/// alive. See [heartbeat](struct@EventStream#heartbeat) for information on
58/// Rocket's `EventStream` keep-alive.
59///
60/// # Fields
61///
62/// A server-sent field can be one of four kinds:
63///
64///   * `retry`
65///
66///     A `retry` event, created via [`Event::retry()`], sets the reconnection
67///     time on the client side. It is the duration the client will wait before
68///     attempting to reconnect when a connection is lost. Most applications
69///     will not need to set a `retry`, opting instead to use the
70///     implementation's default or to reconnect manually on error.
71///
72///   * `id`
73///
74///     Sets the event id to associate all subsequent fields with. This value
75///     cannot be retrieved directly via most clients, including JavaScript
76///     `EventSource`. Instead, it is sent by the implementation on reconnection
77///     via the `Last-Event-ID` header. An `id` can be attached to other fields
78///     via the [`Event::id()`] builder method.
79///
80///   * `event`
81///
82///     Sets the event name to associate the next `data` field with. In
83///     JavaScript's `EventSource`, this is the event to be listened for, which
84///     defaults to `message`. An `event` can be attached to other fields via
85///     the [`Event::event()`] builder method.
86///
87///   * `data`
88///
89///     Sends data to dispatch as an event at the client. In JavaScript's
90///     `EventSource`, this (and only this) results in an event handler for
91///     `event`, specified just prior, being triggered. A data field can be
92///     created via the [`Event::data()`] or [`Event::json()`] constructors.
93///
94/// # Implementation Notes
95///
96/// A constructed `Event` _always_ emits its fields in the following order:
97///
98///   1. `comment`
99///   2. `retry`
100///   3. `id`
101///   4. `event`
102///   5. `data`
103///
104/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
105/// Rocket's default implementation automatically converts new lines and
106/// carriage returns in `event` and `id` fields to spaces.
107///
108/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
109/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
110/// protocol-level `\n`, that is, in such a way that they are interpreted as
111/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
112/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
113/// those emitted by [`Event::json()`], have no such restrictions.
114#[derive(Clone, Eq, PartialEq, Hash, Debug)]
115pub struct Event {
116    comment: Option<Cow<'static, str>>,
117    retry: Option<Duration>,
118    id: Option<Cow<'static, str>>,
119    event: Option<Cow<'static, str>>,
120    data: Option<Cow<'static, str>>,
121}
122
123impl Event {
124    // We hide this since we never want to construct an `Event` with nothing.
125    fn new() -> Self {
126        Event { comment: None, retry: None, id: None, event: None, data: None, }
127    }
128
129    /// Creates a new `Event` with an empty data field.
130    ///
131    /// This is exactly equivalent to `Event::data("")`.
132    ///
133    /// # Example
134    ///
135    /// ```rust
136    /// use rocket::response::stream::Event;
137    ///
138    /// let event = Event::empty();
139    /// ```
140    pub fn empty() -> Self {
141        Event::data("")
142    }
143
144    /// Creates a new `Event` with a data field of `data` serialized as JSON.
145    ///
146    /// # Example
147    ///
148    /// ```rust
149    /// use rocket::serde::Serialize;
150    /// use rocket::response::stream::Event;
151    ///
152    /// #[derive(Serialize)]
153    /// #[serde(crate = "rocket::serde")]
154    /// struct MyData<'r> {
155    ///     string: &'r str,
156    ///     number: usize,
157    /// }
158    ///
159    /// let data = MyData { string: "hello!", number: 10 };
160    /// let event = Event::json(&data);
161    /// ```
162    #[cfg(feature = "json")]
163    #[cfg_attr(nightly, doc(cfg(feature = "json")))]
164    pub fn json<T: serde::Serialize>(data: &T) -> Self {
165        let string = serde_json::to_string(data).unwrap_or_default();
166        Self::data(string)
167    }
168
169    /// Creates a new `Event` with a data field containing the raw `data`.
170    ///
171    /// # Raw SSE is Lossy
172    ///
173    /// Unencoded carriage returns cannot be expressed in the protocol. Thus,
174    /// any carriage returns in `data` will not appear at the client-side.
175    /// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
176    /// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
177    /// received as `foo\nbar\nbaz` at the client-side.
178    ///
179    /// See [pitfalls](struct@EventStream#pitfalls) for more details.
180    ///
181    /// # Example
182    ///
183    /// ```rust
184    /// use rocket::response::stream::Event;
185    ///
186    /// // A `data` event with message "Hello, SSE!".
187    /// let event = Event::data("Hello, SSE!");
188    /// ```
189    pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
190        Self { data: Some(data.into()), ..Event::new() }
191    }
192
193    /// Creates a new comment `Event`.
194    ///
195    /// As with [`Event::data()`], unencoded carriage returns cannot be
196    /// expressed in the protocol. Thus, any carriage returns in `data` will
197    /// not appear at the client-side. For comments, this is generally not a
198    /// concern as comments are discarded by client-side libraries.
199    ///
200    /// # Example
201    ///
202    /// ```rust
203    /// use rocket::response::stream::Event;
204    ///
205    /// let event = Event::comment("bet you'll never see me!");
206    /// ```
207    pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
208        Self { comment: Some(data.into()), ..Event::new() }
209    }
210
211    /// Creates a new retry `Event`.
212    ///
213    /// # Example
214    ///
215    /// ```rust
216    /// use rocket::response::stream::Event;
217    /// use rocket::tokio::time::Duration;
218    ///
219    /// let event = Event::retry(Duration::from_millis(250));
220    /// ```
221    pub fn retry(period: Duration) -> Self {
222        Self { retry: Some(period), ..Event::new() }
223    }
224
225    /// Sets the value of the 'event' (event type) field.
226    ///
227    /// Event names may not contain new lines `\n` or carriage returns `\r`. If
228    /// `event` _does_ contain new lines or carriage returns, they are replaced
229    /// with spaces (` `) before being sent to the client.
230    ///
231    /// # Example
232    ///
233    /// ```rust
234    /// use rocket::response::stream::Event;
235    ///
236    /// // The event name is "start".
237    /// let event = Event::data("hi").event("start");
238    ///
239    /// // The event name is "then end", with `\n` replaced with ` `.
240    /// let event = Event::data("bye").event("then\nend");
241    /// ```
242    pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
243        self.event = Some(event.into());
244        self
245    }
246
247    /// Sets the value of the 'id' (last event ID) field.
248    ///
249    /// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
250    /// `id` _does_ contain new lines or carriage returns, they are replaced
251    /// with spaces (` `) before being sent to the client.
252    ///
253    /// # Example
254    ///
255    /// ```rust
256    /// use rocket::response::stream::Event;
257    ///
258    /// // The event ID is "start".
259    /// let event = Event::data("hi").id("start");
260    ///
261    /// // The event ID is "then end", with `\n` replaced with ` `.
262    /// let event = Event::data("bye").id("then\nend");
263    /// ```
264    /// Sets the value of the 'id' field. It may not contain newlines.
265    pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
266        self.id = Some(id.into());
267        self
268    }
269
270    /// Sets or replaces the value of the `data` field.
271    ///
272    /// # Example
273    ///
274    /// ```rust
275    /// use rocket::response::stream::Event;
276    ///
277    /// // The data "hello" will be sent.
278    /// let event = Event::data("hi").with_data("hello");
279    ///
280    /// // The two below are equivalent.
281    /// let event = Event::comment("bye").with_data("goodbye");
282    /// let event = Event::data("goodbye").with_comment("bye");
283    /// ```
284    pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
285        self.data = Some(data.into());
286        self
287    }
288
289    /// Sets or replaces the value of the `comment` field.
290    ///
291    /// # Example
292    ///
293    /// ```rust
294    /// use rocket::response::stream::Event;
295    ///
296    /// // The comment "🚀" will be sent.
297    /// let event = Event::comment("Rocket is great!").with_comment("🚀");
298    ///
299    /// // The two below are equivalent.
300    /// let event = Event::comment("bye").with_data("goodbye");
301    /// let event = Event::data("goodbye").with_comment("bye");
302    /// ```
303    pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
304        self.comment = Some(data.into());
305        self
306    }
307
308    /// Sets or replaces the value of the `retry` field.
309    ///
310    /// # Example
311    ///
312    /// ```rust
313    /// use rocket::response::stream::Event;
314    /// use rocket::tokio::time::Duration;
315    ///
316    /// // The reconnection will be set to 10 seconds.
317    /// let event = Event::retry(Duration::from_millis(500))
318    ///     .with_retry(Duration::from_secs(10));
319    ///
320    /// // The two below are equivalent.
321    /// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
322    /// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
323    /// ```
324    pub fn with_retry(mut self, period: Duration) -> Self {
325        self.retry = Some(period);
326        self
327    }
328
329    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
330        let events = [
331            self.comment.map(|v| RawLinedEvent::many("", v)),
332            self.retry.map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
333            self.id.map(|v| RawLinedEvent::one("id", v)),
334            self.event.map(|v| RawLinedEvent::one("event", v)),
335            self.data.map(|v| RawLinedEvent::many("data", v)),
336            Some(RawLinedEvent::raw("")),
337        ];
338
339        stream::iter(events).filter_map(ready)
340    }
341}
342
343/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
344///
345/// An `EventStream` can be constructed from any [`Stream`] of items of type
346/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
347/// or through generator syntax via [`EventStream!`].
348///
349/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
350///
351/// # Responder
352///
353/// `EventStream` is a (potentially infinite) responder. The response
354/// `Content-Type` is set to [`EventStream`](const@ContentType::EventStream).
355/// The body is [unsized](crate::response::Body#unsized), and values are sent as
356/// soon as they are yielded by the internal iterator.
357///
358/// ## Heartbeat
359///
360/// A heartbeat comment is injected into the internal stream and sent at a fixed
361/// interval. The comment is discarded by clients and serves only to keep the
362/// connection alive; it does not interfere with application data. The interval
363/// defaults to 30 seconds but can be adjusted with
364/// [`EventStream::heartbeat()`].
365///
366/// # Examples
367///
368/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
369/// the client, one per second:
370///
371/// ```rust
372/// # use rocket::*;
373/// use rocket::response::stream::{Event, EventStream};
374/// use rocket::tokio::time::{self, Duration};
375///
376/// #[get("/events")]
377/// fn stream() -> EventStream![] {
378///     EventStream! {
379///         let mut interval = time::interval(Duration::from_secs(1));
380///         loop {
381///             yield Event::data("ping");
382///             interval.tick().await;
383///         }
384///     }
385/// }
386/// ```
387///
388/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
389///
390/// ```rust
391/// # use rocket::get;
392/// use rocket::response::stream::{Event, EventStream};
393/// use rocket::tokio::time::Duration;
394///
395/// #[get("/events")]
396/// fn events() -> EventStream![] {
397///     EventStream! {
398///         for i in 0..3 {
399///             yield Event::retry(Duration::from_secs(10));
400///             yield Event::data(format!("{}", i)).id("cat").event("bar");
401///             yield Event::comment("silly boy");
402///         }
403///     }
404/// }
405/// ```
406///
407/// The syntax of `EventStream!` as an expression is identical to that of
408/// [`stream!`](crate::response::stream::stream). For how to gracefully
409/// terminate an otherwise infinite stream, see [graceful
410/// shutdown](crate::response::stream#graceful-shutdown).
411///
412/// # Borrowing
413///
414/// If an `EventStream` contains a borrow, the extended type syntax
415/// `EventStream![Event + '_]` must be used:
416///
417/// ```rust
418/// # use rocket::get;
419/// use rocket::State;
420/// use rocket::response::stream::{Event, EventStream};
421///
422/// #[get("/events")]
423/// fn events(ctxt: &State<bool>) -> EventStream![Event + '_] {
424///     EventStream! {
425///         // By using `ctxt` in the stream, the borrow is moved into it. Thus,
426///         // the stream object contains a borrow, prompting the '_ annotation.
427///         if *ctxt.inner() {
428///             yield Event::data("hi");
429///         }
430///     }
431/// }
432/// ```
433///
434/// See [`stream#borrowing`](crate::response::stream#borrowing) for further
435/// details on borrowing in streams.
436///
437/// # Pitfalls
438///
439/// Server-Sent Events are a rather simple mechanism, though there are some
440/// pitfalls to be aware of.
441///
442///  * **Buffering**
443///
444///    Protocol restrictions complicate implementing an API that does not
445///    buffer. As such, if you are sending _lots_ of data, consider sending the
446///    data via multiple data fields (with events to signal start and end).
447///    Alternatively, send _one_ event which instructs the client to fetch the
448///    data from another endpoint which in-turn streams the data.
449///
450///  * **Raw SSE requires UTF-8 data**
451///
452///    Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
453///    consider encoding it, for instance, as JSON using [`Event::json()`].
454///    Alternatively, as described before, use SSE as a notifier which alerts
455///    the client to fetch the data from elsewhere.
456///
457///  * **Raw SSE is Lossy**
458///
459///    Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
460///    due to interference with the line protocol.
461///
462///    The protocol allows expressing new lines as multiple messages, however,
463///    and Rocket automatically transforms a message of `foo\nbar` into two
464///    messages, `foo` and `bar`, so that they are reconstructed (automatically)
465///    as `foo\nbar` on the client-side. For messages that only contain new
466///    lines `\n`, the conversion is lossless.
467///
468///    However, the protocol has no mechanism for expressing carriage returns
469///    and thus it is not possible to send unencoded carriage returns via SSE.
470///    Rocket handles carriage returns like it handles new lines: it splits the
471///    data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
472///    the client side. A single `\r` that is not part of an `\r\n` sequence
473///    also becomes `\n` at the client side. As a result, the message
474///    `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
475///
476///    To send messages losslessly, they must be encoded first, for instance, by
477///    using [`Event::json()`].
478///
479///  * **Clients reconnect ad-infinitum**
480///
481///    The [SSE standard] stipulates: _"Clients will reconnect if the connection
482///    is closed; a client can be told to stop reconnecting using the HTTP 204
483///    No Content response code."_ As a result, clients will typically reconnect
484///    exhaustively until either they choose to disconnect or they receive a
485///    `204 No Content` response.
486///
487///    [SSE standard]: https://html.spec.whatwg.org/multipage/server-sent-events.html
488pub struct EventStream<S> {
489    stream: S,
490    heartbeat: Option<Duration>,
491}
492
493impl<S: Stream<Item = Event>> EventStream<S> {
494    /// Sets a "ping" interval for this `EventStream` to avoid connection
495    /// timeouts when no data is being transferred. The default `interval` is 30
496    /// seconds.
497    ///
498    /// The ping is implemented by sending an empty comment to the client every
499    /// `interval` seconds.
500    ///
501    /// # Example
502    ///
503    /// ```rust
504    /// # use rocket::get;
505    /// use rocket::response::stream::{Event, EventStream};
506    /// use rocket::tokio::time::Duration;
507    ///
508    /// #[get("/events")]
509    /// fn events() -> EventStream![] {
510    ///     // Remove the default heartbeat.
511    ///     # let event_stream = rocket::futures::stream::pending();
512    ///     EventStream::from(event_stream).heartbeat(None);
513    ///
514    ///     // Set the heartbeat interval to 15 seconds.
515    ///     # let event_stream = rocket::futures::stream::pending();
516    ///     EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
517    ///
518    ///     // Do the same but for a generated `EventStream`:
519    ///     let stream = EventStream! {
520    ///         yield Event::data("hello");
521    ///     };
522    ///
523    ///     stream.heartbeat(Duration::from_secs(15))
524    /// }
525    /// ```
526    pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
527        self.heartbeat = heartbeat.into();
528        self
529    }
530
531    fn heartbeat_stream(&self) -> Option<impl Stream<Item = RawLinedEvent>> {
532        use tokio::time::interval;
533        use tokio_stream::wrappers::IntervalStream;
534
535        self.heartbeat
536            .map(|beat| IntervalStream::new(interval(beat)))
537            .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
538    }
539
540    fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
541        use futures::future::Either;
542        use crate::ext::StreamExt;
543
544        let heartbeat_stream = self.heartbeat_stream();
545        let raw_events = self.stream.map(|e| e.into_stream()).flatten();
546        match heartbeat_stream {
547            Some(heartbeat) => Either::Left(raw_events.join(heartbeat)),
548            None => Either::Right(raw_events)
549        }
550    }
551
552    fn into_reader(self) -> impl AsyncRead {
553        ReaderStream::from(self.into_stream())
554    }
555}
556
557impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
558    /// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
559    ///
560    /// Use `EventStream::from()` to construct an `EventStream` from an already
561    /// existing stream. Otherwise, prefer to use [`EventStream!`].
562    ///
563    /// # Example
564    ///
565    /// ```rust
566    /// use rocket::response::stream::{Event, EventStream};
567    /// use rocket::futures::stream;
568    ///
569    /// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
570    /// let stream = EventStream::from(raw);
571    /// ```
572    fn from(stream: S) -> Self {
573        EventStream { stream, heartbeat: Some(Duration::from_secs(30)), }
574    }
575}
576
577impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
578    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
579        Response::build()
580            .header(ContentType::EventStream)
581            .raw_header("Cache-Control", "no-cache")
582            .raw_header("Expires", "0")
583            .streamed_body(self.into_reader())
584            .ok()
585    }
586}
587
588crate::export! {
589    /// Type and stream expression macro for [`struct@EventStream`].
590    ///
591    /// See [`stream!`](crate::response::stream::stream) for the syntax
592    /// supported by this macro. In addition to that syntax, this macro can also
593    /// be called with no arguments, `EventStream![]`, as shorthand for
594    /// `EventStream![Event]`.
595    ///
596    /// See [`struct@EventStream`] and the [module level
597    /// docs](crate::response::stream#typed-streams) for usage details.
598    macro_rules! EventStream {
599        () => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
600        ($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
601    }
602}
603
604#[cfg(test)]
605mod sse_tests {
606    use tokio::io::AsyncReadExt;
607    use tokio::time::{self, Duration};
608    use futures::stream::Stream;
609    use crate::response::stream::{stream, Event, EventStream, ReaderStream};
610
611    impl Event {
612        fn into_string(self) -> String {
613            crate::async_test(async move {
614                let mut string = String::new();
615                let mut reader = ReaderStream::from(self.into_stream());
616                reader.read_to_string(&mut string).await.expect("event -> string");
617                string
618            })
619        }
620    }
621
622    impl<S: Stream<Item = Event>> EventStream<S> {
623        fn into_string(self) -> String {
624            crate::async_test(async move {
625                let mut string = String::new();
626                let reader = self.into_reader();
627                tokio::pin!(reader);
628                reader.read_to_string(&mut string).await.expect("event stream -> string");
629                string
630            })
631        }
632    }
633
634    #[test]
635    fn test_event_data() {
636        let event = Event::data("a\nb");
637        assert_eq!(event.into_string(), "data:a\ndata:b\n\n");
638
639        let event = Event::data("a\n");
640        assert_eq!(event.into_string(), "data:a\ndata:\n\n");
641
642        let event = Event::data("cats make me happy!");
643        assert_eq!(event.into_string(), "data:cats make me happy!\n\n");
644
645        let event = Event::data("in the\njungle\nthe mighty\njungle");
646        assert_eq!(event.into_string(),
647            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
648
649        let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
650        assert_eq!(event.into_string(),
651            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");
652
653        let event = Event::data("\nb\n");
654        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
655
656        let event = Event::data("\r\nb\n");
657        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
658
659        let event = Event::data("\r\nb\r\n");
660        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");
661
662        let event = Event::data("\n\nb\n");
663        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
664
665        let event = Event::data("\n\rb\n");
666        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
667
668        let event = Event::data("\n\rb\r");
669        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");
670
671        let event = Event::comment("\n\rb\r");
672        assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");
673
674        let event = Event::data("\n\n\n");
675        assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");
676
677        let event = Event::data("\n");
678        assert_eq!(event.into_string(), "data:\ndata:\n\n");
679
680        let event = Event::data("");
681        assert_eq!(event.into_string(), "data:\n\n");
682    }
683
684    #[test]
685    fn test_event_fields() {
686        let event = Event::data("foo").id("moo");
687        assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");
688
689        let event = Event::data("foo").id("moo").with_retry(Duration::from_secs(45));
690        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");
691
692        let event = Event::data("foo\nbar").id("moo").with_retry(Duration::from_secs(45));
693        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n");
694
695        let event = Event::retry(Duration::from_secs(45));
696        assert_eq!(event.into_string(), "retry:45000\n\n");
697
698        let event = Event::comment("incoming data...");
699        assert_eq!(event.into_string(), ":incoming data...\n\n");
700
701        let event = Event::data("foo").id("moo").with_comment("cows, ey?");
702        assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");
703
704        let event = Event::data("foo\nbar")
705            .id("moo")
706            .event("milk")
707            .with_retry(Duration::from_secs(3));
708
709        assert_eq!(event.into_string(), "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n");
710
711        let event = Event::data("foo")
712            .id("moo")
713            .event("milk")
714            .with_comment("??")
715            .with_retry(Duration::from_secs(3));
716
717        assert_eq!(event.into_string(), ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
718
719        let event = Event::data("foo")
720            .id("moo")
721            .event("milk")
722            .with_comment("?\n?")
723            .with_retry(Duration::from_secs(3));
724
725        assert_eq!(event.into_string(), ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");
726
727        let event = Event::data("foo\r\nbar\nbaz")
728            .id("moo")
729            .event("milk")
730            .with_comment("?\n?")
731            .with_retry(Duration::from_secs(3));
732
733        assert_eq!(event.into_string(),
734            ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n");
735    }
736
737    #[test]
738    fn test_bad_chars() {
739        let event = Event::data("foo").id("dead\nbeef").event("m\noo");
740        assert_eq!(event.into_string(), "id:dead beef\nevent:m oo\ndata:foo\n\n");
741
742        let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
743        assert_eq!(event.into_string(), "id:d  be f\nevent:m  \ndata:f\ndata:o\n\n");
744
745        let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
746        assert_eq!(event.into_string(), "id:       \nevent:  b\ndata:f\ndata:o\n\n");
747    }
748
749    #[test]
750    fn test_event_stream() {
751        use futures::stream::iter;
752
753        let stream = EventStream::from(iter(vec![Event::data("foo")]));
754        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");
755
756        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
757        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:a\n\ndata:b\n\n");
758
759        let stream = EventStream::from(iter(vec![
760                Event::data("a\nb"),
761                Event::data("b"),
762                Event::data("c\n\nd"),
763                Event::data("e"),
764        ]));
765
766        assert_eq!(stream.into_string().replace(":\n\n", ""),
767            "data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n");
768    }
769
770    #[test]
771    fn test_heartbeat() {
772        use futures::future::ready;
773        use futures::stream::{once, iter, StreamExt};
774
775        const HEARTBEAT: &str = ":\n";
776
777        // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
778        // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
779        let raw = stream!(time::sleep(Duration::from_millis(600)).await;)
780            .map(|_| unreachable!());
781
782        let string = EventStream::from(raw)
783            .heartbeat(Duration::from_millis(250))
784            .into_string();
785
786        let heartbeats = string.matches(HEARTBEAT).count();
787        assert!(heartbeats >= 2 && heartbeats <= 4, "got {} beat(s)", heartbeats);
788
789        let stream = EventStream! {
790            time::sleep(Duration::from_millis(250)).await;
791            yield Event::data("foo");
792            time::sleep(Duration::from_millis(250)).await;
793            yield Event::data("bar");
794        };
795
796        // We expect: foo\n\n [heartbeat] bar\n\n [maybe heartbeat].
797        let string = stream.heartbeat(Duration::from_millis(350)).into_string();
798        let heartbeats = string.matches(HEARTBEAT).count();
799        assert!(heartbeats >= 1 && heartbeats <= 3, "got {} beat(s)", heartbeats);
800        assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
801        assert!(string.contains("data:bar\n\n"), "string = {:?}", string);
802
803        // We shouldn't send a heartbeat if a message is immediately available.
804        let stream = EventStream::from(once(ready(Event::data("hello"))));
805        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
806        assert_eq!(string, "data:hello\n\n", "string = {:?}", string);
807
808        // It's okay if we do it with two, though.
809        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
810        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
811        let heartbeats = string.matches(HEARTBEAT).count();
812        assert!(heartbeats <= 1);
813        assert!(string.contains("data:a\n\n"), "string = {:?}", string);
814        assert!(string.contains("data:b\n\n"), "string = {:?}", string);
815    }
816}