X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/9e40113187a3eb54335de0277f8038ee35d9428a..1a444762d8fd7d48e4f56a87c6bd77f837522d5d:/m17app/src/app.rs?ds=sidebyside diff --git a/m17app/src/app.rs b/m17app/src/app.rs index 05e3167..7d363dd 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -1,7 +1,8 @@ use crate::adapter::{PacketAdapter, StreamAdapter}; +use crate::link_setup::LinkSetup; use crate::tnc::Tnc; use m17core::kiss::{KissBuffer, KissCommand, KissFrame}; -use m17core::protocol::{EncryptionType, LsfFrame, PacketType}; +use m17core::protocol::{EncryptionType, LsfFrame, PacketType, StreamFrame}; use log::debug; use std::collections::HashMap; @@ -60,14 +61,6 @@ impl M17App { } } - pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) { - // hang on where do we get the LSF details from? We need a destination obviously - // our source address needs to be configured here too - // also there is possible CAN, encryption, meta payload - - // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data - } - /// Create a handle that can be used to transmit data on the TNC pub fn tx(&self) -> TxHandle { TxHandle { @@ -80,6 +73,8 @@ impl M17App { } pub fn close(&self) { + // TODO: blocking function to indicate TNC has finished closing + // then we could call this in a signal handler to ensure PTT is dropped before quit let _ = self.event_tx.send(TncControlEvent::Close); } } @@ -89,13 +84,32 @@ pub struct TxHandle { } impl TxHandle { - // add more methods here for stream outgoing + pub fn transmit_packet(&self, link_setup: &LinkSetup, packet_type: PacketType, payload: &[u8]) { + let (pack_type, pack_type_len) = packet_type.as_proto(); + if pack_type_len + payload.len() > 823 { + // TODO: error for invalid transmission type + return; + } + let mut full_payload = vec![]; + full_payload.extend_from_slice(&pack_type[0..pack_type_len]); + full_payload.extend_from_slice(&payload); + let crc = m17core::crc::m17_crc(&full_payload); + full_payload.extend_from_slice(&crc.to_be_bytes()); + let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + } - pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {} + pub fn transmit_stream_start(&self, link_setup: &LinkSetup) { + let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + } // as long as there is only one TNC it is implied there is only ever one stream transmission in flight - pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {} + pub fn transmit_stream_next(&self, stream: &StreamFrame) { + let kiss_frame = KissFrame::new_stream_data(&stream).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + } } /// Synchronised structure for listeners subscribing to packets and streams. @@ -125,9 +139,10 @@ enum TncControlEvent { Close, } -fn spawn_reader(mut tnc: T, listeners: Arc>) { +fn spawn_reader(mut tnc: T, adapters: Arc>) { std::thread::spawn(move || { let mut kiss_buffer = KissBuffer::new(); + let mut stream_running = false; loop { let mut buf = kiss_buffer.buf_remaining(); let n = match tnc.read(&mut buf) { @@ -155,7 +170,7 @@ fn spawn_reader(mut tnc: T, listeners: Arc(mut tnc: T, listeners: Arc = - listeners.read().unwrap().packet.values().cloned().collect(); + adapters.read().unwrap().packet.values().cloned().collect(); for s in subs { s.packet_received( - lsf.clone(), + LinkSetup::new_raw(lsf.clone()), packet_type.clone(), packet_payload.clone(), ); } } Ok(m17core::kiss::PORT_STREAM) => { - // handle stream and send it to subscribers + let mut payload = [0u8; 32]; + let Ok(n) = frame.decode_payload(&mut payload) else { + debug!("failed to decode stream payload from KISS frame"); + continue; + }; + if n == 30 { + let lsf = LsfFrame(payload[0..30].try_into().unwrap()); + if lsf.check_crc() != 0 { + debug!("initial LSF in stream did not pass CRC"); + continue; + } + stream_running = true; + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_began(LinkSetup::new_raw(lsf.clone())); + } + } else if n == 26 { + if !stream_running { + debug!("ignoring stream data as we didn't get a valid LSF first"); + continue; + } + // TODO: parse LICH and handle the different changing subvalues META could have + if m17core::crc::m17_crc(&payload[6..n]) != 0 { + debug!("stream data CRC mismatch"); + continue; + } + let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]); + let is_final = (frame_number & 0x8000) > 0; + frame_number &= 0x7fff; + let data: [u8; 16] = payload[8..24].try_into().unwrap(); + let data = Arc::new(data); + if is_final { + stream_running = false; + } + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_data(frame_number, is_final, data.clone()); + } + } } _ => (), } @@ -200,7 +255,7 @@ fn spawn_reader(mut tnc: T, listeners: Arc(mut tnc: T, event_rx: mpsc::Receiver) { +fn spawn_writer(mut tnc: T, event_rx: mpsc::Receiver) { std::thread::spawn(move || { while let Ok(ev) = event_rx.recv() { match ev {