]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
dc2138da0605879ef6b34878af5e477b35075143
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::error::{M17Error, M17Errors};
3 use crate::link_setup::LinkSetup;
4 use crate::tnc::Tnc;
5 use crate::{LsfFrame, PacketType, StreamFrame};
6 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
7 use m17core::protocol::EncryptionType;
8
9 use log::debug;
10 use std::collections::HashMap;
11 use std::sync::mpsc;
12 use std::sync::{Arc, RwLock};
13
/// Phase of an `M17App`, used to decide which operations are currently valid.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Lifecycle {
    /// Initial phase: the app has been constructed but `start` has not succeeded yet.
    Setup,
    /// `start` has been called; adapters added from now on are started immediately.
    Started,
    /// `close` has been called; the app cannot be started again.
    Closed,
}
20
/// Application-side handle onto a TNC.
///
/// Holds the registry of packet/stream adapters, the channel used to hand
/// work to the writer thread, and the current lifecycle phase.
pub struct M17App {
    /// Registered adapters; the same `Arc` is handed to the reader thread in `new`.
    adapters: Arc<RwLock<Adapters>>,
    /// Sending half of the bounded channel drained by the writer thread.
    event_tx: mpsc::SyncSender<TncControlEvent>,
    /// Current phase; consulted by `start`, `close` and the add/remove adapter methods.
    lifecycle: RwLock<Lifecycle>,
}
26
27 impl M17App {
28 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
29 let write_tnc = tnc.try_clone().unwrap();
30 let (event_tx, event_rx) = mpsc::sync_channel(128);
31 let listeners = Arc::new(RwLock::new(Adapters::new()));
32 spawn_reader(tnc, listeners.clone());
33 spawn_writer(write_tnc, event_rx);
34 Self {
35 adapters: listeners,
36 event_tx,
37 lifecycle: RwLock::new(Lifecycle::Setup),
38 }
39 }
40
41 pub fn add_packet_adapter<P: PacketAdapter + 'static>(
42 &self,
43 adapter: P,
44 ) -> Result<usize, M17Error> {
45 let adapter = Arc::new(adapter);
46 let mut adapters = self.adapters.write().unwrap();
47 let id = adapters.next;
48 adapters.next += 1;
49 adapters.packet.insert(id, adapter.clone());
50 drop(adapters);
51 if self.lifecycle() == Lifecycle::Started {
52 adapter
53 .start(self.tx())
54 .map_err(|e| M17Error::Adapter(id, e))?;
55 }
56 Ok(id)
57 }
58
59 pub fn add_stream_adapter<S: StreamAdapter + 'static>(
60 &self,
61 adapter: S,
62 ) -> Result<usize, M17Error> {
63 let adapter = Arc::new(adapter);
64 let mut adapters = self.adapters.write().unwrap();
65 let id = adapters.next;
66 adapters.next += 1;
67 adapters.stream.insert(id, adapter.clone());
68 drop(adapters);
69 if self.lifecycle() == Lifecycle::Started {
70 adapter
71 .start(self.tx())
72 .map_err(|e| M17Error::Adapter(id, e))?;
73 }
74 Ok(id)
75 }
76
77 pub fn remove_packet_adapter(&self, id: usize) -> Result<(), M17Error> {
78 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
79 if self.lifecycle() == Lifecycle::Started {
80 a.close().map_err(|e| M17Error::Adapter(id, e))?;
81 }
82 }
83 Ok(())
84 }
85
86 pub fn remove_stream_adapter(&self, id: usize) -> Result<(), M17Error> {
87 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
88 if self.lifecycle() == Lifecycle::Started {
89 a.close().map_err(|e| M17Error::Adapter(id, e))?;
90 }
91 }
92 Ok(())
93 }
94
95 /// Create a handle that can be used to transmit data on the TNC
96 pub fn tx(&self) -> TxHandle {
97 TxHandle {
98 event_tx: self.event_tx.clone(),
99 }
100 }
101
102 pub fn start(&self) -> Result<(), M17Errors> {
103 if self.lifecycle() != Lifecycle::Setup {
104 return Err(M17Errors(vec![M17Error::InvalidStart]));
105 }
106 self.set_lifecycle(Lifecycle::Started);
107 let mut errs = vec![];
108 {
109 let adapters = self.adapters.read().unwrap();
110 for (i, p) in &adapters.packet {
111 if let Err(e) = p.start(self.tx()) {
112 errs.push(M17Error::Adapter(*i, e));
113 }
114 }
115 for (i, s) in &adapters.stream {
116 if let Err(e) = s.start(self.tx()) {
117 errs.push(M17Error::Adapter(*i, e));
118 }
119 }
120 }
121 let _ = self.event_tx.send(TncControlEvent::Start);
122 if errs.is_empty() {
123 Ok(())
124 } else {
125 Err(M17Errors(errs))
126 }
127 }
128
129 pub fn close(&self) -> Result<(), M17Errors> {
130 if self.lifecycle() != Lifecycle::Started {
131 return Err(M17Errors(vec![M17Error::InvalidClose]));
132 }
133 self.set_lifecycle(Lifecycle::Closed);
134 let mut errs = vec![];
135 {
136 let adapters = self.adapters.read().unwrap();
137 for (i, p) in &adapters.packet {
138 if let Err(e) = p.close() {
139 errs.push(M17Error::Adapter(*i, e));
140 }
141 }
142 for (i, s) in &adapters.stream {
143 if let Err(e) = s.close() {
144 errs.push(M17Error::Adapter(*i, e));
145 }
146 }
147 }
148 // TODO: blocking function to indicate TNC has finished closing
149 // then we could call this in a signal handler to ensure PTT is dropped before quit
150 let _ = self.event_tx.send(TncControlEvent::Close);
151 if errs.is_empty() {
152 Ok(())
153 } else {
154 Err(M17Errors(errs))
155 }
156 }
157
158 fn lifecycle(&self) -> Lifecycle {
159 *self.lifecycle.read().unwrap()
160 }
161
162 fn set_lifecycle(&self, lifecycle: Lifecycle) {
163 *self.lifecycle.write().unwrap() = lifecycle;
164 }
165 }
166
/// Handle for queueing outgoing traffic, obtained from `M17App::tx`.
pub struct TxHandle {
    /// Clone of the app's event sender; frames sent here go to the writer thread.
    event_tx: mpsc::SyncSender<TncControlEvent>,
}
170
171 impl TxHandle {
172 pub fn transmit_packet(
173 &self,
174 link_setup: &LinkSetup,
175 packet_type: PacketType,
176 payload: &[u8],
177 ) -> Result<(), M17Error> {
178 let (pack_type, pack_type_len) = packet_type.as_proto();
179 if pack_type_len + payload.len() > 823 {
180 return Err(M17Error::PacketTooLarge {
181 provided: payload.len(),
182 capacity: 823 - pack_type_len,
183 });
184 }
185 let mut full_payload = vec![];
186 full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
187 full_payload.extend_from_slice(payload);
188 let crc = m17core::crc::m17_crc(&full_payload);
189 full_payload.extend_from_slice(&crc.to_be_bytes());
190 let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
191 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
192 Ok(())
193 }
194
195 pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
196 let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
197 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
198 }
199
200 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
201
202 pub fn transmit_stream_next(&self, stream: &StreamFrame) {
203 let kiss_frame = KissFrame::new_stream_data(stream).unwrap();
204 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
205 }
206 }
207
/// Registry of the adapters subscribed to incoming packets and streams.
///
/// It is shared behind `Arc<RwLock<..>>` between `M17App` and the reader
/// thread. Each adapter will be notified in turn of each event.
struct Adapters {
    /// Identifier to be assigned to the next adapter, starting from 0.
    /// Packet and stream adapters draw from this same counter.
    next: usize,
    /// Packet adapters keyed by their assigned identifier.
    packet: HashMap<usize, Arc<dyn PacketAdapter>>,
    /// Stream adapters keyed by their assigned identifier.
    stream: HashMap<usize, Arc<dyn StreamAdapter>>,
}
217
218 impl Adapters {
219 fn new() -> Self {
220 Self {
221 next: 0,
222 packet: HashMap::new(),
223 stream: HashMap::new(),
224 }
225 }
226 }
227
/// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
// NOTE(review): `Kiss` holds a whole `KissFrame` by value while the other
// variants are empty, which is what this lint flags; presumably accepted to
// avoid boxing every frame - confirm.
#[allow(clippy::large_enum_variant)]
enum TncControlEvent {
    /// Write the bytes of this KISS frame to the TNC.
    Kiss(KissFrame),
    /// Call `start` on the TNC.
    Start,
    /// Call `close` on the TNC.
    Close,
}
235
/// Spawn the thread that reads from the TNC and dispatches to adapters.
///
/// The thread repeatedly reads bytes from `tnc` into a `KissBuffer`, pulls out
/// every complete KISS frame, and for data frames dispatches on the KISS port:
///
/// * basic packet port: ignored.
/// * full packet port: the LSF and packet CRCs are validated, then
///   `packet_received` is called on every registered packet adapter.
/// * stream port: a 30-byte payload is treated as the opening LSF
///   (`stream_began`), a 26-byte payload as stream data (`stream_data`).
///
/// Frames that fail any check are dropped with a `debug!` message. The adapter
/// list is cloned out of the lock before callbacks run, so the registry is not
/// locked while adapter code executes.
///
/// The thread ends, without reporting anything, on the first read error.
fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
    std::thread::spawn(move || {
        let mut kiss_buffer = KissBuffer::new();
        // True between a valid stream LSF and the stream's final data frame.
        let mut stream_running = false;
        loop {
            let buf = kiss_buffer.buf_remaining();
            let n = match tnc.read(buf) {
                Ok(n) => n,
                // NOTE(review): the read error is discarded and the thread just stops.
                Err(_) => break,
            };
            kiss_buffer.did_write(n);
            while let Some(frame) = kiss_buffer.next_frame() {
                // Only data frames are of interest; other KISS commands are skipped.
                if frame.command() != Ok(KissCommand::DataFrame) {
                    continue;
                }
                match frame.port() {
                    Ok(m17core::kiss::PORT_PACKET_BASIC) => {
                        // no action
                        // we will handle the more full-featured version from port 1
                    }
                    Ok(m17core::kiss::PORT_PACKET_FULL) => {
                        let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
                        let Ok(n) = frame.decode_payload(&mut payload) else {
                            debug!("failed to decode payload from KISS frame");
                            continue;
                        };
                        // Minimum accepted size: 30-byte LSF plus 3 further bytes.
                        if n < 33 {
                            debug!("unusually short full packet frame");
                            continue;
                        }
                        let lsf = LsfFrame(payload[0..30].try_into().unwrap());
                        if lsf.check_crc() != 0 {
                            debug!("LSF in full packet frame did not pass CRC");
                            continue;
                        }
                        if lsf.encryption_type() != EncryptionType::None {
                            debug!("we only understand None encryption for now - skipping packet");
                            continue;
                        }
                        let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
                        else {
                            debug!("failed to decode packet type");
                            continue;
                        };
                        // NOTE(review): assumes type_len <= n - 30, otherwise this
                        // subtraction underflows - confirm against `from_proto`.
                        if (n - 30 - type_len) < 2 {
                            debug!("packet payload too small to provide CRC");
                            continue;
                        }
                        // The CRC is computed over type + data + trailing CRC and must come out as 0.
                        let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
                        if packet_crc != 0 {
                            debug!("packet CRC does not pass");
                            continue;
                        }
                        // Strip the type prefix and the 2 trailing CRC bytes.
                        let packet_payload: Arc<[u8]> =
                            Arc::from(&payload[(30 + type_len)..(n - 2)]);

                        // Snapshot the adapters so the lock is released before the callbacks.
                        let subs: Vec<_> =
                            adapters.read().unwrap().packet.values().cloned().collect();
                        for s in subs {
                            s.packet_received(
                                LinkSetup::new_raw(lsf.clone()),
                                packet_type,
                                packet_payload.clone(),
                            );
                        }
                    }
                    Ok(m17core::kiss::PORT_STREAM) => {
                        let mut payload = [0u8; 32];
                        let Ok(n) = frame.decode_payload(&mut payload) else {
                            debug!("failed to decode stream payload from KISS frame");
                            continue;
                        };
                        // Payloads of any length other than 30 or 26 are ignored silently.
                        if n == 30 {
                            // 30 bytes: the link setup frame that opens a stream.
                            let lsf = LsfFrame(payload[0..30].try_into().unwrap());
                            if lsf.check_crc() != 0 {
                                debug!("initial LSF in stream did not pass CRC");
                                continue;
                            }
                            stream_running = true;
                            let subs: Vec<_> =
                                adapters.read().unwrap().stream.values().cloned().collect();
                            for s in subs {
                                s.stream_began(LinkSetup::new_raw(lsf.clone()));
                            }
                        } else if n == 26 {
                            // 26 bytes: a stream data frame; the first 6 bytes are not examined yet.
                            if !stream_running {
                                debug!("ignoring stream data as we didn't get a valid LSF first");
                                continue;
                            }
                            // TODO: parse LICH and handle the different changing subvalues META could have
                            if m17core::crc::m17_crc(&payload[6..n]) != 0 {
                                debug!("stream data CRC mismatch");
                                continue;
                            }
                            // Bytes 6..8: big-endian frame number whose top bit marks the final frame.
                            let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
                            let is_final = (frame_number & 0x8000) > 0;
                            frame_number &= 0x7fff;
                            // Bytes 8..24: the 16 bytes of stream data.
                            let data: [u8; 16] = payload[8..24].try_into().unwrap();
                            let data = Arc::new(data);
                            if is_final {
                                stream_running = false;
                            }
                            let subs: Vec<_> =
                                adapters.read().unwrap().stream.values().cloned().collect();
                            for s in subs {
                                s.stream_data(frame_number, is_final, data.clone());
                            }
                        }
                    }
                    // Unknown ports and port decode errors are ignored.
                    _ => (),
                }
            }
        }
    });
}
351
352 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
353 std::thread::spawn(move || {
354 while let Ok(ev) = event_rx.recv() {
355 match ev {
356 TncControlEvent::Kiss(k) => {
357 if let Err(e) = tnc.write_all(k.as_bytes()) {
358 debug!("kiss send err: {:?}", e);
359 return;
360 }
361 }
362 TncControlEvent::Start => {
363 if let Err(e) = tnc.start() {
364 debug!("tnc start err: {:?}", e);
365 return;
366 }
367 }
368 TncControlEvent::Close => {
369 if let Err(e) = tnc.close() {
370 debug!("tnc close err: {:?}", e);
371 return;
372 }
373 }
374 }
375 }
376 });
377 }
378
#[cfg(test)]
mod tests {
    use crate::error::AdapterError;
    use crate::{link_setup::M17Address, test_util::NullTnc};

