rocket/listener/
unix.rs

1use std::io;
2use std::path::{Path, PathBuf};
3
4use either::{Either, Left, Right};
5use figment::error::Kind;
6use tokio::time::{sleep, Duration};
7
8use crate::fs::NamedFile;
9use crate::listener::{Listener, Bind, Connection, Endpoint};
10use crate::util::unix;
11use crate::{Ignite, Rocket};
12
13pub use tokio::net::UnixStream;
14
15/// Unix domain sockets listener.
16///
17/// # Configuration
18///
19/// Reads the following configuration parameters:
20///
21/// | parameter | type         | default | note                                      |
22/// |-----------|--------------|---------|-------------------------------------------|
23/// | `address` | [`Endpoint`] |         | required: must be `unix:path`             |
24/// | `reuse`   | boolean      | `true`  | whether to create/reuse/delete the socket |
25pub struct UnixListener {
26    path: PathBuf,
27    lock: Option<NamedFile>,
28    listener: tokio::net::UnixListener,
29}
30
31impl UnixListener {
32    pub async fn bind<P: AsRef<Path>>(path: P, reuse: bool) -> io::Result<Self> {
33        let path = path.as_ref();
34        let lock = if reuse {
35            let lock_ext = match path.extension().and_then(|s| s.to_str()) {
36                Some(ext) if !ext.is_empty() => format!("{}.lock", ext),
37                _ => "lock".to_string()
38            };
39
40            let mut opts = tokio::fs::File::options();
41            opts.create(true).write(true);
42            let lock_path = path.with_extension(lock_ext);
43            let lock_file = NamedFile::open_with(lock_path, &opts).await?;
44
45            unix::lock_exclusive_nonblocking(lock_file.file())?;
46            if path.exists() {
47                tokio::fs::remove_file(&path).await?;
48            }
49
50            Some(lock_file)
51        } else {
52            None
53        };
54
55        // Sometimes, we get `AddrInUse`, even though we've tried deleting the
56        // socket. If all is well, eventually the socket will _really_ be gone,
57        // and this will succeed. So let's try a few times.
58        let mut retries = 5;
59        let listener = loop {
60            match tokio::net::UnixListener::bind(&path) {
61                Ok(listener) => break listener,
62                Err(e) if path.exists() && lock.is_none() => return Err(e),
63                Err(_) if retries > 0 => {
64                    retries -= 1;
65                    sleep(Duration::from_millis(100)).await;
66                },
67                Err(e) => return Err(e),
68            }
69        };
70
71        Ok(UnixListener { lock, listener, path: path.into() })
72    }
73}
74
75impl Bind for UnixListener {
76    type Error = Either<figment::Error, io::Error>;
77
78    async fn bind(rocket: &Rocket<Ignite>) -> Result<Self, Self::Error> {
79        let endpoint = Self::bind_endpoint(rocket)?;
80        let path = endpoint.unix()
81            .ok_or_else(|| Right(io::Error::other("internal error: invalid endpoint")))?;
82
83        let reuse: bool = rocket.figment()
84            .extract_inner("reuse")
85            .or_else(|e| if e.missing() { Ok(true) } else { Err(e) } )
86            .map_err(Left)?;
87        Ok(Self::bind(path, reuse).await.map_err(Right)?)
88    }
89
90    fn bind_endpoint(rocket: &Rocket<Ignite>) -> Result<Endpoint, Self::Error> {
91        let as_pathbuf = |e: Option<&Endpoint>| e.and_then(|e| e.unix().map(|p| p.to_path_buf()));
92        Endpoint::fetch(rocket.figment(), "unix", "address", as_pathbuf)
93            .map(Endpoint::Unix)
94            .map_err(Left)
95    }
96}
97
98impl Listener for UnixListener {
99    type Accept = UnixStream;
100
101    type Connection = Self::Accept;
102
103    async fn accept(&self) -> io::Result<Self::Accept> {
104        Ok(self.listener.accept().await?.0)
105    }
106
107    async fn connect(&self, accept:Self::Accept) -> io::Result<Self::Connection> {
108        Ok(accept)
109    }
110
111    fn endpoint(&self) -> io::Result<Endpoint> {
112        self.listener.local_addr()?.try_into()
113    }
114}
115
116impl Connection for UnixStream {
117    fn endpoint(&self) -> io::Result<Endpoint> {
118        self.local_addr()?.try_into()
119    }
120}
121
122impl Drop for UnixListener {
123    fn drop(&mut self) {
124        if let Some(lock) = &self.lock {
125            let _ = std::fs::remove_file(&self.path);
126            let _ = std::fs::remove_file(lock.path());
127            let _ = unix::unlock_nonblocking(lock.file());
128        } else {
129            let _ = std::fs::remove_file(&self.path);
130        }
131    }
132}