]> code.octet-stream.net Git - m17rt/blobdiff - m17app/src/app.rs
Fix timing bugs and add documentation
[m17rt] / m17app / src / app.rs
index 05e3167139eb0f9ccadd27838643d3354970aaa2..7f17df64a4f5898ba1de7cff2b0b364484f7bc55 100644 (file)
@@ -1,7 +1,9 @@
 use crate::adapter::{PacketAdapter, StreamAdapter};
 use crate::adapter::{PacketAdapter, StreamAdapter};
+use crate::link_setup::LinkSetup;
 use crate::tnc::Tnc;
 use crate::tnc::Tnc;
+use crate::{LsfFrame, PacketType, StreamFrame};
 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
-use m17core::protocol::{EncryptionType, LsfFrame, PacketType};
+use m17core::protocol::EncryptionType;
 
 use log::debug;
 use std::collections::HashMap;
 
 use log::debug;
 use std::collections::HashMap;
@@ -60,14 +62,6 @@ impl M17App {
         }
     }
 
         }
     }
 
-    pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
-        // hang on where do we get the LSF details from? We need a destination obviously
-        // our source address needs to be configured here too
-        // also there is possible CAN, encryption, meta payload
-
-        // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
-    }
-
     /// Create a handle that can be used to transmit data on the TNC
     pub fn tx(&self) -> TxHandle {
         TxHandle {
     /// Create a handle that can be used to transmit data on the TNC
     pub fn tx(&self) -> TxHandle {
         TxHandle {
@@ -80,6 +74,8 @@ impl M17App {
     }
 
     pub fn close(&self) {
     }
 
     pub fn close(&self) {
+        // TODO: blocking function to indicate TNC has finished closing
+        // then we could call this in a signal handler to ensure PTT is dropped before quit
         let _ = self.event_tx.send(TncControlEvent::Close);
     }
 }
         let _ = self.event_tx.send(TncControlEvent::Close);
     }
 }
@@ -89,13 +85,32 @@ pub struct TxHandle {
 }
 
 impl TxHandle {
 }
 
 impl TxHandle {
-    // add more methods here for stream outgoing
+    pub fn transmit_packet(&self, link_setup: &LinkSetup, packet_type: PacketType, payload: &[u8]) {
+        let (pack_type, pack_type_len) = packet_type.as_proto();
+        if pack_type_len + payload.len() > 823 {
+            // TODO: error for invalid transmission type
+            return;
+        }
+        let mut full_payload = vec![];
+        full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
+        full_payload.extend_from_slice(payload);
+        let crc = m17core::crc::m17_crc(&full_payload);
+        full_payload.extend_from_slice(&crc.to_be_bytes());
+        let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
+        let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
+    }
 
 
-    pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
+    pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
+        let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
+        let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
+    }
 
     // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
 
 
     // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
 
-    pub fn transmit_stream_next(&self, /* next payload,  */ end_of_stream: bool) {}
+    pub fn transmit_stream_next(&self, stream: &StreamFrame) {
+        let kiss_frame = KissFrame::new_stream_data(stream).unwrap();
+        let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
+    }
 }
 
 /// Synchronised structure for listeners subscribing to packets and streams.
 }
 
 /// Synchronised structure for listeners subscribing to packets and streams.
@@ -119,18 +134,20 @@ impl Adapters {
 }
 
 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
 }
 
 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
+#[allow(clippy::large_enum_variant)]
 enum TncControlEvent {
     Kiss(KissFrame),
     Start,
     Close,
 }
 
 enum TncControlEvent {
     Kiss(KissFrame),
     Start,
     Close,
 }
 
-fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapters>>) {
+fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
+        let mut stream_running = false;
         loop {
         loop {
-            let mut buf = kiss_buffer.buf_remaining();
-            let n = match tnc.read(&mut buf) {
+            let buf = kiss_buffer.buf_remaining();
+            let n = match tnc.read(buf) {
                 Ok(n) => n,
                 Err(_) => break,
             };
                 Ok(n) => n,
                 Err(_) => break,
             };
@@ -155,7 +172,7 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapt
                             continue;
                         }
                         let lsf = LsfFrame(payload[0..30].try_into().unwrap());
                             continue;
                         }
                         let lsf = LsfFrame(payload[0..30].try_into().unwrap());
-                        if lsf.crc() != 0 {
+                        if lsf.check_crc() != 0 {
                             debug!("LSF in full packet frame did not pass CRC");
                             continue;
                         }
                             debug!("LSF in full packet frame did not pass CRC");
                             continue;
                         }
@@ -181,17 +198,57 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapt
                             Arc::from(&payload[(30 + type_len)..(n - 2)]);
 
                         let subs: Vec<_> =
                             Arc::from(&payload[(30 + type_len)..(n - 2)]);
 
                         let subs: Vec<_> =
-                            listeners.read().unwrap().packet.values().cloned().collect();
+                            adapters.read().unwrap().packet.values().cloned().collect();
                         for s in subs {
                             s.packet_received(
                         for s in subs {
                             s.packet_received(
-                                lsf.clone(),
-                                packet_type.clone(),
+                                LinkSetup::new_raw(lsf.clone()),
+                                packet_type,
                                 packet_payload.clone(),
                             );
                         }
                     }
                     Ok(m17core::kiss::PORT_STREAM) => {
                                 packet_payload.clone(),
                             );
                         }
                     }
                     Ok(m17core::kiss::PORT_STREAM) => {
-                        // handle stream and send it to subscribers
+                        let mut payload = [0u8; 32];
+                        let Ok(n) = frame.decode_payload(&mut payload) else {
+                            debug!("failed to decode stream payload from KISS frame");
+                            continue;
+                        };
+                        if n == 30 {
+                            let lsf = LsfFrame(payload[0..30].try_into().unwrap());
+                            if lsf.check_crc() != 0 {
+                                debug!("initial LSF in stream did not pass CRC");
+                                continue;
+                            }
+                            stream_running = true;
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_began(LinkSetup::new_raw(lsf.clone()));
+                            }
+                        } else if n == 26 {
+                            if !stream_running {
+                                debug!("ignoring stream data as we didn't get a valid LSF first");
+                                continue;
+                            }
+                            // TODO: parse LICH and handle the different changing subvalues META could have
+                            if m17core::crc::m17_crc(&payload[6..n]) != 0 {
+                                debug!("stream data CRC mismatch");
+                                continue;
+                            }
+                            let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
+                            let is_final = (frame_number & 0x8000) > 0;
+                            frame_number &= 0x7fff;
+                            let data: [u8; 16] = payload[8..24].try_into().unwrap();
+                            let data = Arc::new(data);
+                            if is_final {
+                                stream_running = false;
+                            }
+                            let subs: Vec<_> =
+                                adapters.read().unwrap().stream.values().cloned().collect();
+                            for s in subs {
+                                s.stream_data(frame_number, is_final, data.clone());
+                            }
+                        }
                     }
                     _ => (),
                 }
                     }
                     _ => (),
                 }
@@ -200,12 +257,12 @@ fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapt
     });
 }
 
     });
 }
 
-fn spawn_writer<T: Tnc + Send + 'static>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
+fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
     std::thread::spawn(move || {
         while let Ok(ev) = event_rx.recv() {
             match ev {
                 TncControlEvent::Kiss(k) => {
     std::thread::spawn(move || {
         while let Ok(ev) = event_rx.recv() {
             match ev {
                 TncControlEvent::Kiss(k) => {
-                    if let Err(e) = tnc.write_all(&k.as_bytes()) {
+                    if let Err(e) = tnc.write_all(k.as_bytes()) {
                         debug!("kiss send err: {:?}", e);
                         return;
                     }
                         debug!("kiss send err: {:?}", e);
                         return;
                     }