]> code.octet-stream.net Git - netwatcher/blob - src/watch_linux.rs
Block on dropping handle on Linux
[netwatcher] / src / watch_linux.rs
1 use std::os::fd::AsRawFd;
2 use std::os::fd::OwnedFd;
3 use std::sync::mpsc;
4
5 use nix::libc::poll;
6 use nix::libc::pollfd;
7 use nix::libc::POLLIN;
8 use nix::sys::socket::bind;
9 use nix::sys::socket::recv;
10 use nix::sys::socket::socket;
11 use nix::sys::socket::AddressFamily;
12 use nix::sys::socket::MsgFlags;
13 use nix::sys::socket::NetlinkAddr;
14 use nix::sys::socket::SockProtocol;
15 use nix::sys::socket::SockType;
16 use nix::sys::socket::SOCK_NONBLOCK;
17 use nix::unistd::pipe;
18
19 use crate::Error;
20 use crate::List;
21 use crate::Update;
22
// rtnetlink multicast group bitmasks, mirroring the values in <linux/rtnetlink.h>.
// They are OR-ed together to form the `groups` field of the netlink bind address.

/// Group for IPv4 address changes on interfaces.
const RTMGRP_IPV4_IFADDR: u32 = 0x10;
/// Group for IPv6 address changes on interfaces.
///
/// Note that 0x20 is `RTMGRP_IPV4_MROUTE` in the kernel header, not this group;
/// subscribing to it would silently miss IPv6 address changes.
const RTMGRP_IPV6_IFADDR: u32 = 0x100;
/// Group for link-level changes (interfaces appearing, disappearing, changing state).
const RTMGRP_LINK: u32 = 0x01;
26
/// Owner-side handle for the background interface watcher.
///
/// Both fields are wrapped in `Option` so that `Drop` can move them out of
/// `&mut self` and release them in a controlled order.
pub(crate) struct WatchHandle {
    /// Pipe end held on behalf of the watcher thread. Closing it (by dropping
    /// the descriptor) is noticed by the thread's `poll` call.
    pipefd: Option<OwnedFd>,

    /// Receiving side of a channel used to detect that the watcher thread has
    /// completed.
    complete: Option<mpsc::Receiver<()>>,
}
34
35 impl Drop for WatchHandle {
36 fn drop(&mut self) {
37 drop(self.pipefd.take());
38 let _ = self.complete.take().recv();
39 }
40 }
41
42 pub(crate) fn watch_interfaces<F: FnMut(Update) + Send + 'static>(
43 callback: F,
44 ) -> Result<WatchHandle, Error> {
45 let (pipefd, complete) = start_watcher_thread(callback)?;
46 Ok(WatchHandle {
47 pipefd: Some(pipefd),
48 complete: Some(complete),
49 })
50 }
51
52 fn start_watcher_thread<F: FnMut(Update) + Send + 'static>(
53 mut callback: F,
54 ) -> Result<(OwnedFd, mpsc::Receiver<()>), Error> {
55 let sockfd = socket(
56 AddressFamily::Netlink,
57 SockType::Raw,
58 SOCK_NONBLOCK,
59 Some(SockProtocol::NetlinkRoute),
60 )
61 .map_err(|e| Error::CreateSocket(e.to_string()))?;
62 sockfd.set_nonblocking(true);
63 let sa_nl = NetlinkAddr::new(
64 0,
65 (RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR) as u32,
66 );
67 bind(sockfd.as_raw_fd(), &sa_nl).map_err(|e| Error::Bind(e.to_string()))?;
68 let (pipe_rd, pipe_wr) = pipe().map_err(|e| Error::CreatePipe(e.to_string()))?;
69
70 let mut prev_list = List::default();
71 let mut handle_update = move |new_list: List| {
72 if new_list == prev_list {
73 return;
74 }
75 let update = Update {
76 interfaces: new_list.0.clone(),
77 diff: new_list.diff_from(&prev_list),
78 };
79 (callback)(update);
80 prev_list = new_list;
81 };
82
83 // Now that netlink socket is open, provide an initial update.
84 // By having this outside the thread we can return an error synchronously if it
85 // looks like we're going to have trouble listing interfaces.
86 handle_update(crate::list::list_interfaces()?);
87
88 let (complete_tx, complete_rx) = mpsc::channel();
89
90 std::thread::spawn(move || {
91 let mut buf = [0u8; 4096];
92
93 loop {
94 let mut fds = [
95 pollfd {
96 fd: sockfd.as_raw_fd(),
97 events: POLLIN,
98 revents: 0,
99 },
100 pollfd {
101 fd: pipe_rd.as_raw_fd(),
102 events: POLLIN,
103 revents: 0,
104 },
105 ];
106 unsafe {
107 poll(&mut fds as *mut _, 2, -1);
108 }
109 if fds[0].revents != 0 {
110 // netlink socket had something happen
111 if recv(sockfd.as_raw_fd(), &mut buf, MsgFlags::empty()).is_ok() {
112 let Ok(new_list) = crate::list::list_interfaces() else {
113 continue;
114 };
115 handle_update(new_list);
116 }
117 }
118 if fds[1].revents != 0 {
119 // pipe had something happen
120 break;
121 }
122 }
123
124 drop(complete_tx);
125 });
126
127 Ok(pipe_wr)
128 }