]> code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
Handle reflector packet traffic
[m17rt] / m17app / src / reflector.rs
1 use std::{
2 io::{self, Read, Write},
3 net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
4 sync::{
5 Arc, Mutex,
6 atomic::{AtomicBool, Ordering},
7 mpsc::{self, Receiver, Sender},
8 },
9 thread,
10 time::Duration,
11 };
12
13 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
14 use m17core::{
15 kiss::{KissBuffer, KissCommand, KissFrame, PORT_PACKET_BASIC, PORT_PACKET_FULL, PORT_STREAM},
16 protocol::{LsfFrame, StreamFrame},
17 reflector::{
18 convert::{RfToVoice, VoiceToRf},
19 packet::{Connect, Packet, Pong, ServerMessage, Voice},
20 },
21 };
22
/// Settings describing which reflector to reach and which identity to present to it.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ReflectorClientConfig {
    /// Host name of the reflector; resolved together with `port` before every connection attempt.
    pub hostname: String,
    /// UDP port of the reflector.
    pub port: u16,
    /// Module designator placed in the connect request.
    pub module: char,
    /// Address placed in the connect request and in pong replies.
    pub local_callsign: M17Address,
}
30
/// Shared, lockable handle to the user's status callback so it can be cloned into worker threads.
type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
32
/// Network-based TNC that attempts to maintain a UDP connection to a reflector.
///
/// Streams will be sent and received over IP rather than RF.
#[derive(Clone)]
pub struct ReflectorClientTnc {
    /// Reflector endpoint and local identity.
    config: ReflectorClientConfig,
    /// User callback notified about connection state changes.
    status_handler: WrappedStatusHandler,
    /// Sending half of the KISS output channel; cloned into the runner thread on `start`.
    kiss_out_tx: Sender<Arc<[u8]>>,
    /// Receiving side of the KISS output channel, drained by the `Read` implementation.
    kiss_out: OutBuffer,
    /// Event sender for the current connection attempt; `None` until the runner installs one.
    event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
    /// Set once the TNC has been asked to close; makes the runner loop exit.
    is_closed: Arc<AtomicBool>,
    /// Accumulates bytes passed to `write` until complete KISS frames can be extracted.
    kiss_buffer: Arc<Mutex<KissBuffer>>,
    /// Converter state for the outgoing stream; created when the first link setup frame is written.
    rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
}
47
48 impl ReflectorClientTnc {
49 /// Create a new Reflector Client TNC.
50 ///
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely information and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new<S: StatusHandler + Send + 'static>(
55 config: ReflectorClientConfig,
56 status: S,
57 ) -> Self {
58 let (tx, rx) = mpsc::channel();
59 Self {
60 config,
61 status_handler: Arc::new(Mutex::new(status)),
62 kiss_out_tx: tx,
63 kiss_out: OutBuffer::new(rx),
64 event_tx: Arc::new(Mutex::new(None)),
65 is_closed: Arc::new(AtomicBool::new(false)),
66 kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
67 rf_to_voice: Arc::new(Mutex::new(None)),
68 }
69 }
70 }
71
72 impl Read for ReflectorClientTnc {
73 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
74 self.kiss_out.read(buf)
75 }
76 }
77
78 impl Write for ReflectorClientTnc {
79 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
80 let mut kiss = self.kiss_buffer.lock().unwrap();
81 let rem = kiss.buf_remaining();
82 let sz = buf.len().min(rem.len());
83 rem[0..sz].copy_from_slice(&buf[0..sz]);
84 kiss.did_write(sz);
85 while let Some(frame) = kiss.next_frame() {
86 if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
87 let mut payload = [0u8; 30];
88 if let Ok(len) = frame.decode_payload(&mut payload) {
89 if len == 30 {
90 let lsf = LsfFrame(payload);
91 let mut to_voice = self.rf_to_voice.lock().unwrap();
92 match &mut *to_voice {
93 Some(to_voice) => to_voice.process_lsf(lsf),
94 None => *to_voice = Some(RfToVoice::new(lsf)),
95 }
96 } else if len == 26 {
97 let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]);
98 let frame = StreamFrame {
99 lich_idx: payload[5] >> 5,
100 lich_part: payload[0..5].try_into().unwrap(),
101 frame_number: frame_num_part & 0x7fff,
102 end_of_stream: frame_num_part & 0x8000 > 0,
103 stream_data: payload[8..24].try_into().unwrap(),
104 };
105 let to_voice = self.rf_to_voice.lock().unwrap();
106 if let Some(to_voice) = &*to_voice {
107 let voice = to_voice.process_stream(&frame);
108 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
109 let _ = tx.send(TncEvent::TransmitVoice(voice));
110 }
111 }
112 }
113 };
114 } else if Ok(KissCommand::DataFrame) == frame.command()
115 && frame.port() == Ok(PORT_PACKET_BASIC)
116 {
117 // basic packets not supported for now, they will require more config
118 } else if Ok(KissCommand::DataFrame) == frame.command()
119 && frame.port() == Ok(PORT_PACKET_FULL)
120 {
121 let mut payload = [0u8; 855];
122 let Ok(len) = frame.decode_payload(&mut payload) else {
123 continue;
124 };
125 if len < 33 {
126 continue;
127 }
128 let mut lsf = LsfFrame([0u8; 30]);
129 lsf.0.copy_from_slice(&payload[0..30]);
130 if lsf.check_crc() != 0 {
131 continue;
132 }
133 let mut packet = Packet::new();
134 packet.set_link_setup_frame(&lsf);
135 packet.set_payload(&payload[30..]);
136 }
137 }
138 Ok(sz)
139 }
140
141 fn flush(&mut self) -> std::io::Result<()> {
142 Ok(())
143 }
144 }
145
146 impl Tnc for ReflectorClientTnc {
147 fn try_clone(&mut self) -> Result<Self, crate::tnc::TncError> {
148 Ok(self.clone())
149 }
150
151 fn start(&mut self) {
152 spawn_runner(
153 self.config.clone(),
154 self.status_handler.clone(),
155 self.event_tx.clone(),
156 self.is_closed.clone(),
157 self.kiss_out_tx.clone(),
158 );
159 }
160
161 fn close(&mut self) {
162 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
163 self.is_closed.store(true, Ordering::Release);
164 let _ = tx.send(TncEvent::Close);
165 }
166 }
167 }
168
/// Messages delivered to the loop that services a single reflector connection.
#[allow(clippy::large_enum_variant)]
enum TncEvent {
    /// The TNC was asked to close; the connection loop should stop.
    Close,
    /// A datagram from the reflector that parsed into a server message.
    Received(ServerMessage),
    /// A voice packet produced from written stream frames, to be sent to the reflector.
    TransmitVoice(Voice),
}
175
176 fn spawn_runner(
177 config: ReflectorClientConfig,
178 status: WrappedStatusHandler,
179 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
180 is_closed: Arc<AtomicBool>,
181 kiss_out_tx: Sender<Arc<[u8]>>,
182 ) {
183 std::thread::spawn(move || {
184 status
185 .lock()
186 .unwrap()
187 .status_changed(TncStatus::Disconnected);
188 while !is_closed.load(Ordering::Acquire) {
189 status.lock().unwrap().status_changed(TncStatus::Connecting);
190 let sa = if let Ok(mut sa_iter) =
191 (config.hostname.as_str(), config.port).to_socket_addrs()
192 {
193 if let Some(sa) = sa_iter.next() {
194 sa
195 } else {
196 status
197 .lock()
198 .unwrap()
199 .status_changed(TncStatus::Disconnected);
200 thread::sleep(Duration::from_secs(10));
201 continue;
202 }
203 } else {
204 status
205 .lock()
206 .unwrap()
207 .status_changed(TncStatus::Disconnected);
208 thread::sleep(Duration::from_secs(10));
209 continue;
210 };
211 let (tx, rx) = mpsc::channel();
212 *event_tx.lock().unwrap() = Some(tx.clone());
213 if !is_closed.load(Ordering::Acquire) {
214 run_single_conn(
215 sa,
216 tx,
217 rx,
218 kiss_out_tx.clone(),
219 config.clone(),
220 status.clone(),
221 );
222 // Cool off a bit if connect rejected, etc.
223 thread::sleep(Duration::from_secs(10));
224 }
225 }
226 status.lock().unwrap().status_changed(TncStatus::Closed);
227 });
228 }
229
230 fn run_single_conn(
231 dest: SocketAddr,
232 event_tx: Sender<TncEvent>,
233 event_rx: Receiver<TncEvent>,
234 kiss_out_tx: Sender<Arc<[u8]>>,
235 config: ReflectorClientConfig,
236 status: WrappedStatusHandler,
237 ) {
238 let socket = if dest.is_ipv4() {
239 UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap()
240 } else {
241 UdpSocket::bind((Ipv6Addr::UNSPECIFIED, 0)).unwrap()
242 };
243
244 let mut connect = Connect::new();
245 connect.set_address(config.local_callsign.address());
246 connect.set_module(config.module);
247 let _ = socket.send_to(connect.as_bytes(), dest);
248 let mut converter = VoiceToRf::new();
249 let single_conn_ended = Arc::new(AtomicBool::new(false));
250 // TODO: unwrap
251 spawn_reader(
252 socket.try_clone().unwrap(),
253 event_tx,
254 single_conn_ended.clone(),
255 );
256
257 while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
258 match ev {
259 TncEvent::Close => {
260 break;
261 }
262 TncEvent::Received(server_msg) => match server_msg {
263 ServerMessage::ConnectAcknowledge(_) => {
264 status.lock().unwrap().status_changed(TncStatus::Connected);
265 }
266 ServerMessage::ConnectNack(_) => {
267 status
268 .lock()
269 .unwrap()
270 .status_changed(TncStatus::ConnectRejected);
271 break;
272 }
273 ServerMessage::ForceDisconnect(_) => {
274 status
275 .lock()
276 .unwrap()
277 .status_changed(TncStatus::ForceDisconnect);
278 break;
279 }
280 ServerMessage::Voice(voice) => {
281 let (lsf, stream) = converter.next(&voice);
282 if let Some(lsf) = lsf {
283 let kiss = KissFrame::new_stream_setup(&lsf.0).unwrap();
284 let _ = kiss_out_tx.send(kiss.as_bytes().into());
285 }
286 let kiss = KissFrame::new_stream_data(&stream).unwrap();
287 let _ = kiss_out_tx.send(kiss.as_bytes().into());
288 }
289 ServerMessage::Packet(packet) => {
290 if let Ok(kiss) =
291 KissFrame::new_full_packet(&packet.link_setup_frame().0, packet.payload())
292 {
293 let _ = kiss_out_tx.send(kiss.as_bytes().into());
294 }
295 }
296 ServerMessage::Ping(_ping) => {
297 let mut pong = Pong::new();
298 pong.set_address(config.local_callsign.address());
299 if socket.send_to(pong.as_bytes(), dest).is_err() {
300 break;
301 }
302 }
303 _ => {}
304 },
305 TncEvent::TransmitVoice(voice) => {
306 if socket.send_to(voice.as_bytes(), dest).is_err() {
307 break;
308 };
309 }
310 }
311 }
312 single_conn_ended.store(true, Ordering::Release);
313 status
314 .lock()
315 .unwrap()
316 .status_changed(TncStatus::Disconnected);
317 }
318
319 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
320 std::thread::spawn(move || {
321 let mut buf = [0u8; 2048];
322 while let Ok((n, _sa)) = socket.recv_from(&mut buf) {
323 if cancel.load(Ordering::Acquire) {
324 break;
325 }
326 if let Some(msg) = ServerMessage::parse(&buf[..n]) {
327 if event_tx.send(TncEvent::Received(msg)).is_err() {
328 break;
329 }
330 }
331 }
332 });
333 }
334
/// Callbacks to get runtime information about how the reflector client TNC is operating
pub trait StatusHandler {
    /// Invoked from the TNC's background thread each time the connection state changes.
    fn status_changed(&mut self, status: TncStatus);
}
339
/// Connection states reported to a `StatusHandler`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TncStatus {
    /// No connection is active: reported at startup, after a failed host lookup and whenever a
    /// connection ends.
    Disconnected,
    /// A new connection attempt is starting.
    Connecting,
    /// The reflector acknowledged the connect request.
    Connected,
    /// The reflector answered the connect request with a nack.
    ConnectRejected,
    /// The reflector sent a force-disconnect message.
    ForceDisconnect,
    /// The TNC was closed and its background runner has finished.
    Closed,
}
349
/// Status handler that ignores every notification.
pub struct NullStatusHandler;
impl StatusHandler for NullStatusHandler {
    /// Deliberately does nothing.
    fn status_changed(&mut self, _status: TncStatus) {}
}