rocket/response/stream/sse.rs
1use std::borrow::Cow;
2
3use tokio::io::AsyncRead;
4use tokio::time::Duration;
5use futures::stream::{self, Stream, StreamExt};
6use futures::future::ready;
7
8use crate::request::Request;
9use crate::response::{self, Response, Responder, stream::{ReaderStream, RawLinedEvent}};
10use crate::http::ContentType;
11
12/// A Server-Sent `Event` (SSE) in a Server-Sent [`struct@EventStream`].
13///
14/// A server-sent event is either a _field_ or a _comment_. Comments can be
15/// constructed via [`Event::comment()`] while fields can be constructed via
16/// [`Event::data()`], [`Event::json()`], and [`Event::retry()`].
17///
18/// ```rust
19/// use rocket::tokio::time::Duration;
20/// use rocket::response::stream::Event;
21///
22/// // A `data` event with message "Hello, SSE!".
23/// let event = Event::data("Hello, SSE!");
24///
25/// // The same event but with event name of `hello`.
26/// let event = Event::data("Hello, SSE!").event("hello");
27///
28/// // A `retry` event to set the client-side reconnection time.
29/// let event = Event::retry(Duration::from_secs(5));
30///
31/// // An event with an attached comment, event name, and ID.
32/// let event = Event::data("Hello, SSE!")
33/// .with_comment("just a hello message")
34/// .event("hello")
35/// .id("1");
36/// ```
37///
38/// We largely defer to [MDN's using server-sent events] documentation for
39/// client-side details but reproduce, in our words, relevant server-side
40/// documentation here.
41///
42/// [MDN's using server-sent events]:
43/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
44///
45/// # Pitfalls
46///
47/// Server-Sent Events suffer from certain pitfalls. We encourage readers to
48/// read through [pitfalls](struct@EventStream#pitfalls) before making use of
49/// Rocket's SSE support.
50///
51/// # Comments
52///
53/// A server-sent _comment_, created via [`Event::comment()`], is an event that
54/// appears only in the raw server-sent event data stream and is inaccessible by
55/// most clients. This includes JavaScript's `EventSource`. As such, they serve
56/// little utility beyond debugging a raw data stream and keeping a connection
57/// alive. See [heartbeat](struct@EventStream#heartbeat) for information on
58/// Rocket's `EventStream` keep-alive.
59///
60/// # Fields
61///
62/// A server-sent field can be one of four kinds:
63///
64/// * `retry`
65///
66/// A `retry` event, created via [`Event::retry()`], sets the reconnection
67/// time on the client side. It is the duration the client will wait before
68/// attempting to reconnect when a connection is lost. Most applications
69/// will not need to set a `retry`, opting instead to use the
70/// implementation's default or to reconnect manually on error.
71///
72/// * `id`
73///
74/// Sets the event id to associate all subsequent fields with. This value
75/// cannot be retrieved directly via most clients, including JavaScript
76/// `EventSource`. Instead, it is sent by the implementation on reconnection
77/// via the `Last-Event-ID` header. An `id` can be attached to other fields
78/// via the [`Event::id()`] builder method.
79///
80/// * `event`
81///
82/// Sets the event name to associate the next `data` field with. In
83/// JavaScript's `EventSource`, this is the event to be listened for, which
84/// defaults to `message`. An `event` can be attached to other fields via
85/// the [`Event::event()`] builder method.
86///
87/// * `data`
88///
89/// Sends data to dispatch as an event at the client. In JavaScript's
90/// `EventSource`, this (and only this) results in an event handler for
91/// `event`, specified just prior, being triggered. A data field can be
92/// created via the [`Event::data()`] or [`Event::json()`] constructors.
93///
94/// # Implementation Notes
95///
96/// A constructed `Event` _always_ emits its fields in the following order:
97///
98/// 1. `comment`
99/// 2. `retry`
100/// 3. `id`
101/// 4. `event`
102/// 5. `data`
103///
104/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
105/// Rocket's default implementation automatically converts new lines and
106/// carriage returns in `event` and `id` fields to spaces.
107///
108/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
109/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
110/// protocol-level `\n`, that is, in such a way that they are interpreted as
111/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
112/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
113/// those emitted by [`Event::json()`], have no such restrictions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Event {
    /// Comment text, if any; emitted before every field.
    comment: Option<Cow<'static, str>>,
    /// Client reconnection time, if any; emitted as the `retry` field in
    /// whole milliseconds.
    retry: Option<Duration>,
    /// Value of the `id` field, if any.
    id: Option<Cow<'static, str>>,
    /// Value of the `event` (event name) field, if any.
    event: Option<Cow<'static, str>>,
    /// Payload of the `data` field, if any; emitted last.
    data: Option<Cow<'static, str>>,
}
122
123impl Event {
124 // We hide this since we never want to construct an `Event` with nothing.
125 fn new() -> Self {
126 Event { comment: None, retry: None, id: None, event: None, data: None, }
127 }
128
129 /// Creates a new `Event` with an empty data field.
130 ///
131 /// This is exactly equivalent to `Event::data("")`.
132 ///
133 /// # Example
134 ///
135 /// ```rust
136 /// use rocket::response::stream::Event;
137 ///
138 /// let event = Event::empty();
139 /// ```
140 pub fn empty() -> Self {
141 Event::data("")
142 }
143
144 /// Creates a new `Event` with a data field of `data` serialized as JSON.
145 ///
146 /// # Example
147 ///
148 /// ```rust
149 /// use rocket::serde::Serialize;
150 /// use rocket::response::stream::Event;
151 ///
152 /// #[derive(Serialize)]
153 /// #[serde(crate = "rocket::serde")]
154 /// struct MyData<'r> {
155 /// string: &'r str,
156 /// number: usize,
157 /// }
158 ///
159 /// let data = MyData { string: "hello!", number: 10 };
160 /// let event = Event::json(&data);
161 /// ```
162 #[cfg(feature = "json")]
163 #[cfg_attr(nightly, doc(cfg(feature = "json")))]
164 pub fn json<T: serde::Serialize>(data: &T) -> Self {
165 let string = serde_json::to_string(data).unwrap_or_default();
166 Self::data(string)
167 }
168
169 /// Creates a new `Event` with a data field containing the raw `data`.
170 ///
171 /// # Raw SSE is Lossy
172 ///
173 /// Unencoded carriage returns cannot be expressed in the protocol. Thus,
174 /// any carriage returns in `data` will not appear at the client-side.
175 /// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
176 /// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
177 /// received as `foo\nbar\nbaz` at the client-side.
178 ///
179 /// See [pitfalls](struct@EventStream#pitfalls) for more details.
180 ///
181 /// # Example
182 ///
183 /// ```rust
184 /// use rocket::response::stream::Event;
185 ///
186 /// // A `data` event with message "Hello, SSE!".
187 /// let event = Event::data("Hello, SSE!");
188 /// ```
189 pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
190 Self { data: Some(data.into()), ..Event::new() }
191 }
192
193 /// Creates a new comment `Event`.
194 ///
195 /// As with [`Event::data()`], unencoded carriage returns cannot be
196 /// expressed in the protocol. Thus, any carriage returns in `data` will
197 /// not appear at the client-side. For comments, this is generally not a
198 /// concern as comments are discarded by client-side libraries.
199 ///
200 /// # Example
201 ///
202 /// ```rust
203 /// use rocket::response::stream::Event;
204 ///
205 /// let event = Event::comment("bet you'll never see me!");
206 /// ```
207 pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
208 Self { comment: Some(data.into()), ..Event::new() }
209 }
210
211 /// Creates a new retry `Event`.
212 ///
213 /// # Example
214 ///
215 /// ```rust
216 /// use rocket::response::stream::Event;
217 /// use rocket::tokio::time::Duration;
218 ///
219 /// let event = Event::retry(Duration::from_millis(250));
220 /// ```
221 pub fn retry(period: Duration) -> Self {
222 Self { retry: Some(period), ..Event::new() }
223 }
224
225 /// Sets the value of the 'event' (event type) field.
226 ///
227 /// Event names may not contain new lines `\n` or carriage returns `\r`. If
228 /// `event` _does_ contain new lines or carriage returns, they are replaced
229 /// with spaces (` `) before being sent to the client.
230 ///
231 /// # Example
232 ///
233 /// ```rust
234 /// use rocket::response::stream::Event;
235 ///
236 /// // The event name is "start".
237 /// let event = Event::data("hi").event("start");
238 ///
239 /// // The event name is "then end", with `\n` replaced with ` `.
240 /// let event = Event::data("bye").event("then\nend");
241 /// ```
242 pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
243 self.event = Some(event.into());
244 self
245 }
246
247 /// Sets the value of the 'id' (last event ID) field.
248 ///
249 /// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
250 /// `id` _does_ contain new lines or carriage returns, they are replaced
251 /// with spaces (` `) before being sent to the client.
252 ///
253 /// # Example
254 ///
255 /// ```rust
256 /// use rocket::response::stream::Event;
257 ///
258 /// // The event ID is "start".
259 /// let event = Event::data("hi").id("start");
260 ///
261 /// // The event ID is "then end", with `\n` replaced with ` `.
262 /// let event = Event::data("bye").id("then\nend");
263 /// ```
264 /// Sets the value of the 'id' field. It may not contain newlines.
265 pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
266 self.id = Some(id.into());
267 self
268 }
269
270 /// Sets or replaces the value of the `data` field.
271 ///
272 /// # Example
273 ///
274 /// ```rust
275 /// use rocket::response::stream::Event;
276 ///
277 /// // The data "hello" will be sent.
278 /// let event = Event::data("hi").with_data("hello");
279 ///
280 /// // The two below are equivalent.
281 /// let event = Event::comment("bye").with_data("goodbye");
282 /// let event = Event::data("goodbye").with_comment("bye");
283 /// ```
284 pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
285 self.data = Some(data.into());
286 self
287 }
288
289 /// Sets or replaces the value of the `comment` field.
290 ///
291 /// # Example
292 ///
293 /// ```rust
294 /// use rocket::response::stream::Event;
295 ///
296 /// // The comment "🚀" will be sent.
297 /// let event = Event::comment("Rocket is great!").with_comment("🚀");
298 ///
299 /// // The two below are equivalent.
300 /// let event = Event::comment("bye").with_data("goodbye");
301 /// let event = Event::data("goodbye").with_comment("bye");
302 /// ```
303 pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
304 self.comment = Some(data.into());
305 self
306 }
307
308 /// Sets or replaces the value of the `retry` field.
309 ///
310 /// # Example
311 ///
312 /// ```rust
313 /// use rocket::response::stream::Event;
314 /// use rocket::tokio::time::Duration;
315 ///
316 /// // The reconnection will be set to 10 seconds.
317 /// let event = Event::retry(Duration::from_millis(500))
318 /// .with_retry(Duration::from_secs(10));
319 ///
320 /// // The two below are equivalent.
321 /// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
322 /// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
323 /// ```
324 pub fn with_retry(mut self, period: Duration) -> Self {
325 self.retry = Some(period);
326 self
327 }
328
329 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
330 let events = [
331 self.comment.map(|v| RawLinedEvent::many("", v)),
332 self.retry.map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
333 self.id.map(|v| RawLinedEvent::one("id", v)),
334 self.event.map(|v| RawLinedEvent::one("event", v)),
335 self.data.map(|v| RawLinedEvent::many("data", v)),
336 Some(RawLinedEvent::raw("")),
337 ];
338
339 stream::iter(events).filter_map(ready)
340 }
341}
342
343/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
344///
345/// An `EventStream` can be constructed from any [`Stream`] of items of type
346/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
347/// or through generator syntax via [`EventStream!`].
348///
349/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
350///
351/// # Responder
352///
353/// `EventStream` is a (potentially infinite) responder. The response
354/// `Content-Type` is set to [`EventStream`](const@ContentType::EventStream).
355/// The body is [unsized](crate::response::Body#unsized), and values are sent as
/// soon as they are yielded by the internal stream.
357///
358/// ## Heartbeat
359///
360/// A heartbeat comment is injected into the internal stream and sent at a fixed
361/// interval. The comment is discarded by clients and serves only to keep the
362/// connection alive; it does not interfere with application data. The interval
363/// defaults to 30 seconds but can be adjusted with
364/// [`EventStream::heartbeat()`].
365///
366/// # Examples
367///
368/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
369/// the client, one per second:
370///
371/// ```rust
372/// # use rocket::*;
373/// use rocket::response::stream::{Event, EventStream};
374/// use rocket::tokio::time::{self, Duration};
375///
376/// #[get("/events")]
377/// fn stream() -> EventStream![] {
378/// EventStream! {
379/// let mut interval = time::interval(Duration::from_secs(1));
380/// loop {
381/// yield Event::data("ping");
382/// interval.tick().await;
383/// }
384/// }
385/// }
386/// ```
387///
388/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
389///
390/// ```rust
391/// # use rocket::get;
392/// use rocket::response::stream::{Event, EventStream};
393/// use rocket::tokio::time::Duration;
394///
395/// #[get("/events")]
396/// fn events() -> EventStream![] {
397/// EventStream! {
398/// for i in 0..3 {
399/// yield Event::retry(Duration::from_secs(10));
400/// yield Event::data(format!("{}", i)).id("cat").event("bar");
401/// yield Event::comment("silly boy");
402/// }
403/// }
404/// }
405/// ```
406///
407/// The syntax of `EventStream!` as an expression is identical to that of
408/// [`stream!`](crate::response::stream::stream). For how to gracefully
409/// terminate an otherwise infinite stream, see [graceful
410/// shutdown](crate::response::stream#graceful-shutdown).
411///
412/// # Borrowing
413///
414/// If an `EventStream` contains a borrow, the extended type syntax
415/// `EventStream![Event + '_]` must be used:
416///
417/// ```rust
418/// # use rocket::get;
419/// use rocket::State;
420/// use rocket::response::stream::{Event, EventStream};
421///
422/// #[get("/events")]
423/// fn events(ctxt: &State<bool>) -> EventStream![Event + '_] {
424/// EventStream! {
425/// // By using `ctxt` in the stream, the borrow is moved into it. Thus,
426/// // the stream object contains a borrow, prompting the '_ annotation.
427/// if *ctxt.inner() {
428/// yield Event::data("hi");
429/// }
430/// }
431/// }
432/// ```
433///
434/// See [`stream#borrowing`](crate::response::stream#borrowing) for further
435/// details on borrowing in streams.
436///
437/// # Pitfalls
438///
439/// Server-Sent Events are a rather simple mechanism, though there are some
440/// pitfalls to be aware of.
441///
442/// * **Buffering**
443///
444/// Protocol restrictions complicate implementing an API that does not
445/// buffer. As such, if you are sending _lots_ of data, consider sending the
446/// data via multiple data fields (with events to signal start and end).
447/// Alternatively, send _one_ event which instructs the client to fetch the
448/// data from another endpoint which in-turn streams the data.
449///
450/// * **Raw SSE requires UTF-8 data**
451///
452/// Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
453/// consider encoding it, for instance, as JSON using [`Event::json()`].
454/// Alternatively, as described before, use SSE as a notifier which alerts
455/// the client to fetch the data from elsewhere.
456///
457/// * **Raw SSE is Lossy**
458///
459/// Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
460/// due to interference with the line protocol.
461///
462/// The protocol allows expressing new lines as multiple messages, however,
463/// and Rocket automatically transforms a message of `foo\nbar` into two
464/// messages, `foo` and `bar`, so that they are reconstructed (automatically)
465/// as `foo\nbar` on the client-side. For messages that only contain new
466/// lines `\n`, the conversion is lossless.
467///
468/// However, the protocol has no mechanism for expressing carriage returns
469/// and thus it is not possible to send unencoded carriage returns via SSE.
470/// Rocket handles carriage returns like it handles new lines: it splits the
471/// data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
472/// the client side. A single `\r` that is not part of an `\r\n` sequence
473/// also becomes `\n` at the client side. As a result, the message
474/// `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
475///
476/// To send messages losslessly, they must be encoded first, for instance, by
477/// using [`Event::json()`].
478///
479/// * **Clients reconnect ad-infinitum**
480///
481/// The [SSE standard] stipulates: _"Clients will reconnect if the connection
482/// is closed; a client can be told to stop reconnecting using the HTTP 204
483/// No Content response code."_ As a result, clients will typically reconnect
484/// exhaustively until either they choose to disconnect or they receive a
485/// `204 No Content` response.
486///
487/// [SSE standard]: https://html.spec.whatwg.org/multipage/server-sent-events.html
pub struct EventStream<S> {
    // The wrapped stream that yields the `Event`s to send.
    stream: S,
    // Interval between heartbeat comments; `None` means no heartbeat.
    heartbeat: Option<Duration>,
}
492
493impl<S: Stream<Item = Event>> EventStream<S> {
494 /// Sets a "ping" interval for this `EventStream` to avoid connection
495 /// timeouts when no data is being transferred. The default `interval` is 30
496 /// seconds.
497 ///
498 /// The ping is implemented by sending an empty comment to the client every
499 /// `interval` seconds.
500 ///
501 /// # Example
502 ///
503 /// ```rust
504 /// # use rocket::get;
505 /// use rocket::response::stream::{Event, EventStream};
506 /// use rocket::tokio::time::Duration;
507 ///
508 /// #[get("/events")]
509 /// fn events() -> EventStream![] {
510 /// // Remove the default heartbeat.
511 /// # let event_stream = rocket::futures::stream::pending();
512 /// EventStream::from(event_stream).heartbeat(None);
513 ///
514 /// // Set the heartbeat interval to 15 seconds.
515 /// # let event_stream = rocket::futures::stream::pending();
516 /// EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
517 ///
518 /// // Do the same but for a generated `EventStream`:
519 /// let stream = EventStream! {
520 /// yield Event::data("hello");
521 /// };
522 ///
523 /// stream.heartbeat(Duration::from_secs(15))
524 /// }
525 /// ```
526 pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
527 self.heartbeat = heartbeat.into();
528 self
529 }
530
531 fn heartbeat_stream(&self) -> Option<impl Stream<Item = RawLinedEvent>> {
532 use tokio::time::interval;
533 use tokio_stream::wrappers::IntervalStream;
534
535 self.heartbeat
536 .map(|beat| IntervalStream::new(interval(beat)))
537 .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
538 }
539
540 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
541 use futures::future::Either;
542 use crate::ext::StreamExt;
543
544 let heartbeat_stream = self.heartbeat_stream();
545 let raw_events = self.stream.map(|e| e.into_stream()).flatten();
546 match heartbeat_stream {
547 Some(heartbeat) => Either::Left(raw_events.join(heartbeat)),
548 None => Either::Right(raw_events)
549 }
550 }
551
552 fn into_reader(self) -> impl AsyncRead {
553 ReaderStream::from(self.into_stream())
554 }
555}
556
557impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
558 /// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
559 ///
560 /// Use `EventStream::from()` to construct an `EventStream` from an already
561 /// existing stream. Otherwise, prefer to use [`EventStream!`].
562 ///
563 /// # Example
564 ///
565 /// ```rust
566 /// use rocket::response::stream::{Event, EventStream};
567 /// use rocket::futures::stream;
568 ///
569 /// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
570 /// let stream = EventStream::from(raw);
571 /// ```
572 fn from(stream: S) -> Self {
573 EventStream { stream, heartbeat: Some(Duration::from_secs(30)), }
574 }
575}
576
577impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
578 fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
579 Response::build()
580 .header(ContentType::EventStream)
581 .raw_header("Cache-Control", "no-cache")
582 .raw_header("Expires", "0")
583 .streamed_body(self.into_reader())
584 .ok()
585 }
586}
587
crate::export! {
    /// Type and stream expression macro for [`struct@EventStream`].
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro. In addition to that syntax, this macro can also
    /// be called with no arguments, `EventStream![]`, as shorthand for
    /// `EventStream![Event]`.
    ///
    /// See [`struct@EventStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! EventStream {
        // No arguments: the item type defaults to `Event`.
        () => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
        // Anything else is forwarded unchanged to `_typed_stream!`.
        ($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
    }
}
603
// Unit tests for `Event` line serialization and `EventStream` heartbeats.
#[cfg(test)]
mod sse_tests {
    use tokio::io::AsyncReadExt;
    use tokio::time::{self, Duration};
    use futures::stream::Stream;
    use crate::response::stream::{stream, Event, EventStream, ReaderStream};

