rocket/response/stream/sse.rs
1use std::borrow::Cow;
2
3use tokio::io::AsyncRead;
4use tokio::time::{interval, Duration};
5use futures::{stream::{self, Stream}, future::Either};
6use tokio_stream::{StreamExt, wrappers::IntervalStream};
7
8use crate::request::Request;
9use crate::response::{self, Response, Responder, stream::{ReaderStream, RawLinedEvent}};
10use crate::http::ContentType;
11
12/// A Server-Sent `Event` (SSE) in a Server-Sent [`struct@EventStream`].
13///
14/// A server-sent event is either a _field_ or a _comment_. Comments can be
15/// constructed via [`Event::comment()`] while fields can be constructed via
16/// [`Event::data()`], [`Event::json()`], and [`Event::retry()`].
17///
18/// ```rust
19/// use rocket::tokio::time::Duration;
20/// use rocket::response::stream::Event;
21///
22/// // A `data` event with message "Hello, SSE!".
23/// let event = Event::data("Hello, SSE!");
24///
25/// // The same event but with event name of `hello`.
26/// let event = Event::data("Hello, SSE!").event("hello");
27///
28/// // A `retry` event to set the client-side reconnection time.
29/// let event = Event::retry(Duration::from_secs(5));
30///
31/// // An event with an attached comment, event name, and ID.
32/// let event = Event::data("Hello, SSE!")
33/// .with_comment("just a hello message")
34/// .event("hello")
35/// .id("1");
36/// ```
37///
38/// We largely defer to [MDN's using server-sent events] documentation for
39/// client-side details but reproduce, in our words, relevant server-side
40/// documentation here.
41///
42/// [MDN's using server-sent events]:
43/// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
44///
45/// # Pitfalls
46///
47/// Server-Sent Events suffer from certain pitfalls. We encourage readers to
48/// read through [pitfalls](struct@EventStream#pitfalls) before making use of
49/// Rocket's SSE support.
50///
51/// # Comments
52///
53/// A server-sent _comment_, created via [`Event::comment()`], is an event that
54/// appears only in the raw server-sent event data stream and is inaccessible by
55/// most clients. This includes JavaScript's `EventSource`. As such, they serve
56/// little utility beyond debugging a raw data stream and keeping a connection
57/// alive. See [heartbeat](struct@EventStream#heartbeat) for information on
58/// Rocket's `EventStream` keep-alive.
59///
60/// # Fields
61///
62/// A server-sent field can be one of four kinds:
63///
64/// * `retry`
65///
66/// A `retry` event, created via [`Event::retry()`], sets the reconnection
67/// time on the client side. It is the duration the client will wait before
68/// attempting to reconnect when a connection is lost. Most applications
69/// will not need to set a `retry`, opting instead to use the
70/// implementation's default or to reconnect manually on error.
71///
72/// * `id`
73///
74/// Sets the event id to associate all subsequent fields with. This value
75/// cannot be retrieved directly via most clients, including JavaScript
76/// `EventSource`. Instead, it is sent by the implementation on reconnection
77/// via the `Last-Event-ID` header. An `id` can be attached to other fields
78/// via the [`Event::id()`] builder method.
79///
80/// * `event`
81///
82/// Sets the event name to associate the next `data` field with. In
83/// JavaScript's `EventSource`, this is the event to be listened for, which
84/// defaults to `message`. An `event` can be attached to other fields via
85/// the [`Event::event()`] builder method.
86///
87/// * `data`
88///
89/// Sends data to dispatch as an event at the client. In JavaScript's
90/// `EventSource`, this (and only this) results in an event handler for
91/// `event`, specified just prior, being triggered. A data field can be
92/// created via the [`Event::data()`] or [`Event::json()`] constructors.
93///
94/// # Implementation Notes
95///
96/// A constructed `Event` _always_ emits its fields in the following order:
97///
98/// 1. `comment`
99/// 2. `retry`
100/// 3. `id`
101/// 4. `event`
102/// 5. `data`
103///
104/// The `event` and `id` fields _cannot_ contain new lines or carriage returns.
105/// Rocket's default implementation automatically converts new lines and
106/// carriage returns in `event` and `id` fields to spaces.
107///
108/// The `data` and `comment` fields _cannot_ contain carriage returns. Rocket
109/// converts the unencoded sequence `\r\n` and the isolated `\r` into a
110/// protocol-level `\n`, that is, in such a way that they are interpreted as
111/// `\n` at the client. For example, the raw message `foo\r\nbar\rbaz` is
112/// received as `foo\nbar\nbaz` at the client-side. Encoded sequences, such as
113/// those emitted by [`Event::json()`], have no such restrictions.
#[derive(Clone, Eq, PartialEq, Hash, Debug)]
pub struct Event {
    /// Comment text, if any. Emitted first, before every field.
    comment: Option<Cow<'static, str>>,
    /// Client reconnection time, if any. Emitted as whole milliseconds.
    retry: Option<Duration>,
    /// Value of the `id` field, if any.
    id: Option<Cow<'static, str>>,
    /// Value of the `event` (event name) field, if any.
    event: Option<Cow<'static, str>>,
    /// Value of the `data` field, if any. Emitted last.
    data: Option<Cow<'static, str>>,
}
122
123impl Event {
124 // We hide this since we never want to construct an `Event` with nothing.
125 fn new() -> Self {
126 Event { comment: None, retry: None, id: None, event: None, data: None, }
127 }
128
129 /// Creates a new `Event` with an empty data field.
130 ///
131 /// This is exactly equivalent to `Event::data("")`.
132 ///
133 /// # Example
134 ///
135 /// ```rust
136 /// use rocket::response::stream::Event;
137 ///
138 /// let event = Event::empty();
139 /// ```
140 pub fn empty() -> Self {
141 Event::data("")
142 }
143
144 /// Creates a new `Event` with a data field of `data` serialized as JSON.
145 ///
146 /// # Example
147 ///
148 /// ```rust
149 /// use rocket::serde::Serialize;
150 /// use rocket::response::stream::Event;
151 ///
152 /// #[derive(Serialize)]
153 /// #[serde(crate = "rocket::serde")]
154 /// struct MyData<'r> {
155 /// string: &'r str,
156 /// number: usize,
157 /// }
158 ///
159 /// let data = MyData { string: "hello!", number: 10 };
160 /// let event = Event::json(&data);
161 /// ```
162 #[cfg(feature = "json")]
163 #[cfg_attr(nightly, doc(cfg(feature = "json")))]
164 pub fn json<T: serde::Serialize>(data: &T) -> Self {
165 let string = serde_json::to_string(data).unwrap_or_default();
166 Self::data(string)
167 }
168
169 /// Creates a new `Event` with a data field containing the raw `data`.
170 ///
171 /// # Raw SSE is Lossy
172 ///
173 /// Unencoded carriage returns cannot be expressed in the protocol. Thus,
174 /// any carriage returns in `data` will not appear at the client-side.
175 /// Instead, the sequence `\r\n` and the isolated `\r` will each appear as
176 /// `\n` at the client-side. For example, the message `foo\r\nbar\rbaz` is
177 /// received as `foo\nbar\nbaz` at the client-side.
178 ///
179 /// See [pitfalls](struct@EventStream#pitfalls) for more details.
180 ///
181 /// # Example
182 ///
183 /// ```rust
184 /// use rocket::response::stream::Event;
185 ///
186 /// // A `data` event with message "Hello, SSE!".
187 /// let event = Event::data("Hello, SSE!");
188 /// ```
189 pub fn data<T: Into<Cow<'static, str>>>(data: T) -> Self {
190 Self { data: Some(data.into()), ..Event::new() }
191 }
192
193 /// Creates a new comment `Event`.
194 ///
195 /// As with [`Event::data()`], unencoded carriage returns cannot be
196 /// expressed in the protocol. Thus, any carriage returns in `data` will
197 /// not appear at the client-side. For comments, this is generally not a
198 /// concern as comments are discarded by client-side libraries.
199 ///
200 /// # Example
201 ///
202 /// ```rust
203 /// use rocket::response::stream::Event;
204 ///
205 /// let event = Event::comment("bet you'll never see me!");
206 /// ```
207 pub fn comment<T: Into<Cow<'static, str>>>(data: T) -> Self {
208 Self { comment: Some(data.into()), ..Event::new() }
209 }
210
211 /// Creates a new retry `Event`.
212 ///
213 /// # Example
214 ///
215 /// ```rust
216 /// use rocket::response::stream::Event;
217 /// use rocket::tokio::time::Duration;
218 ///
219 /// let event = Event::retry(Duration::from_millis(250));
220 /// ```
221 pub fn retry(period: Duration) -> Self {
222 Self { retry: Some(period), ..Event::new() }
223 }
224
225 /// Sets the value of the 'event' (event type) field.
226 ///
227 /// Event names may not contain new lines `\n` or carriage returns `\r`. If
228 /// `event` _does_ contain new lines or carriage returns, they are replaced
229 /// with spaces (` `) before being sent to the client.
230 ///
231 /// # Example
232 ///
233 /// ```rust
234 /// use rocket::response::stream::Event;
235 ///
236 /// // The event name is "start".
237 /// let event = Event::data("hi").event("start");
238 ///
239 /// // The event name is "then end", with `\n` replaced with ` `.
240 /// let event = Event::data("bye").event("then\nend");
241 /// ```
242 pub fn event<T: Into<Cow<'static, str>>>(mut self, event: T) -> Self {
243 self.event = Some(event.into());
244 self
245 }
246
247 /// Sets the value of the 'id' (last event ID) field.
248 ///
249 /// Event IDs may not contain new lines `\n` or carriage returns `\r`. If
250 /// `id` _does_ contain new lines or carriage returns, they are replaced
251 /// with spaces (` `) before being sent to the client.
252 ///
253 /// # Example
254 ///
255 /// ```rust
256 /// use rocket::response::stream::Event;
257 ///
258 /// // The event ID is "start".
259 /// let event = Event::data("hi").id("start");
260 ///
261 /// // The event ID is "then end", with `\n` replaced with ` `.
262 /// let event = Event::data("bye").id("then\nend");
263 /// ```
264 /// Sets the value of the 'id' field. It may not contain newlines.
265 pub fn id<T: Into<Cow<'static, str>>>(mut self, id: T) -> Self {
266 self.id = Some(id.into());
267 self
268 }
269
270 /// Sets or replaces the value of the `data` field.
271 ///
272 /// # Example
273 ///
274 /// ```rust
275 /// use rocket::response::stream::Event;
276 ///
277 /// // The data "hello" will be sent.
278 /// let event = Event::data("hi").with_data("hello");
279 ///
280 /// // The two below are equivalent.
281 /// let event = Event::comment("bye").with_data("goodbye");
282 /// let event = Event::data("goodbye").with_comment("bye");
283 /// ```
284 pub fn with_data<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
285 self.data = Some(data.into());
286 self
287 }
288
289 /// Sets or replaces the value of the `comment` field.
290 ///
291 /// # Example
292 ///
293 /// ```rust
294 /// use rocket::response::stream::Event;
295 ///
296 /// // The comment "🚀" will be sent.
297 /// let event = Event::comment("Rocket is great!").with_comment("🚀");
298 ///
299 /// // The two below are equivalent.
300 /// let event = Event::comment("bye").with_data("goodbye");
301 /// let event = Event::data("goodbye").with_comment("bye");
302 /// ```
303 pub fn with_comment<T: Into<Cow<'static, str>>>(mut self, data: T) -> Self {
304 self.comment = Some(data.into());
305 self
306 }
307
308 /// Sets or replaces the value of the `retry` field.
309 ///
310 /// # Example
311 ///
312 /// ```rust
313 /// use rocket::response::stream::Event;
314 /// use rocket::tokio::time::Duration;
315 ///
316 /// // The reconnection will be set to 10 seconds.
317 /// let event = Event::retry(Duration::from_millis(500))
318 /// .with_retry(Duration::from_secs(10));
319 ///
320 /// // The two below are equivalent.
321 /// let event = Event::comment("bye").with_retry(Duration::from_millis(500));
322 /// let event = Event::retry(Duration::from_millis(500)).with_comment("bye");
323 /// ```
324 pub fn with_retry(mut self, period: Duration) -> Self {
325 self.retry = Some(period);
326 self
327 }
328
329 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
330 let events = [
331 self.comment.map(|v| RawLinedEvent::many("", v)),
332 self.retry.map(|r| RawLinedEvent::one("retry", format!("{}", r.as_millis()))),
333 self.id.map(|v| RawLinedEvent::one("id", v)),
334 self.event.map(|v| RawLinedEvent::one("event", v)),
335 self.data.map(|v| RawLinedEvent::many("data", v)),
336 Some(RawLinedEvent::raw("")),
337 ];
338
339 stream::iter(events).filter_map(|x| x)
340 }
341}
342
343/// A potentially infinite stream of Server-Sent [`Event`]s (SSE).
344///
345/// An `EventStream` can be constructed from any [`Stream`] of items of type
346/// `Event`. The stream can be constructed directly via [`EventStream::from()`]
347/// or through generator syntax via [`EventStream!`].
348///
349/// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
350///
351/// # Responder
352///
353/// `EventStream` is a (potentially infinite) responder. The response
354/// `Content-Type` is set to [`EventStream`](const@ContentType::EventStream).
/// The body is [unsized](crate::response::Body#unsized), and values are sent as
/// soon as they are yielded by the internal stream.
357///
358/// ## Heartbeat
359///
360/// A heartbeat comment is injected into the internal stream and sent at a fixed
361/// interval. The comment is discarded by clients and serves only to keep the
362/// connection alive; it does not interfere with application data. The interval
363/// defaults to 30 seconds but can be adjusted with
364/// [`EventStream::heartbeat()`].
365///
366/// # Examples
367///
368/// Use [`EventStream!`] to yield an infinite series of "ping" SSE messages to
369/// the client, one per second:
370///
371/// ```rust
372/// # use rocket::*;
373/// use rocket::response::stream::{Event, EventStream};
374/// use rocket::tokio::time::{self, Duration};
375///
376/// #[get("/events")]
377/// fn stream() -> EventStream![] {
378/// EventStream! {
379/// let mut interval = time::interval(Duration::from_secs(1));
380/// loop {
381/// yield Event::data("ping");
382/// interval.tick().await;
383/// }
384/// }
385/// }
386/// ```
387///
388/// Yield 9 events: 3 triplets of `retry`, `data`, and `comment` events:
389///
390/// ```rust
391/// # use rocket::get;
392/// use rocket::response::stream::{Event, EventStream};
393/// use rocket::tokio::time::Duration;
394///
395/// #[get("/events")]
396/// fn events() -> EventStream![] {
397/// EventStream! {
398/// for i in 0..3 {
399/// yield Event::retry(Duration::from_secs(10));
400/// yield Event::data(format!("{}", i)).id("cat").event("bar");
401/// yield Event::comment("silly boy");
402/// }
403/// }
404/// }
405/// ```
406///
407/// The syntax of `EventStream!` as an expression is identical to that of
408/// [`stream!`](crate::response::stream::stream). For how to gracefully
409/// terminate an otherwise infinite stream, see [graceful
410/// shutdown](crate::response::stream#graceful-shutdown).
411///
412/// # Borrowing
413///
414/// If an `EventStream` contains a borrow, the extended type syntax
415/// `EventStream![Event + '_]` must be used:
416///
417/// ```rust
418/// # use rocket::get;
419/// use rocket::State;
420/// use rocket::response::stream::{Event, EventStream};
421///
422/// #[get("/events")]
423/// fn events(ctxt: &State<bool>) -> EventStream![Event + '_] {
424/// EventStream! {
425/// // By using `ctxt` in the stream, the borrow is moved into it. Thus,
426/// // the stream object contains a borrow, prompting the '_ annotation.
427/// if *ctxt.inner() {
428/// yield Event::data("hi");
429/// }
430/// }
431/// }
432/// ```
433///
434/// See [`stream#borrowing`](crate::response::stream#borrowing) for further
435/// details on borrowing in streams.
436///
437/// # Pitfalls
438///
439/// Server-Sent Events are a rather simple mechanism, though there are some
440/// pitfalls to be aware of.
441///
442/// * **Buffering**
443///
444/// Protocol restrictions complicate implementing an API that does not
445/// buffer. As such, if you are sending _lots_ of data, consider sending the
446/// data via multiple data fields (with events to signal start and end).
447/// Alternatively, send _one_ event which instructs the client to fetch the
448/// data from another endpoint which in-turn streams the data.
449///
450/// * **Raw SSE requires UTF-8 data**
451///
452/// Only UTF-8 data can be sent via SSE. If you need to send arbitrary bytes,
453/// consider encoding it, for instance, as JSON using [`Event::json()`].
454/// Alternatively, as described before, use SSE as a notifier which alerts
455/// the client to fetch the data from elsewhere.
456///
457/// * **Raw SSE is Lossy**
458///
459/// Data sent via SSE cannot contain new lines `\n` or carriage returns `\r`
460/// due to interference with the line protocol.
461///
462/// The protocol allows expressing new lines as multiple messages, however,
463/// and Rocket automatically transforms a message of `foo\nbar` into two
464/// messages, `foo` and `bar`, so that they are reconstructed (automatically)
465/// as `foo\nbar` on the client-side. For messages that only contain new
466/// lines `\n`, the conversion is lossless.
467///
468/// However, the protocol has no mechanism for expressing carriage returns
469/// and thus it is not possible to send unencoded carriage returns via SSE.
470/// Rocket handles carriage returns like it handles new lines: it splits the
471/// data into multiple messages. Thus, a sequence of `\r\n` becomes `\n` at
472/// the client side. A single `\r` that is not part of an `\r\n` sequence
473/// also becomes `\n` at the client side. As a result, the message
474/// `foo\r\nbar\rbaz` is read as `foo\nbar\nbaz` at the client-side.
475///
476/// To send messages losslessly, they must be encoded first, for instance, by
477/// using [`Event::json()`].
478///
479/// * **Clients reconnect ad-infinitum**
480///
481/// The [SSE standard] stipulates: _"Clients will reconnect if the connection
482/// is closed; a client can be told to stop reconnecting using the HTTP 204
483/// No Content response code."_ As a result, clients will typically reconnect
484/// exhaustively until either they choose to disconnect or they receive a
485/// `204 No Content` response.
486///
487/// [SSE standard]: https://html.spec.whatwg.org/multipage/server-sent-events.html
pub struct EventStream<S> {
    /// The wrapped stream whose `Event`s make up the response body.
    stream: S,
    /// Interval between heartbeat comments; `None` means no heartbeat is sent.
    heartbeat: Option<Duration>,
}
492
493impl<S: Stream<Item = Event>> EventStream<S> {
494 /// Sets a "ping" interval for this `EventStream` to avoid connection
495 /// timeouts when no data is being transferred. The default `interval` is 30
496 /// seconds.
497 ///
498 /// The ping is implemented by sending an empty comment to the client every
499 /// `interval` seconds.
500 ///
501 /// # Example
502 ///
503 /// ```rust
504 /// # use rocket::get;
505 /// use rocket::response::stream::{Event, EventStream};
506 /// use rocket::tokio::time::Duration;
507 ///
508 /// #[get("/events")]
509 /// fn events() -> EventStream![] {
510 /// // Remove the default heartbeat.
511 /// # let event_stream = rocket::futures::stream::pending();
512 /// EventStream::from(event_stream).heartbeat(None);
513 ///
514 /// // Set the heartbeat interval to 15 seconds.
515 /// # let event_stream = rocket::futures::stream::pending();
516 /// EventStream::from(event_stream).heartbeat(Duration::from_secs(15));
517 ///
518 /// // Do the same but for a generated `EventStream`:
519 /// let stream = EventStream! {
520 /// yield Event::data("hello");
521 /// };
522 ///
523 /// stream.heartbeat(Duration::from_secs(15))
524 /// }
525 /// ```
526 pub fn heartbeat<H: Into<Option<Duration>>>(mut self, heartbeat: H) -> Self {
527 self.heartbeat = heartbeat.into();
528 self
529 }
530
531 fn heartbeat_stream(&self) -> impl Stream<Item = RawLinedEvent> {
532 self.heartbeat
533 .map(|beat| IntervalStream::new(interval(beat)))
534 .map(|stream| stream.map(|_| RawLinedEvent::raw(":")))
535 .map_or_else(|| Either::Right(stream::empty()), Either::Left)
536 }
537
538 fn into_stream(self) -> impl Stream<Item = RawLinedEvent> {
539 use futures::StreamExt;
540
541 let heartbeats = self.heartbeat_stream();
542 let events = StreamExt::map(self.stream, |e| e.into_stream()).flatten();
543 crate::util::join(events, heartbeats)
544 }
545
546 fn into_reader(self) -> impl AsyncRead {
547 ReaderStream::from(self.into_stream())
548 }
549}
550
551impl<S: Stream<Item = Event>> From<S> for EventStream<S> {
552 /// Creates an `EventStream` from a [`Stream`] of [`Event`]s.
553 ///
554 /// Use `EventStream::from()` to construct an `EventStream` from an already
555 /// existing stream. Otherwise, prefer to use [`EventStream!`].
556 ///
557 /// # Example
558 ///
559 /// ```rust
560 /// use rocket::response::stream::{Event, EventStream};
561 /// use rocket::futures::stream;
562 ///
563 /// let raw = stream::iter(vec![Event::data("a"), Event::data("b")]);
564 /// let stream = EventStream::from(raw);
565 /// ```
566 fn from(stream: S) -> Self {
567 EventStream { stream, heartbeat: Some(Duration::from_secs(30)), }
568 }
569}
570
571impl<'r, S: Stream<Item = Event> + Send + 'r> Responder<'r, 'r> for EventStream<S> {
572 fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
573 Response::build()
574 .header(ContentType::EventStream)
575 .raw_header("Cache-Control", "no-cache")
576 .raw_header("Expires", "0")
577 .streamed_body(self.into_reader())
578 .ok()
579 }
580}
581
crate::export! {
    /// Type and stream expression macro for [`struct@EventStream`].
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro. In addition to that syntax, this macro can also
    /// be called with no arguments, `EventStream![]`, as shorthand for
    /// `EventStream![Event]`.
    ///
    /// See [`struct@EventStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! EventStream {
        // No arguments: the item type defaults to `Event`.
        () => ($crate::_typed_stream!(EventStream, $crate::response::stream::Event));
        // Anything else is forwarded, token for token, to `_typed_stream!`.
        ($($s:tt)*) => ($crate::_typed_stream!(EventStream, $($s)*));
    }
}
597
#[cfg(test)]
mod sse_tests {
    use tokio::io::AsyncReadExt;
    use tokio::time::{self, Duration};
    use futures::stream::Stream;
    use crate::response::stream::{stream, Event, EventStream, ReaderStream};

