]> code.octet-stream.net Git - netwatcher/blob - src/watch_linux.rs
Add top-level docs and update README
[netwatcher] / src / watch_linux.rs
1 use std::os::fd::AsRawFd;
2 use std::os::fd::OwnedFd;
3 use std::sync::mpsc;
4
5 use nix::libc::poll;
6 use nix::libc::pollfd;
7 use nix::libc::POLLIN;
8 use nix::sys::socket::bind;
9 use nix::sys::socket::recv;
10 use nix::sys::socket::socket;
11 use nix::sys::socket::AddressFamily;
12 use nix::sys::socket::MsgFlags;
13 use nix::sys::socket::NetlinkAddr;
14 use nix::sys::socket::SockFlag;
15 use nix::sys::socket::SockProtocol;
16 use nix::sys::socket::SockType;
17 use nix::unistd::pipe;
18
19 use crate::Error;
20 use crate::List;
21 use crate::Update;
22
23 const RTMGRP_IPV4_IFADDR: u32 = 0x10;
24 const RTMGRP_IPV6_IFADDR: u32 = 0x20;
25 const RTMGRP_LINK: u32 = 0x01;
26
27 pub(crate) struct WatchHandle {
28 // Close on drop, which will be detected by poll in background thread
29 pipefd: Option<OwnedFd>,
30
31 // Detect when thread has completed
32 complete: Option<mpsc::Receiver<()>>,
33 }
34
35 impl Drop for WatchHandle {
36 fn drop(&mut self) {
37 drop(self.pipefd.take());
38 let _ = self.complete.take().unwrap().recv();
39 }
40 }
41
42 pub(crate) fn watch_interfaces<F: FnMut(Update) + Send + 'static>(
43 callback: F,
44 ) -> Result<WatchHandle, Error> {
45 let (pipefd, complete) = start_watcher_thread(callback)?;
46 Ok(WatchHandle {
47 pipefd: Some(pipefd),
48 complete: Some(complete),
49 })
50 }
51
52 fn start_watcher_thread<F: FnMut(Update) + Send + 'static>(
53 mut callback: F,
54 ) -> Result<(OwnedFd, mpsc::Receiver<()>), Error> {
55 let sockfd = socket(
56 AddressFamily::Netlink,
57 SockType::Raw,
58 SockFlag::SOCK_NONBLOCK,
59 Some(SockProtocol::NetlinkRoute),
60 )
61 .map_err(|e| Error::CreateSocket(e.to_string()))?;
62 let sa_nl = NetlinkAddr::new(
63 0,
64 (RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR) as u32,
65 );
66 bind(sockfd.as_raw_fd(), &sa_nl).map_err(|e| Error::Bind(e.to_string()))?;
67 let (pipe_rd, pipe_wr) = pipe().map_err(|e| Error::CreatePipe(e.to_string()))?;
68
69 let mut prev_list = List::default();
70 let mut handle_update = move |new_list: List| {
71 if new_list == prev_list {
72 return;
73 }
74 let update = Update {
75 interfaces: new_list.0.clone(),
76 diff: new_list.diff_from(&prev_list),
77 };
78 (callback)(update);
79 prev_list = new_list;
80 };
81
82 // Now that netlink socket is open, provide an initial update.
83 // By having this outside the thread we can return an error synchronously if it
84 // looks like we're going to have trouble listing interfaces.
85 handle_update(crate::list::list_interfaces()?);
86
87 let (complete_tx, complete_rx) = mpsc::channel();
88
89 std::thread::spawn(move || {
90 let mut buf = [0u8; 4096];
91
92 loop {
93 let mut fds = [
94 pollfd {
95 fd: sockfd.as_raw_fd(),
96 events: POLLIN,
97 revents: 0,
98 },
99 pollfd {
100 fd: pipe_rd.as_raw_fd(),
101 events: POLLIN,
102 revents: 0,
103 },
104 ];
105 unsafe {
106 poll(&mut fds as *mut _, 2, -1);
107 }
108 if fds[0].revents != 0 {
109 // netlink socket had something happen
110 if recv(sockfd.as_raw_fd(), &mut buf, MsgFlags::empty()).is_ok() {
111 let Ok(new_list) = crate::list::list_interfaces() else {
112 continue;
113 };
114 handle_update(new_list);
115 }
116 }
117 if fds[1].revents != 0 {
118 // pipe had something happen
119 break;
120 }
121 }
122
123 drop(complete_tx);
124 });
125
126 Ok((pipe_wr, complete_rx))
127 }