rocket/listener/
unix.rs

1use std::io;
2use std::path::{Path, PathBuf};
3
4use either::{Either, Left, Right};
5use tokio::time::{sleep, Duration};
6
7use crate::fs::NamedFile;
8use crate::listener::{Listener, Bind, Connection, Endpoint};
9use crate::util::unix;
10use crate::{Ignite, Rocket};
11
12pub use tokio::net::UnixStream;
13
14/// Unix domain sockets listener.
15///
16/// # Configuration
17///
18/// Reads the following configuration parameters:
19///
20/// | parameter | type         | default | note                                      |
21/// |-----------|--------------|---------|-------------------------------------------|
22/// | `address` | [`Endpoint`] |         | required: must be `unix:path`             |
23/// | `reuse`   | boolean      | `true`  | whether to create/reuse/delete the socket |
24pub struct UnixListener {
25    path: PathBuf,
26    lock: Option<NamedFile>,
27    listener: tokio::net::UnixListener,
28}
29
30impl UnixListener {
31    pub async fn bind<P: AsRef<Path>>(path: P, reuse: bool) -> io::Result<Self> {
32        let path = path.as_ref();
33        let lock = if reuse {
34            let lock_ext = match path.extension().and_then(|s| s.to_str()) {
35                Some(ext) if !ext.is_empty() => format!("{}.lock", ext),
36                _ => "lock".to_string()
37            };
38
39            let mut opts = tokio::fs::File::options();
40            opts.create(true).write(true);
41            let lock_path = path.with_extension(lock_ext);
42            let lock_file = NamedFile::open_with(lock_path, &opts).await?;
43
44            unix::lock_exclusive_nonblocking(lock_file.file())?;
45            if path.exists() {
46                tokio::fs::remove_file(&path).await?;
47            }
48
49            Some(lock_file)
50        } else {
51            None
52        };
53
54        // Sometimes, we get `AddrInUse`, even though we've tried deleting the
55        // socket. If all is well, eventually the socket will _really_ be gone,
56        // and this will succeed. So let's try a few times.
57        let mut retries = 5;
58        let listener = loop {
59            match tokio::net::UnixListener::bind(&path) {
60                Ok(listener) => break listener,
61                Err(e) if path.exists() && lock.is_none() => return Err(e),
62                Err(_) if retries > 0 => {
63                    retries -= 1;
64                    sleep(Duration::from_millis(100)).await;
65                },
66                Err(e) => return Err(e),
67            }
68        };
69
70        Ok(UnixListener { lock, listener, path: path.into() })
71    }
72}
73
74impl Bind for UnixListener {
75    type Error = Either<figment::Error, io::Error>;
76
77    async fn bind(rocket: &Rocket<Ignite>) -> Result<Self, Self::Error> {
78        let endpoint = Self::bind_endpoint(rocket)?;
79        let path = endpoint.unix()
80            .ok_or_else(|| Right(io::Error::other("internal error: invalid endpoint")))?;
81
82        let reuse: Option<bool> = rocket.figment().extract_inner("reuse").map_err(Left)?;
83        Ok(Self::bind(path, reuse.unwrap_or(true)).await.map_err(Right)?)
84    }
85
86    fn bind_endpoint(rocket: &Rocket<Ignite>) -> Result<Endpoint, Self::Error> {
87        let as_pathbuf = |e: Option<&Endpoint>| e.and_then(|e| e.unix().map(|p| p.to_path_buf()));
88        Endpoint::fetch(rocket.figment(), "unix", "address", as_pathbuf)
89            .map(Endpoint::Unix)
90            .map_err(Left)
91    }
92}
93
94impl Listener for UnixListener {
95    type Accept = UnixStream;
96
97    type Connection = Self::Accept;
98
99    async fn accept(&self) -> io::Result<Self::Accept> {
100        Ok(self.listener.accept().await?.0)
101    }
102
103    async fn connect(&self, accept:Self::Accept) -> io::Result<Self::Connection> {
104        Ok(accept)
105    }
106
107    fn endpoint(&self) -> io::Result<Endpoint> {
108        self.listener.local_addr()?.try_into()
109    }
110}
111
112impl Connection for UnixStream {
113    fn endpoint(&self) -> io::Result<Endpoint> {
114        self.local_addr()?.try_into()
115    }
116}
117
118impl Drop for UnixListener {
119    fn drop(&mut self) {
120        if let Some(lock) = &self.lock {
121            let _ = std::fs::remove_file(&self.path);
122            let _ = std::fs::remove_file(lock.path());
123            let _ = unix::unlock_nonblocking(lock.file());
124        } else {
125            let _ = std::fs::remove_file(&self.path);
126        }
127    }
128}