X-Git-Url: https://code.octet-stream.net/netwatcher/blobdiff_plain/f9df7170ac929ccc915642c2e6415e768973bb54..cc86d4310fa2b8af2675e82004facc1d90ef6922:/src/watch_linux.rs?ds=sidebyside diff --git a/src/watch_linux.rs b/src/watch_linux.rs index 95ef106..e49d629 100644 --- a/src/watch_linux.rs +++ b/src/watch_linux.rs @@ -1,12 +1,10 @@ use std::os::fd::AsRawFd; use std::os::fd::OwnedFd; +use std::sync::mpsc; use nix::libc::poll; use nix::libc::pollfd; use nix::libc::POLLIN; -use nix::libc::RTMGRP_IPV4_IFADDR; -use nix::libc::RTMGRP_IPV6_IFADDR; -use nix::libc::RTMGRP_LINK; use nix::sys::socket::bind; use nix::sys::socket::recv; use nix::sys::socket::socket; @@ -22,34 +20,51 @@ use crate::Error; use crate::List; use crate::Update; +const RTMGRP_IPV4_IFADDR: u32 = 0x10; +const RTMGRP_IPV6_IFADDR: u32 = 0x20; +const RTMGRP_LINK: u32 = 0x01; + pub(crate) struct WatchHandle { - // Dropping will close the fd which will be detected by poll - _pipefd: OwnedFd, + // Close on drop, which will be detected by poll in background thread + pipefd: Option, + + // Detect when thread has completed + complete: Option>, +} + +impl Drop for WatchHandle { + fn drop(&mut self) { + drop(self.pipefd.take()); + let _ = self.complete.take().unwrap().recv(); + } } pub(crate) fn watch_interfaces( callback: F, ) -> Result { - let pipefd = start_watcher_thread(callback)?; - Ok(WatchHandle { _pipefd: pipefd }) + let (pipefd, complete) = start_watcher_thread(callback)?; + Ok(WatchHandle { + pipefd: Some(pipefd), + complete: Some(complete), + }) } fn start_watcher_thread( mut callback: F, -) -> Result { +) -> Result<(OwnedFd, mpsc::Receiver<()>), Error> { let sockfd = socket( AddressFamily::Netlink, SockType::Raw, - SockFlag::empty(), + SockFlag::SOCK_NONBLOCK, Some(SockProtocol::NetlinkRoute), ) - .map_err(|_| Error::Internal)?; // TODO: proper errors + .map_err(|e| Error::CreateSocket(e.to_string()))?; let sa_nl = NetlinkAddr::new( 0, (RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR) as u32, ); - bind(sockfd.as_raw_fd(), &sa_nl).map_err(|_| Error::Internal)?; // TODO: proper errors - let (pipe_rd, pipe_wr) = pipe().map_err(|_| Error::Internal)?; + bind(sockfd.as_raw_fd(), &sa_nl).map_err(|e| Error::Bind(e.to_string()))?; + let (pipe_rd, pipe_wr) = pipe().map_err(|e| Error::CreatePipe(e.to_string()))?; let mut prev_list = List::default(); let mut handle_update = move |new_list: List| { @@ -69,6 +84,8 @@ fn start_watcher_thread( // looks like we're going to have trouble listing interfaces. handle_update(crate::list::list_interfaces()?); + let (complete_tx, complete_rx) = mpsc::channel(); + std::thread::spawn(move || { let mut buf = [0u8; 4096]; @@ -102,7 +119,9 @@ fn start_watcher_thread( break; } } + + drop(complete_tx); }); - Ok(pipe_wr) + Ok((pipe_wr, complete_rx)) }