code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Correctly call all adapter lifecycle methods, docs and test
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::error::M17Error;
3 use crate::link_setup::LinkSetup;
4 use crate::tnc::Tnc;
5 use crate::{LsfFrame, PacketType, StreamFrame};
6 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
7 use m17core::protocol::EncryptionType;
8
9 use log::debug;
10 use std::collections::HashMap;
11 use std::sync::mpsc;
12 use std::sync::{Arc, RwLock};
13
14 pub struct M17App {
15 adapters: Arc<RwLock<Adapters>>,
16 event_tx: mpsc::SyncSender<TncControlEvent>,
17 }
18
19 impl M17App {
20 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
21 let write_tnc = tnc.try_clone().unwrap();
22 let (event_tx, event_rx) = mpsc::sync_channel(128);
23 let listeners = Arc::new(RwLock::new(Adapters::new()));
24 spawn_reader(tnc, listeners.clone());
25 spawn_writer(write_tnc, event_rx);
26 Self {
27 adapters: listeners,
28 event_tx,
29 }
30 }
31
32 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
33 let adapter = Arc::new(adapter);
34 let mut adapters = self.adapters.write().unwrap();
35 let id = adapters.next;
36 adapters.next += 1;
37 adapters.packet.insert(id, adapter.clone());
38 drop(adapters);
39 adapter.adapter_registered(id, self.tx());
40 id
41 }
42
43 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
44 let adapter = Arc::new(adapter);
45 let mut adapters = self.adapters.write().unwrap();
46 let id = adapters.next;
47 adapters.next += 1;
48 adapters.stream.insert(id, adapter.clone());
49 drop(adapters);
50 adapter.adapter_registered(id, self.tx());
51 id
52 }
53
54 pub fn remove_packet_adapter(&self, id: usize) {
55 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
56 a.adapter_removed();
57 }
58 }
59
60 pub fn remove_stream_adapter(&self, id: usize) {
61 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
62 a.adapter_removed();
63 }
64 }
65
66 /// Create a handle that can be used to transmit data on the TNC
67 pub fn tx(&self) -> TxHandle {
68 TxHandle {
69 event_tx: self.event_tx.clone(),
70 }
71 }
72
73 pub fn start(&self) {
74 {
75 let adapters = self.adapters.read().unwrap();
76 for (_, p) in &adapters.packet {
77 p.tnc_started();
78 }
79 for (_, s) in &adapters.stream {
80 s.tnc_started();
81 }
82 }
83 let _ = self.event_tx.send(TncControlEvent::Start);
84 }
85
86 pub fn close(&self) {
87 {
88 let adapters = self.adapters.read().unwrap();
89 for (_, p) in &adapters.packet {
90 p.tnc_closed();
91 }
92 for (_, s) in &adapters.stream {
93 s.tnc_closed();
94 }
95 }
96 // TODO: blocking function to indicate TNC has finished closing
97 // then we could call this in a signal handler to ensure PTT is dropped before quit
98 let _ = self.event_tx.send(TncControlEvent::Close);
99 }
100 }
101
102 pub struct TxHandle {
103 event_tx: mpsc::SyncSender<TncControlEvent>,
104 }
105
106 impl TxHandle {
107 pub fn transmit_packet(
108 &self,
109 link_setup: &LinkSetup,
110 packet_type: PacketType,
111 payload: &[u8],
112 ) -> Result<(), M17Error> {
113 let (pack_type, pack_type_len) = packet_type.as_proto();
114 if pack_type_len + payload.len() > 823 {
115 return Err(M17Error::PacketTooLarge {
116 provided: payload.len(),
117 capacity: 823 - pack_type_len,
118 });
119 }
120 let mut full_payload = vec![];
121 full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
122 full_payload.extend_from_slice(payload);
123 let crc = m17core::crc::m17_crc(&full_payload);
124 full_payload.extend_from_slice(&crc.to_be_bytes());
125 let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
126 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
127 Ok(())
128 }
129
130 pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
131 let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
132 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
133 }
134
135 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
136
137 pub fn transmit_stream_next(&self, stream: &StreamFrame) {
138 let kiss_frame = KissFrame::new_stream_data(stream).unwrap();
139 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
140 }
141 }
142
143 /// Synchronised structure for listeners subscribing to packets and streams.
144 ///
145 /// Each listener will be notified in turn of each event.
146 struct Adapters {
147 /// Identifier to be assigned to the next listener, starting from 0
148 next: usize,
149 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
150 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
151 }
152
153 impl Adapters {
154 fn new() -> Self {
155 Self {
156 next: 0,
157 packet: HashMap::new(),
158 stream: HashMap::new(),
159 }
160 }
161 }
162
163 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
164 #[allow(clippy::large_enum_variant)]
165 enum TncControlEvent {
166 Kiss(KissFrame),
167 Start,
168 Close,
169 }
170
171 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
172 std::thread::spawn(move || {
173 let mut kiss_buffer = KissBuffer::new();
174 let mut stream_running = false;
175 loop {
176 let buf = kiss_buffer.buf_remaining();
177 let n = match tnc.read(buf) {
178 Ok(n) => n,
179 Err(_) => break,
180 };
181 kiss_buffer.did_write(n);
182 while let Some(frame) = kiss_buffer.next_frame() {
183 if frame.command() != Ok(KissCommand::DataFrame) {
184 continue;
185 }
186 match frame.port() {
187 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
188 // no action
189 // we will handle the more full-featured version from from port 1
190 }
191 Ok(m17core::kiss::PORT_PACKET_FULL) => {
192 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
193 let Ok(n) = frame.decode_payload(&mut payload) else {
194 debug!("failed to decode payload from KISS frame");
195 continue;
196 };
197 if n < 33 {
198 debug!("unusually short full packet frame");
199 continue;
200 }
201 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
202 if lsf.check_crc() != 0 {
203 debug!("LSF in full packet frame did not pass CRC");
204 continue;
205 }
206 if lsf.encryption_type() != EncryptionType::None {
207 debug!("we only understand None encryption for now - skipping packet");
208 continue;
209 }
210 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
211 else {
212 debug!("failed to decode packet type");
213 continue;
214 };
215 if (n - 30 - type_len) < 2 {
216 debug!("packet payload too small to provide CRC");
217 continue;
218 }
219 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
220 if packet_crc != 0 {
221 debug!("packet CRC does not pass");
222 continue;
223 }
224 let packet_payload: Arc<[u8]> =
225 Arc::from(&payload[(30 + type_len)..(n - 2)]);
226
227 let subs: Vec<_> =
228 adapters.read().unwrap().packet.values().cloned().collect();
229 for s in subs {
230 s.packet_received(
231 LinkSetup::new_raw(lsf.clone()),
232 packet_type,
233 packet_payload.clone(),
234 );
235 }
236 }
237 Ok(m17core::kiss::PORT_STREAM) => {
238 let mut payload = [0u8; 32];
239 let Ok(n) = frame.decode_payload(&mut payload) else {
240 debug!("failed to decode stream payload from KISS frame");
241 continue;
242 };
243 if n == 30 {
244 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
245 if lsf.check_crc() != 0 {
246 debug!("initial LSF in stream did not pass CRC");
247 continue;
248 }
249 stream_running = true;
250 let subs: Vec<_> =
251 adapters.read().unwrap().stream.values().cloned().collect();
252 for s in subs {
253 s.stream_began(LinkSetup::new_raw(lsf.clone()));
254 }
255 } else if n == 26 {
256 if !stream_running {
257 debug!("ignoring stream data as we didn't get a valid LSF first");
258 continue;
259 }
260 // TODO: parse LICH and handle the different changing subvalues META could have
261 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
262 debug!("stream data CRC mismatch");
263 continue;
264 }
265 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
266 let is_final = (frame_number & 0x8000) > 0;
267 frame_number &= 0x7fff;
268 let data: [u8; 16] = payload[8..24].try_into().unwrap();
269 let data = Arc::new(data);
270 if is_final {
271 stream_running = false;
272 }
273 let subs: Vec<_> =
274 adapters.read().unwrap().stream.values().cloned().collect();
275 for s in subs {
276 s.stream_data(frame_number, is_final, data.clone());
277 }
278 }
279 }
280 _ => (),
281 }
282 }
283 }
284 });
285 }
286
287 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
288 std::thread::spawn(move || {
289 while let Ok(ev) = event_rx.recv() {
290 match ev {
291 TncControlEvent::Kiss(k) => {
292 if let Err(e) = tnc.write_all(k.as_bytes()) {
293 debug!("kiss send err: {:?}", e);
294 return;
295 }
296 }
297 TncControlEvent::Start => {
298 if let Err(e) = tnc.start() {
299 debug!("tnc start err: {:?}", e);
300 return;
301 }
302 }
303 TncControlEvent::Close => {
304 if let Err(e) = tnc.close() {
305 debug!("tnc close err: {:?}", e);
306 return;
307 }
308 }
309 }
310 }
311 });
312 }
313
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{link_setup::M17Address, test_util::NullTnc};

    /// A payload that fits is accepted; an oversized one is rejected and the error reports
    /// the capacity left after the packet type prefix.
    #[test]
    fn packet_payload_len() {
        let app = M17App::new(NullTnc);
        let broadcast_link = || {
            LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast())
        };

        let accepted = app
            .tx()
            .transmit_packet(&broadcast_link(), PacketType::Raw, &[0u8; 100]);
        assert_eq!(accepted, Ok(()));

        let rejected = app
            .tx()
            .transmit_packet(&broadcast_link(), PacketType::Raw, &[0u8; 900]);
        let expected = Err(M17Error::PacketTooLarge {
            provided: 900,
            capacity: 822,
        });
        assert_eq!(rejected, expected);
    }

    /// Each adapter sees registered, started, closed and removed exactly once and in that
    /// order, after which the app no longer holds the adapter.
    #[test]
    fn adapter_lifecycle() {
        #[derive(Debug, PartialEq)]
        enum Event {
            Registered(usize),
            Removed,
            Started,
            Closed,
        }

        // Implements the lifecycle callbacks of `$trait` by recording each one on the
        // channel held in the tuple struct's only field.
        macro_rules! record_lifecycle {
            ($target:ty, $trait:ty) => {
                impl $trait for $target {
                    fn adapter_registered(&self, id: usize, _handle: TxHandle) {
                        self.0.send(Event::Registered(id)).unwrap();
                    }

                    fn adapter_removed(&self) {
                        self.0.send(Event::Removed).unwrap();
                    }

                    fn tnc_started(&self) {
                        self.0.send(Event::Started).unwrap();
                    }

                    fn tnc_closed(&self) {
                        self.0.send(Event::Closed).unwrap();
                    }
                }
            };
        }

        struct FakePacket(mpsc::SyncSender<Event>);
        struct FakeStream(mpsc::SyncSender<Event>);
        record_lifecycle!(FakePacket, PacketAdapter);
        record_lifecycle!(FakeStream, StreamAdapter);

        let app = M17App::new(NullTnc);
        let (tx_p, rx_p) = mpsc::sync_channel(128);
        let (tx_s, rx_s) = mpsc::sync_channel(128);

        let id_p = app.add_packet_adapter(FakePacket(tx_p));
        let id_s = app.add_stream_adapter(FakeStream(tx_s));
        app.start();
        app.close();
        app.remove_packet_adapter(id_p);
        app.remove_stream_adapter(id_s);

        let expect_lifecycle = |rx: &mpsc::Receiver<Event>, id: usize| {
            for expected in [
                Event::Registered(id),
                Event::Started,
                Event::Closed,
                Event::Removed,
            ] {
                assert_eq!(rx.try_recv(), Ok(expected));
            }
            // The only sender lives inside the adapter, so a disconnected channel shows
            // the adapter itself has been dropped.
            assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
        };

        expect_lifecycle(&rx_p, 0);
        expect_lifecycle(&rx_s, 1);
    }
}