    impl Event {
        /// Test helper: reads this event's raw lines to completion and returns
        /// them as a single string.
        fn into_string(self) -> String {
            crate::async_test(async move {
                let mut string = String::new();
                let mut reader = ReaderStream::from(self.into_stream());
                reader.read_to_string(&mut string).await.expect("event -> string");
                string
            })
        }
    }

    impl<S: Stream<Item = Event>> EventStream<S> {
        /// Test helper: reads the whole event stream, heartbeats included, to
        /// completion and returns it as a single string.
        fn into_string(self) -> String {
            use std::pin::pin;

            crate::async_test(async move {
                let mut string = String::new();
                let mut reader = pin!(self.into_reader());
                reader.read_to_string(&mut string).await.expect("event stream -> string");
                string
            })
        }
    }

    /// `data` and comment values are split into one line per `\n`, `\r\n`, or
    /// lone `\r`, and each event ends with a blank line.
    #[test]
    fn test_event_data() {
        let event = Event::data("a\nb");
        assert_eq!(event.into_string(), "data:a\ndata:b\n\n");

        let event = Event::data("a\n");
        assert_eq!(event.into_string(), "data:a\ndata:\n\n");

        let event = Event::data("cats make me happy!");
        assert_eq!(event.into_string(), "data:cats make me happy!\n\n");

        let event = Event::data("in the\njungle\nthe mighty\njungle");
        assert_eq!(event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");

        let event = Event::data("in the\njungle\r\nthe mighty\rjungle");
        assert_eq!(event.into_string(),
            "data:in the\ndata:jungle\ndata:the mighty\ndata:jungle\n\n");

        let event = Event::data("\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\r\nb\r\n");
        assert_eq!(event.into_string(), "data:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\nb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::data("\n\rb\r");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:b\ndata:\n\n");

        let event = Event::comment("\n\rb\r");
        assert_eq!(event.into_string(), ":\n:\n:b\n:\n\n");

        let event = Event::data("\n\n\n");
        assert_eq!(event.into_string(), "data:\ndata:\ndata:\ndata:\n\n");

        let event = Event::data("\n");
        assert_eq!(event.into_string(), "data:\ndata:\n\n");

        let event = Event::data("");
        assert_eq!(event.into_string(), "data:\n\n");
    }

    /// Fields are emitted in the order comment, `retry`, `id`, `event`, `data`
    /// regardless of the order in which the builder methods were called.
    #[test]
    fn test_event_fields() {
        let event = Event::data("foo").id("moo");
        assert_eq!(event.into_string(), "id:moo\ndata:foo\n\n");

        let event = Event::data("foo").id("moo").with_retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar").id("moo").with_retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\nid:moo\ndata:foo\ndata:bar\n\n");

        let event = Event::retry(Duration::from_secs(45));
        assert_eq!(event.into_string(), "retry:45000\n\n");

        let event = Event::comment("incoming data...");
        assert_eq!(event.into_string(), ":incoming data...\n\n");

        let event = Event::data("foo").id("moo").with_comment("cows, ey?");
        assert_eq!(event.into_string(), ":cows, ey?\nid:moo\ndata:foo\n\n");

        let event = Event::data("foo\nbar")
            .id("moo")
            .event("milk")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), "retry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\n\n");

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("??")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), ":??\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");

        let event = Event::data("foo")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(), ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\n\n");

        let event = Event::data("foo\r\nbar\nbaz")
            .id("moo")
            .event("milk")
            .with_comment("?\n?")
            .with_retry(Duration::from_secs(3));

        assert_eq!(event.into_string(),
            ":?\n:?\nretry:3000\nid:moo\nevent:milk\ndata:foo\ndata:bar\ndata:baz\n\n");
    }

    /// New lines and carriage returns in `id` and `event` values are replaced
    /// by spaces; the expected strings show `\r\n` becoming a single space.
    #[test]
    fn test_bad_chars() {
        let event = Event::data("foo").id("dead\nbeef").event("m\noo");
        assert_eq!(event.into_string(), "id:dead beef\nevent:m oo\ndata:foo\n\n");

        let event = Event::data("f\no").id("d\r\nbe\rf").event("m\n\r");
        assert_eq!(event.into_string(), "id:d be f\nevent:m  \ndata:f\ndata:o\n\n");

        let event = Event::data("f\no").id("\r\n\n\r\n\r\r").event("\n\rb");
        assert_eq!(event.into_string(), "id:     \nevent:  b\ndata:f\ndata:o\n\n");
    }

    /// Events of an `EventStream` are written back to back. Heartbeats
    /// (`":\n\n"`) are stripped before comparing as their count is not fixed.
    #[test]
    fn test_event_stream() {
        use futures::stream::iter;

        let stream = EventStream::from(iter(vec![Event::data("foo")]));
        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:foo\n\n");

        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        assert_eq!(stream.into_string().replace(":\n\n", ""), "data:a\n\ndata:b\n\n");

        let stream = EventStream::from(iter(vec![
            Event::data("a\nb"),
            Event::data("b"),
            Event::data("c\n\nd"),
            Event::data("e"),
        ]));

        assert_eq!(stream.into_string().replace(":\n\n", ""),
            "data:a\ndata:b\n\ndata:b\n\ndata:c\ndata:\ndata:d\n\ndata:e\n\n");
    }

    /// Heartbeats are interleaved with events at roughly the configured
    /// interval. Counts are checked against ranges since timing is not exact.
    #[test]
    fn test_heartbeat() {
        use futures::future::ready;
        use futures::stream::{once, iter, StreamExt};

        const HEARTBEAT: &str = ":\n";

        // Set a heartbeat interval of 250ms. Send nothing for 600ms. We should
        // get 2 or 3 heartbeats, the latter if one is sent eagerly. Maybe 4.
        let raw = stream!(time::sleep(Duration::from_millis(600)).await;)
            .map(|_| unreachable!());

        let string = EventStream::from(raw)
            .heartbeat(Duration::from_millis(250))
            .into_string();

        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats >= 2 && heartbeats <= 4, "got {} beat(s)", heartbeats);

        let stream = EventStream! {
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("foo");
            time::sleep(Duration::from_millis(250)).await;
            yield Event::data("bar");
        };

        // We expect: foo\n\n [heartbeat] bar\n\n [maybe heartbeat].
        let string = stream.heartbeat(Duration::from_millis(350)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats >= 1 && heartbeats <= 3, "got {} beat(s)", heartbeats);
        assert!(string.contains("data:foo\n\n"), "string = {:?}", string);
        assert!(string.contains("data:bar\n\n"), "string = {:?}", string);

        // We shouldn't send a heartbeat if a message is immediately available.
        let stream = EventStream::from(once(ready(Event::data("hello"))));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        assert_eq!(string, "data:hello\n\n", "string = {:?}", string);

        // It's okay if we do it with two, though.
        let stream = EventStream::from(iter(vec![Event::data("a"), Event::data("b")]));
        let string = stream.heartbeat(Duration::from_secs(1)).into_string();
        let heartbeats = string.matches(HEARTBEAT).count();
        assert!(heartbeats <= 1);
        assert!(string.contains("data:a\n\n"), "string = {:?}", string);
        assert!(string.contains("data:b\n\n"), "string = {:?}", string);
    }
}
811}