    impl Event {
        /// Test helper: reads this event's raw line stream to completion and
        /// returns everything read as a `String`.
        fn into_string(self) -> String {
            crate::async_test(async move {
                let mut string = String::new();
                let mut reader = ReaderStream::from(self.into_stream());
                reader.read_to_string(&mut string).await.expect("event -> string");
                string
            })
        }
    }

    impl<S: Stream<Item = Event>> EventStream<S> {
        /// Test helper: reads the whole event stream (heartbeats included)
        /// to completion and returns everything read as a `String`.
        fn into_string(self) -> String {
            crate::async_test(async move {
                let mut string = String::new();
                let reader = self.into_reader();
                // Pin the reader before calling `read_to_string` on it.
                tokio::pin!(reader);
                reader.read_to_string(&mut string).await.expect("event stream -> string");
                string
            })
        }
    }

    // `data` and `comment` text is split into one line per `\n`, `\r\n`, or
    // lone `\r`, with a blank line terminating the event.
    #[test]
    fn test_event_data() {
        let event = Event::data("a\nb");
        assert_eq!(event.into_string(), "data:a\ndata:b\n\n");

        let event = Event::data("a\n");
        assert_eq!(event.into_string(), "data:a\ndata:\n\n");

        let event = Event::data("cats make me happy!");
        assert_eq!(event.into_string(), "data:cats make me happy!\n\n");

        let event = Event::data("in the\njungle\nthe mighty\njungle");
        assert_eq!(event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");

        let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
        assert_eq!(event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");

        let event = Event::data("\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\r\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\r");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::comment("\n\rb\r");
        assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");

        let event = Event::data("\n\n\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");

        let event = Event::data("\n");
        assert_eq!(event.into_string(), "data:\ndata:\n\n");

        let event = Event::data("");
        assert_eq!(event.into_string(), "data:\n\n");
    }

    // Parts are always written in the order comment, retry, id, event, data,
    // regardless of the order in which the builder methods were called.
    #[test]
    fn test_event_fields() {
        let event = Event::data("foo").id("moo");
        assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");

        let event = Event::data("foo").id("moo").with_retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar").id("moo").with_retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n");

        let event = Event::retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\n\n");

        let event = Event::comment("incoming data...");
        assert_eq!(event.into_string(), ":incoming data...\n\n");

        let event = Event::data("foo").id("moo").with_comment("cows, ey?");
        assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar")
            .id("moo")
            .event("milk")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n");

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("??")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");

        let event = Event::data("foo\r\nbar\nbaz")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(),
            ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n");
    }

    // New lines and carriage returns in `id` and `event` values come out as
    // spaces; the expected strings below pin the exact replacement.
    #[test]
    fn test_bad_chars() {
        let event = Event::data("foo").id("dead\nbeef").event("m\noo");
        assert_eq!(event.into_string(), "id:dead beef\nevent:m oo\ndata:foo\n\n");

        let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
        assert_eq!(event.into_string(), "id:d be f\nevent:m \ndata:f\ndata:o\n\n");

        let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
        assert_eq!(event.into_string(), "id: \nevent: b\ndata:f\ndata:o\n\n");
    }

    // Events of a stream are written back to back, each terminated by a
    // blank line.
    //
    // NOTE(review): the `":\n\n"` pattern stripped below differs from the
    // `":\n"` heartbeat that `test_heartbeat` counts — confirm which form a
    // heartbeat takes and whether this `replace` still has any effect.
    #[test]
    fn test_event_stream() {
        use futures::stream::iter;

        let stream = EventStream::from(iter(vec![Event::data("foo")]));
        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");

        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:a\n\ndata:b\n\n");

        let stream = EventStream::from(iter(vec![
            Event::data("a\nb"),
            Event::data("b"),
            Event::data("c\n\nd"),
            Event::data("e"),
        ]));

        assert_eq!(stream.into_string().replace(":\n\n", ""),
            "data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n");
    }

    // Timing-based checks of heartbeat injection; the accepted ranges are
    // deliberately loose to tolerate scheduling jitter.
    #[test]
    fn test_heartbeat() {
        use futures::future::ready;
        use futures::stream::{once, iter, StreamExt};

        const HEARTBEAT: &str = ":\n";

        // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
        // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
        let raw = stream!(time::sleep(Duration::from_millis(600)).await;)
            .map(|_| unreachable!());

        let string = EventStream::from(raw)
            .heartbeat(Duration::from_millis(250))
            .into_string();

        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats >= 2 && heartbeats <= 4, "got {} beat(s)", heartbeats);

        let stream = EventStream! {
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("foo");
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("bar");
        };

        // We expect: foo\n\n [heartbeat] bar\n\n [maybe heartbeat].
        let string = stream.heartbeat(Duration::from_millis(350)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats >= 1 && heartbeats <= 3, "got {} beat(s)", heartbeats);
        assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
        assert!(string.contains("data:bar\n\n"), "string = {:?}", string);

        // We shouldn't send a heartbeat if a message is immediately available.
        let stream = EventStream::from(once(ready(Event::data("hello"))));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        assert_eq!(string, "data:hello\n\n", "string = {:?}", string);

        // It's okay if we do it with two, though.
        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats <= 1);
        assert!(string.contains("data:a\n\n"), "string = {:?}", string);
        assert!(string.contains("data:b\n\n"), "string = {:?}", string);
    }
}
816}