code.octet-stream.net Git - m17rt/blobdiff - m17app/src/app.rs
Pass stream data to adapters
[m17rt] / m17app / src / app.rs
index 05e3167139eb0f9ccadd27838643d3354970aaa2..81304559ddf6db33174da3fd74570ca046106547 100644 (file)
@@ -125,9 +125,10 @@ enum TncControlEvent {
     Close,
 }
 
     Close,
 }
 
-fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapters>>) {
+fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
+        let mut stream_running = false;
         loop {
             let mut buf = kiss_buffer.buf_remaining();
             let n = match tnc.read(&mut buf) {
         loop {
             let mut buf = kiss_buffer.buf_remaining();
             let n = match tnc.read(&mut buf) {
@@ -181,7 +182,7 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapt
                             Arc::from(&payload[(30 + type_len)..(n - 2)]);
 
                         let subs: Vec<_> =
                             Arc::from(&payload[(30 + type_len)..(n - 2)]);
 
                         let subs: Vec<_> =
-                            listeners.read().unwrap().packet.values().cloned().collect();
+                            adapters.read().unwrap().packet.values().cloned().collect();
                         for s in subs {
                             s.packet_received(
                                 lsf.clone(),
                         for s in subs {
                             s.packet_received(
                                 lsf.clone(),
@@ -191,7 +192,47 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapt
                         }
                     }
                     Ok(m17core::kiss::PORT_STREAM) => {
                         }
                     }
                     Ok(m17core::kiss::PORT_STREAM) => {
-                        // handle stream and send it to subscribers
+                        let mut payload = [0u8; 32];
+                        let Ok(n) = frame.decode_payload(&mut payload) else {
+                            debug!("failed to decode stream payload from KISS frame");
+                            continue;
+                        };
+                        if n == 30 {
+                            let lsf = LsfFrame(payload[0..30].try_into().unwrap());
+                            if lsf.crc() != 0 {
+                                debug!("initial LSF in stream did not pass CRC");
+                                continue;
+                            }
+                            stream_running = true;
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_began(lsf.clone());
+                            }
+                        } else if n == 26 {
+                            if !stream_running {
+                                debug!("ignoring stream data as we didn't get a valid LSF first");
+                                continue;
+                            }
+                            // TODO: parse LICH and handle the different changing subvalues META could have
+                            if m17core::crc::m17_crc(&payload[6..n]) != 0 {
+                                debug!("stream data CRC mismatch");
+                                continue;
+                            }
+                            let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
+                            let is_final = (frame_number & 0x8000) > 0;
+                            frame_number &= 0x7fff;
+                            let data: [u8; 16] = payload[8..24].try_into().unwrap();
+                            let data = Arc::new(data);
+                            if is_final {
+                                stream_running = false;
+                            }
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_data(frame_number, is_final, data.clone());
+                            }
+                        }
                     }
                     _ => (),
                 }
                     }
                     _ => (),
                 }