X-Git-Url: https://code.octet-stream.net/netwatcher/blobdiff_plain/4abfa61b20e567fdd69ac3ca47a9c218971a30ff..91bdce994d6788c2c9a6b0488fd443615b1bcbe3:/src/watch_linux.rs?ds=sidebyside diff --git a/src/watch_linux.rs b/src/watch_linux.rs index efc3842..e49d629 100644 --- a/src/watch_linux.rs +++ b/src/watch_linux.rs @@ -1,9 +1,10 @@ use std::os::fd::AsRawFd; use std::os::fd::OwnedFd; +use std::sync::mpsc; -use nix::libc::RTMGRP_IPV4_IFADDR; -use nix::libc::RTMGRP_IPV6_IFADDR; -use nix::libc::RTMGRP_LINK; +use nix::libc::poll; +use nix::libc::pollfd; +use nix::libc::POLLIN; use nix::sys::socket::bind; use nix::sys::socket::recv; use nix::sys::socket::socket; @@ -13,61 +14,114 @@ use nix::sys::socket::NetlinkAddr; use nix::sys::socket::SockFlag; use nix::sys::socket::SockProtocol; use nix::sys::socket::SockType; +use nix::unistd::pipe; use crate::Error; use crate::List; use crate::Update; +const RTMGRP_IPV4_IFADDR: u32 = 0x10; +const RTMGRP_IPV6_IFADDR: u32 = 0x20; +const RTMGRP_LINK: u32 = 0x01; + pub(crate) struct WatchHandle { - // PROBLEM: close() doesn't cancel recv() for a netlink socket - // SOLUTION: open a pipe() and use poll() inside the thread to watch for cancellation too - sockfd: OwnedFd, + // Close on drop, which will be detected by poll in background thread + pipefd: Option, + + // Detect when thread has completed + complete: Option>, +} + +impl Drop for WatchHandle { + fn drop(&mut self) { + drop(self.pipefd.take()); + let _ = self.complete.take().unwrap().recv(); + } } pub(crate) fn watch_interfaces( callback: F, ) -> Result { - let sockfd = start_watcher_thread(callback)?; - Ok(WatchHandle { sockfd }) + let (pipefd, complete) = start_watcher_thread(callback)?; + Ok(WatchHandle { + pipefd: Some(pipefd), + complete: Some(complete), + }) } -fn start_watcher_thread(mut callback: F) -> Result { - let sockfd = socket(AddressFamily::Netlink, SockType::Raw, SockFlag::empty(), Some(SockProtocol::NetlinkRoute)) - .map_err(|_| Error::Internal)?; // TODO: proper errors - let sa_nl = NetlinkAddr::new(0, (RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR) as u32); - bind(sockfd.as_raw_fd(), &sa_nl).map_err(|_| Error::Internal)?; // TODO: proper errors - let fd = sockfd.as_raw_fd(); - println!("netlink socket on fd {}", fd); +fn start_watcher_thread( + mut callback: F, +) -> Result<(OwnedFd, mpsc::Receiver<()>), Error> { + let sockfd = socket( + AddressFamily::Netlink, + SockType::Raw, + SockFlag::SOCK_NONBLOCK, + Some(SockProtocol::NetlinkRoute), + ) + .map_err(|e| Error::CreateSocket(e.to_string()))?; + let sa_nl = NetlinkAddr::new( + 0, + (RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR) as u32, + ); + bind(sockfd.as_raw_fd(), &sa_nl).map_err(|e| Error::Bind(e.to_string()))?; + let (pipe_rd, pipe_wr) = pipe().map_err(|e| Error::CreatePipe(e.to_string()))?; - std::thread::spawn(move || { - println!("watch thread running"); - let mut prev_list = List::default(); - let mut buf = [0u8; 4096]; - let mut handle_update = move |new_list: List| { - if new_list == prev_list { - return; - } - let update = Update { - interfaces: new_list.0.clone(), - diff: new_list.diff_from(&prev_list), - }; - (callback)(update); - prev_list = new_list; + let mut prev_list = List::default(); + let mut handle_update = move |new_list: List| { + if new_list == prev_list { + return; + } + let update = Update { + interfaces: new_list.0.clone(), + diff: new_list.diff_from(&prev_list), }; + (callback)(update); + prev_list = new_list; + }; - if let Ok(initial) = crate::list::list_interfaces() { - handle_update(initial); - }; + // Now that netlink socket is open, provide an initial update. + // By having this outside the thread we can return an error synchronously if it + // looks like we're going to have trouble listing interfaces. + handle_update(crate::list::list_interfaces()?); - while let Ok(n) = recv(fd, &mut buf, MsgFlags::empty()) { - println!("something on the netlink socket: {} bytes", n); - let Ok(new_list) = crate::list::list_interfaces() else { - continue; - }; - handle_update(new_list); + let (complete_tx, complete_rx) = mpsc::channel(); + + std::thread::spawn(move || { + let mut buf = [0u8; 4096]; + + loop { + let mut fds = [ + pollfd { + fd: sockfd.as_raw_fd(), + events: POLLIN, + revents: 0, + }, + pollfd { + fd: pipe_rd.as_raw_fd(), + events: POLLIN, + revents: 0, + }, + ]; + unsafe { + poll(&mut fds as *mut _, 2, -1); + } + if fds[0].revents != 0 { + // netlink socket had something happen + if recv(sockfd.as_raw_fd(), &mut buf, MsgFlags::empty()).is_ok() { + let Ok(new_list) = crate::list::list_interfaces() else { + continue; + }; + handle_update(new_list); + } + } + if fds[1].revents != 0 { + // pipe had something happen + break; + } } - println!("netlink recv thread terminating"); + + drop(complete_tx); }); - - Ok(sockfd) + + Ok((pipe_wr, complete_rx)) }