]> code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
9735298131bd510132fb21b41bd7311d2e8f0a22
[m17rt] / m17app / src / reflector.rs
1 use std::{
2 io::{self, Read, Write},
3 net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 mpsc::{self, Receiver, Sender},
7 Arc, Mutex,
8 },
9 thread,
10 time::Duration,
11 };
12
13 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
14 use m17core::{
15 kiss::KissFrame,
16 reflector::{
17 convert::VoiceToRf,
18 packet::{Connect, Pong, ServerMessage},
19 },
20 };
21
22 #[derive(Debug, PartialEq, Eq, Clone)]
23 pub struct ReflectorClientConfig {
24 hostname: String,
25 port: u16,
26 module: char,
27 local_callsign: M17Address,
28 }
29
30 type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
31
32 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
33 ///
34 /// Streams will be sent and received over IP rather than RF.
35 #[derive(Clone)]
36 pub struct ReflectorClientTnc {
37 config: ReflectorClientConfig,
38 status_handler: WrappedStatusHandler,
39 kiss_out_tx: Sender<Arc<[u8]>>,
40 kiss_out: OutBuffer,
41 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
42 is_closed: Arc<AtomicBool>,
43 }
44
45 impl ReflectorClientTnc {
46 /// Create a new Reflector Client TNC.
47 ///
48 /// You must provide a configuration object and a handler for status events, such as when the TNC
49 /// connects and disconnects. The status events are purely information and if you're not interested
50 /// in them, provide a `NullStatusHandler`.
51 pub fn new<S: StatusHandler + Send + 'static>(
52 config: ReflectorClientConfig,
53 status: S,
54 ) -> Self {
55 let (tx, rx) = mpsc::channel();
56 Self {
57 config,
58 status_handler: Arc::new(Mutex::new(status)),
59 kiss_out_tx: tx,
60 kiss_out: OutBuffer::new(rx),
61 event_tx: Arc::new(Mutex::new(None)),
62 is_closed: Arc::new(AtomicBool::new(false)),
63 }
64 }
65 }
66
67 impl Read for ReflectorClientTnc {
68 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
69 self.kiss_out.read(buf)
70 }
71 }
72
73 impl Write for ReflectorClientTnc {
74 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
75 Ok(buf.len())
76 }
77
78 fn flush(&mut self) -> std::io::Result<()> {
79 Ok(())
80 }
81 }
82
83 impl Tnc for ReflectorClientTnc {
84 fn try_clone(&mut self) -> Result<Self, crate::tnc::TncError> {
85 Ok(self.clone())
86 }
87
88 fn start(&mut self) {
89 spawn_runner(
90 self.config.clone(),
91 self.status_handler.clone(),
92 self.event_tx.clone(),
93 self.is_closed.clone(),
94 self.kiss_out_tx.clone(),
95 );
96 }
97
98 fn close(&mut self) {
99 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
100 self.is_closed.store(true, Ordering::Release);
101 let _ = tx.send(TncEvent::Close);
102 }
103 }
104 }
105
106 #[allow(clippy::large_enum_variant)]
107 enum TncEvent {
108 Close,
109 Received(ServerMessage),
110 }
111
112 fn spawn_runner(
113 config: ReflectorClientConfig,
114 status: WrappedStatusHandler,
115 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
116 is_closed: Arc<AtomicBool>,
117 kiss_out_tx: Sender<Arc<[u8]>>,
118 ) {
119 std::thread::spawn(move || {
120 status
121 .lock()
122 .unwrap()
123 .status_changed(TncStatus::Disconnected);
124 while !is_closed.load(Ordering::Acquire) {
125 status.lock().unwrap().status_changed(TncStatus::Connecting);
126 let sa = if let Ok(mut sa_iter) =
127 (config.hostname.as_str(), config.port).to_socket_addrs()
128 {
129 if let Some(sa) = sa_iter.next() {
130 sa
131 } else {
132 status
133 .lock()
134 .unwrap()
135 .status_changed(TncStatus::Disconnected);
136 thread::sleep(Duration::from_secs(10));
137 continue;
138 }
139 } else {
140 status
141 .lock()
142 .unwrap()
143 .status_changed(TncStatus::Disconnected);
144 thread::sleep(Duration::from_secs(10));
145 continue;
146 };
147 let (tx, rx) = mpsc::channel();
148 *event_tx.lock().unwrap() = Some(tx.clone());
149 if !is_closed.load(Ordering::Acquire) {
150 run_single_conn(
151 sa,
152 tx,
153 rx,
154 kiss_out_tx.clone(),
155 config.clone(),
156 status.clone(),
157 );
158 }
159 println!("single conn ended");
160 }
161 status.lock().unwrap().status_changed(TncStatus::Closed);
162 });
163 }
164
165 fn run_single_conn(
166 dest: SocketAddr,
167 event_tx: Sender<TncEvent>,
168 event_rx: Receiver<TncEvent>,
169 kiss_out_tx: Sender<Arc<[u8]>>,
170 config: ReflectorClientConfig,
171 status: WrappedStatusHandler,
172 ) {
173 let socket = if dest.is_ipv4() {
174 UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap()
175 } else {
176 UdpSocket::bind((Ipv6Addr::UNSPECIFIED, 0)).unwrap()
177 };
178
179 let mut connect = Connect::new();
180 connect.set_address(config.local_callsign.address().to_owned());
181 connect.set_module(config.module);
182 socket.send_to(connect.as_bytes(), dest).unwrap();
183 let mut converter = VoiceToRf::new();
184 let single_conn_ended = Arc::new(AtomicBool::new(false));
185 // TODO: unwrap
186 spawn_reader(
187 socket.try_clone().unwrap(),
188 event_tx,
189 single_conn_ended.clone(),
190 );
191
192 while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
193 match ev {
194 TncEvent::Close => {
195 println!("writer: close");
196 break;
197 }
198 TncEvent::Received(server_msg) => match server_msg {
199 ServerMessage::ConnectAcknowledge(_) => {
200 status.lock().unwrap().status_changed(TncStatus::Connected);
201 }
202 ServerMessage::ConnectNack(_) => {
203 status
204 .lock()
205 .unwrap()
206 .status_changed(TncStatus::ConnectRejected);
207 break;
208 }
209 ServerMessage::ForceDisconnect(_) => {
210 status
211 .lock()
212 .unwrap()
213 .status_changed(TncStatus::ForceDisconnect);
214 break;
215 }
216 ServerMessage::Voice(voice) => {
217 let (lsf, stream) = converter.next(&voice);
218 if let Some(lsf) = lsf {
219 let kiss = KissFrame::new_stream_setup(&lsf.0).unwrap();
220 let _ = kiss_out_tx.send(kiss.as_bytes().into());
221 }
222 let kiss = KissFrame::new_stream_data(&stream).unwrap();
223 let _ = kiss_out_tx.send(kiss.as_bytes().into());
224 }
225 ServerMessage::Ping(_ping) => {
226 let mut pong = Pong::new();
227 pong.set_address(
228 M17Address::from_callsign("VK7XT")
229 .unwrap()
230 .address()
231 .clone(),
232 );
233 socket.send_to(pong.as_bytes(), dest).unwrap();
234 }
235 _ => {}
236 },
237 }
238 }
239 single_conn_ended.store(true, Ordering::Release);
240 status
241 .lock()
242 .unwrap()
243 .status_changed(TncStatus::Disconnected);
244 println!("write thread terminating");
245 }
246
247 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
248 std::thread::spawn(move || {
249 let mut buf = [0u8; 2048];
250 while let Ok((n, _sa)) = socket.recv_from(&mut buf) {
251 if cancel.load(Ordering::Acquire) {
252 break;
253 }
254 if let Some(msg) = ServerMessage::parse(&buf[..n]) {
255 if event_tx.send(TncEvent::Received(msg)).is_err() {
256 break;
257 }
258 }
259 }
260 println!("read thread terminating");
261 });
262 }
263
264 /// Callbacks to get runtime information about how the reflector client TNC is operating
265 pub trait StatusHandler {
266 fn status_changed(&mut self, status: TncStatus);
267 }
268
269 #[derive(Debug, PartialEq, Eq, Clone)]
270 pub enum TncStatus {
271 Disconnected,
272 Connecting,
273 Connected,
274 ConnectRejected,
275 ForceDisconnect,
276 Closed,
277 }
278
279 pub struct NullStatusHandler;
280 impl StatusHandler for NullStatusHandler {
281 fn status_changed(&mut self, _status: TncStatus) {}
282 }