rocket/listener/
unix.rs
1use std::io;
2use std::path::{Path, PathBuf};
3
4use either::{Either, Left, Right};
5use tokio::time::{sleep, Duration};
6
7use crate::fs::NamedFile;
8use crate::listener::{Listener, Bind, Connection, Endpoint};
9use crate::util::unix;
10use crate::{Ignite, Rocket};
11
12pub use tokio::net::UnixStream;
13
14pub struct UnixListener {
25 path: PathBuf,
26 lock: Option<NamedFile>,
27 listener: tokio::net::UnixListener,
28}
29
30impl UnixListener {
31 pub async fn bind<P: AsRef<Path>>(path: P, reuse: bool) -> io::Result<Self> {
32 let path = path.as_ref();
33 let lock = if reuse {
34 let lock_ext = match path.extension().and_then(|s| s.to_str()) {
35 Some(ext) if !ext.is_empty() => format!("{}.lock", ext),
36 _ => "lock".to_string()
37 };
38
39 let mut opts = tokio::fs::File::options();
40 opts.create(true).write(true);
41 let lock_path = path.with_extension(lock_ext);
42 let lock_file = NamedFile::open_with(lock_path, &opts).await?;
43
44 unix::lock_exclusive_nonblocking(lock_file.file())?;
45 if path.exists() {
46 tokio::fs::remove_file(&path).await?;
47 }
48
49 Some(lock_file)
50 } else {
51 None
52 };
53
54 let mut retries = 5;
58 let listener = loop {
59 match tokio::net::UnixListener::bind(&path) {
60 Ok(listener) => break listener,
61 Err(e) if path.exists() && lock.is_none() => return Err(e),
62 Err(_) if retries > 0 => {
63 retries -= 1;
64 sleep(Duration::from_millis(100)).await;
65 },
66 Err(e) => return Err(e),
67 }
68 };
69
70 Ok(UnixListener { lock, listener, path: path.into() })
71 }
72}
73
74impl Bind for UnixListener {
75 type Error = Either<figment::Error, io::Error>;
76
77 async fn bind(rocket: &Rocket<Ignite>) -> Result<Self, Self::Error> {
78 let endpoint = Self::bind_endpoint(rocket)?;
79 let path = endpoint.unix()
80 .ok_or_else(|| Right(io::Error::other("internal error: invalid endpoint")))?;
81
82 let reuse: Option<bool> = rocket.figment().extract_inner("reuse").map_err(Left)?;
83 Ok(Self::bind(path, reuse.unwrap_or(true)).await.map_err(Right)?)
84 }
85
86 fn bind_endpoint(rocket: &Rocket<Ignite>) -> Result<Endpoint, Self::Error> {
87 let as_pathbuf = |e: Option<&Endpoint>| e.and_then(|e| e.unix().map(|p| p.to_path_buf()));
88 Endpoint::fetch(rocket.figment(), "unix", "address", as_pathbuf)
89 .map(Endpoint::Unix)
90 .map_err(Left)
91 }
92}
93
94impl Listener for UnixListener {
95 type Accept = UnixStream;
96
97 type Connection = Self::Accept;
98
99 async fn accept(&self) -> io::Result<Self::Accept> {
100 Ok(self.listener.accept().await?.0)
101 }
102
103 async fn connect(&self, accept:Self::Accept) -> io::Result<Self::Connection> {
104 Ok(accept)
105 }
106
107 fn endpoint(&self) -> io::Result<Endpoint> {
108 self.listener.local_addr()?.try_into()
109 }
110}
111
112impl Connection for UnixStream {
113 fn endpoint(&self) -> io::Result<Endpoint> {
114 self.local_addr()?.try_into()
115 }
116}
117
118impl Drop for UnixListener {
119 fn drop(&mut self) {
120 if let Some(lock) = &self.lock {
121 let _ = std::fs::remove_file(&self.path);
122 let _ = std::fs::remove_file(lock.path());
123 let _ = unix::unlock_nonblocking(lock.file());
124 } else {
125 let _ = std::fs::remove_file(&self.path);
126 }
127 }
128}