]> code.octet-stream.net Git - m17rt/blobdiff - m17app/src/app.rs
Generate preamble and EOT
[m17rt] / m17app / src / app.rs
index 4a7a345f5fa1bf709fa0f2cb5e1a52c5f4caf0f2..aa040de51948e9632a15adb77da5fe01ef132a4c 100644 (file)
@@ -9,7 +9,7 @@ use std::sync::mpsc;
 use std::sync::{Arc, RwLock};
 
 pub struct M17App {
-    listeners: Arc<RwLock<Listeners>>,
+    adapters: Arc<RwLock<Adapters>>,
     event_tx: mpsc::SyncSender<TncControlEvent>,
 }
 
@@ -17,45 +17,47 @@ impl M17App {
     pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
         let write_tnc = tnc.try_clone().unwrap();
         let (event_tx, event_rx) = mpsc::sync_channel(128);
-        let listeners = Arc::new(RwLock::new(Listeners::new()));
+        let listeners = Arc::new(RwLock::new(Adapters::new()));
         spawn_reader(tnc, listeners.clone());
         spawn_writer(write_tnc, event_rx);
         Self {
-            listeners,
+            adapters: listeners,
             event_tx,
         }
     }
 
-    pub fn add_packet_listener<P: PacketAdapter + 'static>(&self, listener: P) -> usize {
-        let mut listeners = self.listeners.write().unwrap();
-        let id = listeners.next;
-        listeners.next += 1;
-        listeners.packet.insert(id, Arc::new(listener));
+    pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
+        let adapter = Arc::new(adapter);
+        let mut adapters = self.adapters.write().unwrap();
+        let id = adapters.next;
+        adapters.next += 1;
+        adapters.packet.insert(id, adapter.clone());
+        drop(adapters);
+        adapter.adapter_registered(id, self.tx());
         id
     }
 
-    pub fn add_stream_listener<S: StreamAdapter + 'static>(&self, listener: S) -> usize {
-        let mut listeners = self.listeners.write().unwrap();
-        let id = listeners.next;
-        listeners.next += 1;
-        listeners.stream.insert(id, Arc::new(listener));
+    pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
+        let adapter = Arc::new(adapter);
+        let mut adapters = self.adapters.write().unwrap();
+        let id = adapters.next;
+        adapters.next += 1;
+        adapters.stream.insert(id, adapter.clone());
+        drop(adapters);
+        adapter.adapter_registered(id, self.tx());
         id
     }
 
-    pub fn remove_packet_listener(&self, id: usize) {
-        self.listeners.write().unwrap().packet.remove(&id);
-    }
-
-    pub fn remove_stream_listener(&self, id: usize) {
-        self.listeners.write().unwrap().stream.remove(&id);
+    pub fn remove_packet_adapter(&self, id: usize) {
+        if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
+            a.adapter_removed();
+        }
     }
 
-    pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) {
-        // hang on where do we get the LSF details from? We need a destination obviously
-        // our source address needs to be configured here too
-        // also there is possible CAN, encryption, meta payload
-
-        // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
+    pub fn remove_stream_adapter(&self, id: usize) {
+        if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
+            a.adapter_removed();
+        }
     }
 
     /// Create a handle that can be used to transmit data on the TNC
@@ -79,6 +81,14 @@ pub struct TxHandle {
 }
 
 impl TxHandle {
+    pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
+        // hang on where do we get the LSF details from? We need a destination obviously
+        // our source address needs to be configured here too
+        // also there is possible CAN, encryption, meta payload
+
+        // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
+    }
+
     // add more methods here for stream outgoing
 
     pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
@@ -91,14 +101,14 @@ impl TxHandle {
 /// Synchronised structure for listeners subscribing to packets and streams.
 ///
 /// Each listener will be notified in turn of each event.
-struct Listeners {
+struct Adapters {
     /// Identifier to be assigned to the next listener, starting from 0
     next: usize,
     packet: HashMap<usize, Arc<dyn PacketAdapter>>,
     stream: HashMap<usize, Arc<dyn StreamAdapter>>,
 }
 
-impl Listeners {
+impl Adapters {
     fn new() -> Self {
         Self {
             next: 0,
@@ -115,9 +125,10 @@ enum TncControlEvent {
     Close,
 }
 
-fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Listeners>>) {
+fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
+        let mut stream_running = false;
         loop {
             let mut buf = kiss_buffer.buf_remaining();
             let n = match tnc.read(&mut buf) {
@@ -171,7 +182,7 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Liste
                             Arc::from(&payload[(30 + type_len)..(n - 2)]);
 
                         let subs: Vec<_> =
-                            listeners.read().unwrap().packet.values().cloned().collect();
+                            adapters.read().unwrap().packet.values().cloned().collect();
                         for s in subs {
                             s.packet_received(
                                 lsf.clone(),
@@ -181,7 +192,47 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Liste
                         }
                     }
                     Ok(m17core::kiss::PORT_STREAM) => {
-                        // handle stream and send it to subscribers
+                        let mut payload = [0u8; 32];
+                        let Ok(n) = frame.decode_payload(&mut payload) else {
+                            debug!("failed to decode stream payload from KISS frame");
+                            continue;
+                        };
+                        if n == 30 {
+                            let lsf = LsfFrame(payload[0..30].try_into().unwrap());
+                            if lsf.crc() != 0 {
+                                debug!("initial LSF in stream did not pass CRC");
+                                continue;
+                            }
+                            stream_running = true;
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_began(lsf.clone());
+                            }
+                        } else if n == 26 {
+                            if !stream_running {
+                                debug!("ignoring stream data as we didn't get a valid LSF first");
+                                continue;
+                            }
+                            // TODO: parse LICH and handle the different changing subvalues META could have
+                            if m17core::crc::m17_crc(&payload[6..n]) != 0 {
+                                debug!("stream data CRC mismatch");
+                                continue;
+                            }
+                            let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
+                            let is_final = (frame_number & 0x8000) > 0;
+                            frame_number &= 0x7fff;
+                            let data: [u8; 16] = payload[8..24].try_into().unwrap();
+                            let data = Arc::new(data);
+                            if is_final {
+                                stream_running = false;
+                            }
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_data(frame_number, is_final, data.clone());
+                            }
+                        }
                     }
                     _ => (),
                 }
@@ -190,7 +241,7 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Liste
     });
 }
 
-fn spawn_writer<T: Tnc + Send + 'static>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
+fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
     std::thread::spawn(move || {
         while let Ok(ev) = event_rx.recv() {
             match ev {