rocket_ws/
duplex.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use rocket::data::IoStream;
5use rocket::futures::{StreamExt, SinkExt, Sink};
6use rocket::futures::stream::{Stream, FusedStream};
7
8use crate::frame::{Message, CloseFrame};
9use crate::result::{Result, Error};
10
11pub struct DuplexStream(tokio_tungstenite::WebSocketStream<IoStream>);
38
39impl DuplexStream {
40 pub(crate) async fn new(stream: IoStream, config: crate::Config) -> Self {
41 use tokio_tungstenite::WebSocketStream;
42 use crate::tungstenite::protocol::Role;
43
44 let inner = WebSocketStream::from_raw_socket(stream, Role::Server, Some(config));
45 DuplexStream(inner.await)
46 }
47
48 pub async fn close(&mut self, msg: Option<CloseFrame<'_>>) -> Result<()> {
50 self.0.close(msg).await
51 }
52}
53
54impl Stream for DuplexStream {
55 type Item = Result<Message>;
56
57 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58 self.get_mut().0.poll_next_unpin(cx)
59 }
60
61 fn size_hint(&self) -> (usize, Option<usize>) {
62 self.0.size_hint()
63 }
64}
65
66impl FusedStream for DuplexStream {
67 fn is_terminated(&self) -> bool {
68 self.0.is_terminated()
69 }
70}
71
72impl Sink<Message> for DuplexStream {
73 type Error = Error;
74
75 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 self.get_mut().0.poll_ready_unpin(cx)
77 }
78
79 fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
80 self.get_mut().0.start_send_unpin(item)
81 }
82
83 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84 self.get_mut().0.poll_flush_unpin(cx)
85 }
86
87 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88 self.get_mut().0.poll_close_unpin(cx)
89 }
90}