1use std::io;
2use std::path::{Path, PathBuf};
3
4use either::{Either, Left, Right};
5use figment::error::Kind;
6use tokio::time::{sleep, Duration};
7
8use crate::fs::NamedFile;
9use crate::listener::{Listener, Bind, Connection, Endpoint};
10use crate::util::unix;
11use crate::{Ignite, Rocket};
12
13pub use tokio::net::UnixStream;
14
15pub struct UnixListener {
26 path: PathBuf,
27 lock: Option<NamedFile>,
28 listener: tokio::net::UnixListener,
29}
30
31impl UnixListener {
32 pub async fn bind<P: AsRef<Path>>(path: P, reuse: bool) -> io::Result<Self> {
33 let path = path.as_ref();
34 let lock = if reuse {
35 let lock_ext = match path.extension().and_then(|s| s.to_str()) {
36 Some(ext) if !ext.is_empty() => format!("{}.lock", ext),
37 _ => "lock".to_string()
38 };
39
40 let mut opts = tokio::fs::File::options();
41 opts.create(true).write(true);
42 let lock_path = path.with_extension(lock_ext);
43 let lock_file = NamedFile::open_with(lock_path, &opts).await?;
44
45 unix::lock_exclusive_nonblocking(lock_file.file())?;
46 if path.exists() {
47 tokio::fs::remove_file(&path).await?;
48 }
49
50 Some(lock_file)
51 } else {
52 None
53 };
54
55 let mut retries = 5;
59 let listener = loop {
60 match tokio::net::UnixListener::bind(&path) {
61 Ok(listener) => break listener,
62 Err(e) if path.exists() && lock.is_none() => return Err(e),
63 Err(_) if retries > 0 => {
64 retries -= 1;
65 sleep(Duration::from_millis(100)).await;
66 },
67 Err(e) => return Err(e),
68 }
69 };
70
71 Ok(UnixListener { lock, listener, path: path.into() })
72 }
73}
74
75impl Bind for UnixListener {
76 type Error = Either<figment::Error, io::Error>;
77
78 async fn bind(rocket: &Rocket<Ignite>) -> Result<Self, Self::Error> {
79 let endpoint = Self::bind_endpoint(rocket)?;
80 let path = endpoint.unix()
81 .ok_or_else(|| Right(io::Error::other("internal error: invalid endpoint")))?;
82
83 let reuse: bool = rocket.figment()
84 .extract_inner("reuse")
85 .or_else(|e| if e.missing() { Ok(true) } else { Err(e) } )
86 .map_err(Left)?;
87 Ok(Self::bind(path, reuse).await.map_err(Right)?)
88 }
89
90 fn bind_endpoint(rocket: &Rocket<Ignite>) -> Result<Endpoint, Self::Error> {
91 let as_pathbuf = |e: Option<&Endpoint>| e.and_then(|e| e.unix().map(|p| p.to_path_buf()));
92 Endpoint::fetch(rocket.figment(), "unix", "address", as_pathbuf)
93 .map(Endpoint::Unix)
94 .map_err(Left)
95 }
96}
97
98impl Listener for UnixListener {
99 type Accept = UnixStream;
100
101 type Connection = Self::Accept;
102
103 async fn accept(&self) -> io::Result<Self::Accept> {
104 Ok(self.listener.accept().await?.0)
105 }
106
107 async fn connect(&self, accept:Self::Accept) -> io::Result<Self::Connection> {
108 Ok(accept)
109 }
110
111 fn endpoint(&self) -> io::Result<Endpoint> {
112 self.listener.local_addr()?.try_into()
113 }
114}
115
116impl Connection for UnixStream {
117 fn endpoint(&self) -> io::Result<Endpoint> {
118 self.local_addr()?.try_into()
119 }
120}
121
122impl Drop for UnixListener {
123 fn drop(&mut self) {
124 if let Some(lock) = &self.lock {
125 let _ = std::fs::remove_file(&self.path);
126 let _ = std::fs::remove_file(lock.path());
127 let _ = unix::unlock_nonblocking(lock.file());
128 } else {
129 let _ = std::fs::remove_file(&self.path);
130 }
131 }
132}