X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/e67ea96c8a3d7c23ba29c6ed91ddb451927176a1..b8702d115ebc071a9ae97e9aa9c4bc00196b5e26:/m17app/src/app.rs?ds=sidebyside diff --git a/m17app/src/app.rs b/m17app/src/app.rs index 0ad196d..8130455 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -1,16 +1,15 @@ +use crate::adapter::{PacketAdapter, StreamAdapter}; use crate::tnc::Tnc; -use m17core::kiss::KissFrame; -use m17core::protocol::PacketType; -use m17core::traits::{PacketListener, StreamListener}; +use m17core::kiss::{KissBuffer, KissCommand, KissFrame}; +use m17core::protocol::{EncryptionType, LsfFrame, PacketType}; use log::debug; use std::collections::HashMap; -use std::io::{Read, Write}; use std::sync::mpsc; use std::sync::{Arc, RwLock}; pub struct M17App { - listeners: Arc>, + adapters: Arc>, event_tx: mpsc::SyncSender, } @@ -18,40 +17,50 @@ impl M17App { pub fn new(mut tnc: T) -> Self { let write_tnc = tnc.try_clone().unwrap(); let (event_tx, event_rx) = mpsc::sync_channel(128); - let listeners = Arc::new(RwLock::new(Listeners::new())); + let listeners = Arc::new(RwLock::new(Adapters::new())); spawn_reader(tnc, listeners.clone()); spawn_writer(write_tnc, event_rx); Self { - listeners, + adapters: listeners, event_tx, } } - pub fn add_packet_listener(&self, listener: P) -> usize { - let mut listeners = self.listeners.write().unwrap(); - let id = listeners.next; - listeners.next += 1; - listeners.packet.insert(id, Box::new(listener)); + pub fn add_packet_adapter(&self, adapter: P) -> usize { + let adapter = Arc::new(adapter); + let mut adapters = self.adapters.write().unwrap(); + let id = adapters.next; + adapters.next += 1; + adapters.packet.insert(id, adapter.clone()); + drop(adapters); + adapter.adapter_registered(id, self.tx()); id } - pub fn add_stream_listener(&self, listener: S) -> usize { - let mut listeners = self.listeners.write().unwrap(); - let id = listeners.next; - listeners.next += 1; - listeners.stream.insert(id, Box::new(listener)); + pub fn add_stream_adapter(&self, adapter: S) -> usize { + let adapter = Arc::new(adapter); + let mut adapters = self.adapters.write().unwrap(); + let id = adapters.next; + adapters.next += 1; + adapters.stream.insert(id, adapter.clone()); + drop(adapters); + adapter.adapter_registered(id, self.tx()); id } - pub fn remove_packet_listener(&self, id: usize) { - self.listeners.write().unwrap().packet.remove(&id); + pub fn remove_packet_adapter(&self, id: usize) { + if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) { + a.adapter_removed(); + } } - pub fn remove_stream_listener(&self, id: usize) { - self.listeners.write().unwrap().stream.remove(&id); + pub fn remove_stream_adapter(&self, id: usize) { + if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) { + a.adapter_removed(); + } } - pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) { + pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) { // hang on where do we get the LSF details from? We need a destination obviously // our source address needs to be configured here too // also there is possible CAN, encryption, meta payload @@ -59,13 +68,12 @@ impl M17App { // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data } - // add more methods here for stream outgoing - - pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {} - - // as long as there is only one TNC it is implied there is only ever one stream transmission in flight - - pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {} + /// Create a handle that can be used to transmit data on the TNC + pub fn tx(&self) -> TxHandle { + TxHandle { + event_tx: self.event_tx.clone(), + } + } pub fn start(&self) { let _ = self.event_tx.send(TncControlEvent::Start); @@ -76,17 +84,31 @@ impl M17App { } } +pub struct TxHandle { + event_tx: mpsc::SyncSender, +} + +impl TxHandle { + // add more methods here for stream outgoing + + pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {} + + // as long as there is only one TNC it is implied there is only ever one stream transmission in flight + + pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {} +} + /// Synchronised structure for listeners subscribing to packets and streams. /// /// Each listener will be notified in turn of each event. -struct Listeners { +struct Adapters { /// Identifier to be assigned to the next listener, starting from 0 next: usize, - packet: HashMap>, - stream: HashMap>, + packet: HashMap>, + stream: HashMap>, } -impl Listeners { +impl Adapters { fn new() -> Self { Self { next: 0, @@ -103,22 +125,118 @@ enum TncControlEvent { Close, } -fn spawn_reader(mut tnc: T, listeners: Arc>) { +fn spawn_reader(mut tnc: T, adapters: Arc>) { std::thread::spawn(move || { - let mut buf = [0u8; 1713]; - let mut n = 0; + let mut kiss_buffer = KissBuffer::new(); + let mut stream_running = false; loop { - // I want to call tnc.read() here - // Probably these needs a helper in m17core::kiss? It will be common to both TNC and host - - // After a read... - // if this does not start with FEND, forget all data up until first FEND - // if we start with a FEND, see if there is another FEND with at least one byte between - // for each such case, turn that FEND..=FEND slice into a KissFrame and attempt to parse it - // once all such pairs have been handled... - // move the last FEND onwards back to the start of the buffer - // - if there is no room to do so, this is an oversize frame. purge everything and carry on. - // perform next read from end + let mut buf = kiss_buffer.buf_remaining(); + let n = match tnc.read(&mut buf) { + Ok(n) => n, + Err(_) => break, + }; + kiss_buffer.did_write(n); + while let Some(frame) = kiss_buffer.next_frame() { + if frame.command() != Ok(KissCommand::DataFrame) { + continue; + } + match frame.port() { + Ok(m17core::kiss::PORT_PACKET_BASIC) => { + // no action + // we will handle the more full-featured version from from port 1 + } + Ok(m17core::kiss::PORT_PACKET_FULL) => { + let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC + let Ok(n) = frame.decode_payload(&mut payload) else { + debug!("failed to decode payload from KISS frame"); + continue; + }; + if n < 33 { + debug!("unusually short full packet frame"); + continue; + } + let lsf = LsfFrame(payload[0..30].try_into().unwrap()); + if lsf.crc() != 0 { + debug!("LSF in full packet frame did not pass CRC"); + continue; + } + if lsf.encryption_type() != EncryptionType::None { + debug!("we only understand None encryption for now - skipping packet"); + continue; + } + let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n]) + else { + debug!("failed to decode packet type"); + continue; + }; + if (n - 30 - type_len) < 2 { + debug!("packet payload too small to provide CRC"); + continue; + } + let packet_crc = m17core::crc::m17_crc(&payload[30..n]); + if packet_crc != 0 { + debug!("packet CRC does not pass"); + continue; + } + let packet_payload: Arc<[u8]> = + Arc::from(&payload[(30 + type_len)..(n - 2)]); + + let subs: Vec<_> = + adapters.read().unwrap().packet.values().cloned().collect(); + for s in subs { + s.packet_received( + lsf.clone(), + packet_type.clone(), + packet_payload.clone(), + ); + } + } + Ok(m17core::kiss::PORT_STREAM) => { + let mut payload = [0u8; 32]; + let Ok(n) = frame.decode_payload(&mut payload) else { + debug!("failed to decode stream payload from KISS frame"); + continue; + }; + if n == 30 { + let lsf = LsfFrame(payload[0..30].try_into().unwrap()); + if lsf.crc() != 0 { + debug!("initial LSF in stream did not pass CRC"); + continue; + } + stream_running = true; + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_began(lsf.clone()); + } + } else if n == 26 { + if !stream_running { + debug!("ignoring stream data as we didn't get a valid LSF first"); + continue; + } + // TODO: parse LICH and handle the different changing subvalues META could have + if m17core::crc::m17_crc(&payload[6..n]) != 0 { + debug!("stream data CRC mismatch"); + continue; + } + let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]); + let is_final = (frame_number & 0x8000) > 0; + frame_number &= 0x7fff; + let data: [u8; 16] = payload[8..24].try_into().unwrap(); + let data = Arc::new(data); + if is_final { + stream_running = false; + } + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_data(frame_number, is_final, data.clone()); + } + } + } + _ => (), + } + } } }); }