    use super::*;

    /// A 100-byte raw packet is accepted; a 900-byte one is rejected with the
    /// sizes reported back in the error.
    #[test]
    fn packet_payload_len() {
        let app = M17App::new(NullTnc);
        let tx = app.tx();
        let broadcast = M17Address::new_broadcast();
        let link = LinkSetup::new_packet(&broadcast, &broadcast);

        let accepted = tx.transmit_packet(&link, PacketType::Raw, &[0u8; 100]);
        assert!(accepted.is_ok());

        let rejected = tx.transmit_packet(&link, PacketType::Raw, &[0u8; 900]);
        assert!(matches!(
            rejected,
            Err(M17Error::PacketTooLarge {
                provided: 900,
                capacity: 822
            })
        ));
    }

    /// Adapters registered before `start` see exactly one start and one close,
    /// and are dropped once they have been removed from the app.
    #[test]
    fn adapter_lifecycle() {
        #[derive(Debug, PartialEq)]
        enum Event {
            Started,
            Closed,
        }

        // Implements an adapter trait for a tuple struct wrapping a sender by
        // reporting every lifecycle callback on that sender.
        macro_rules! recording_adapter {
            ($adapter:ty, $role:ty) => {
                impl $role for $adapter {
                    fn start(&self, _handle: TxHandle) -> Result<(), AdapterError> {
                        Ok(self.0.send(Event::Started)?)
                    }

                    fn close(&self) -> Result<(), AdapterError> {
                        Ok(self.0.send(Event::Closed)?)
                    }
                }
            };
        }

        struct FakePacket(mpsc::SyncSender<Event>);
        struct FakeStream(mpsc::SyncSender<Event>);
        recording_adapter!(FakePacket, PacketAdapter);
        recording_adapter!(FakeStream, StreamAdapter);

        let app = M17App::new(NullTnc);
        let (packet_events_tx, packet_events) = mpsc::sync_channel(128);
        let (stream_events_tx, stream_events) = mpsc::sync_channel(128);

        let packet_id = app
            .add_packet_adapter(FakePacket(packet_events_tx))
            .unwrap();
        let stream_id = app
            .add_stream_adapter(FakeStream(stream_events_tx))
            .unwrap();
        app.start().unwrap();
        app.close().unwrap();
        app.remove_packet_adapter(packet_id).unwrap();
        app.remove_stream_adapter(stream_id).unwrap();

        // Packet adapter first, then stream adapter: one start, one close, and
        // then a hang-up because the adapter (and its sender) has been dropped.
        for events in [&packet_events, &stream_events] {
            assert_eq!(events.try_recv(), Ok(Event::Started));
            assert_eq!(events.try_recv(), Ok(Event::Closed));
            assert_eq!(events.try_recv(), Err(mpsc::TryRecvError::Disconnected));
        }
    }
}