rocket_ws/
duplex.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use rocket::data::IoStream;
5use rocket::futures::{StreamExt, SinkExt, Sink};
6use rocket::futures::stream::{Stream, FusedStream};
7
8use crate::frame::{Message, CloseFrame};
9use crate::result::{Result, Error};
10
11pub struct DuplexStream(tokio_tungstenite::WebSocketStream<IoStream>);
37
38impl DuplexStream {
39 pub(crate) async fn new(stream: IoStream, config: crate::Config) -> Self {
40 use tokio_tungstenite::WebSocketStream;
41 use crate::tungstenite::protocol::Role;
42
43 let inner = WebSocketStream::from_raw_socket(stream, Role::Server, Some(config));
44 DuplexStream(inner.await)
45 }
46
47 pub async fn close(&mut self, msg: Option<CloseFrame<'_>>) -> Result<()> {
49 self.0.close(msg).await
50 }
51}
52
53impl Stream for DuplexStream {
54 type Item = Result<Message>;
55
56 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 self.get_mut().0.poll_next_unpin(cx)
58 }
59
60 fn size_hint(&self) -> (usize, Option<usize>) {
61 self.0.size_hint()
62 }
63}
64
65impl FusedStream for DuplexStream {
66 fn is_terminated(&self) -> bool {
67 self.0.is_terminated()
68 }
69}
70
71impl Sink<Message> for DuplexStream {
72 type Error = Error;
73
74 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75 self.get_mut().0.poll_ready_unpin(cx)
76 }
77
78 fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
79 self.get_mut().0.start_send_unpin(item)
80 }
81
82 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83 self.get_mut().0.poll_flush_unpin(cx)
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.get_mut().0.poll_close_unpin(cx)
88 }
89}