X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/9058451e46e4d36264282abe381aa9b6fd2c773f..7c22bfc75bf2e3b5ca553ac2baf251790f57a10d:/m17app/src/app.rs diff --git a/m17app/src/app.rs b/m17app/src/app.rs index fbc9a08..d7c3fb2 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -1,7 +1,10 @@ use crate::adapter::{PacketAdapter, StreamAdapter}; +use crate::error::M17Error; +use crate::link_setup::LinkSetup; use crate::tnc::Tnc; +use crate::{LsfFrame, PacketType, StreamFrame}; use m17core::kiss::{KissBuffer, KissCommand, KissFrame}; -use m17core::protocol::{EncryptionType, LsfFrame, PacketType}; +use m17core::protocol::EncryptionType; use log::debug; use std::collections::HashMap; @@ -68,10 +71,30 @@ impl M17App { } pub fn start(&self) { + { + let adapters = self.adapters.read().unwrap(); + for (_, p) in &adapters.packet { + p.tnc_started(); + } + for (_, s) in &adapters.stream { + s.tnc_started(); + } + } let _ = self.event_tx.send(TncControlEvent::Start); } pub fn close(&self) { + { + let adapters = self.adapters.read().unwrap(); + for (_, p) in &adapters.packet { + p.tnc_closed(); + } + for (_, s) in &adapters.stream { + s.tnc_closed(); + } + } + // TODO: blocking function to indicate TNC has finished closing + // then we could call this in a signal handler to ensure PTT is dropped before quit let _ = self.event_tx.send(TncControlEvent::Close); } } @@ -81,21 +104,40 @@ pub struct TxHandle { } impl TxHandle { - pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) { - // hang on where do we get the LSF details from? We need a destination obviously - // our source address needs to be configured here too - // also there is possible CAN, encryption, meta payload - - // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data + pub fn transmit_packet( + &self, + link_setup: &LinkSetup, + packet_type: PacketType, + payload: &[u8], + ) -> Result<(), M17Error> { + let (pack_type, pack_type_len) = packet_type.as_proto(); + if pack_type_len + payload.len() > 823 { + return Err(M17Error::PacketTooLarge { + provided: payload.len(), + capacity: 823 - pack_type_len, + }); + } + let mut full_payload = vec![]; + full_payload.extend_from_slice(&pack_type[0..pack_type_len]); + full_payload.extend_from_slice(payload); + let crc = m17core::crc::m17_crc(&full_payload); + full_payload.extend_from_slice(&crc.to_be_bytes()); + let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + Ok(()) } - // add more methods here for stream outgoing - - pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {} + pub fn transmit_stream_start(&self, link_setup: &LinkSetup) { + let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + } // as long as there is only one TNC it is implied there is only ever one stream transmission in flight - pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {} + pub fn transmit_stream_next(&self, stream: &StreamFrame) { + let kiss_frame = KissFrame::new_stream_data(stream).unwrap(); + let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame)); + } } /// Synchronised structure for listeners subscribing to packets and streams. @@ -119,6 +161,7 @@ impl Adapters { } /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it. +#[allow(clippy::large_enum_variant)] enum TncControlEvent { Kiss(KissFrame), Start, @@ -130,8 +173,8 @@ fn spawn_reader(mut tnc: T, adapters: Arc>) { let mut kiss_buffer = KissBuffer::new(); let mut stream_running = false; loop { - let mut buf = kiss_buffer.buf_remaining(); - let n = match tnc.read(&mut buf) { + let buf = kiss_buffer.buf_remaining(); + let n = match tnc.read(buf) { Ok(n) => n, Err(_) => break, }; @@ -185,8 +228,8 @@ fn spawn_reader(mut tnc: T, adapters: Arc>) { adapters.read().unwrap().packet.values().cloned().collect(); for s in subs { s.packet_received( - lsf.clone(), - packet_type.clone(), + LinkSetup::new_raw(lsf.clone()), + packet_type, packet_payload.clone(), ); } @@ -207,7 +250,7 @@ fn spawn_reader(mut tnc: T, adapters: Arc>) { let subs: Vec<_> = adapters.read().unwrap().stream.values().cloned().collect(); for s in subs { - s.stream_began(lsf.clone()); + s.stream_began(LinkSetup::new_raw(lsf.clone())); } } else if n == 26 { if !stream_running { @@ -246,7 +289,7 @@ fn spawn_writer(mut tnc: T, event_rx: mpsc::Receiver) { while let Ok(ev) = event_rx.recv() { match ev { TncControlEvent::Kiss(k) => { - if let Err(e) = tnc.write_all(&k.as_bytes()) { + if let Err(e) = tnc.write_all(k.as_bytes()) { debug!("kiss send err: {:?}", e); return; } @@ -267,3 +310,94 @@ fn spawn_writer(mut tnc: T, event_rx: mpsc::Receiver) { } }); } + +#[cfg(test)] +mod tests { + use crate::{link_setup::M17Address, test_util::NullTnc}; + + use super::*; + + #[test] + fn packet_payload_len() { + let app = M17App::new(NullTnc); + let res = app.tx().transmit_packet( + &LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast()), + PacketType::Raw, + &[0u8; 100], + ); + assert_eq!(res, Ok(())); + let res = app.tx().transmit_packet( + &LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast()), + PacketType::Raw, + &[0u8; 900], + ); + assert_eq!( + res, + Err(M17Error::PacketTooLarge { + provided: 900, + capacity: 822 + }) + ); + } + + #[test] + fn adapter_lifecycle() { + #[derive(Debug, PartialEq)] + enum Event { + Registered(usize), + Removed, + Started, + Closed, + } + macro_rules! event_impl { + ($target:ty, $trait:ty) => { + impl $trait for $target { + fn adapter_registered(&self, id: usize, _handle: TxHandle) { + self.0.send(Event::Registered(id)).unwrap(); + } + + fn adapter_removed(&self) { + self.0.send(Event::Removed).unwrap(); + } + + fn tnc_started(&self) { + self.0.send(Event::Started).unwrap(); + } + + fn tnc_closed(&self) { + self.0.send(Event::Closed).unwrap(); + } + } + }; + } + struct FakePacket(mpsc::SyncSender); + struct FakeStream(mpsc::SyncSender); + event_impl!(FakePacket, PacketAdapter); + event_impl!(FakeStream, StreamAdapter); + + let app = M17App::new(NullTnc); + let (tx_p, rx_p) = mpsc::sync_channel(128); + let (tx_s, rx_s) = mpsc::sync_channel(128); + let packet = FakePacket(tx_p); + let stream = FakeStream(tx_s); + + let id_p = app.add_packet_adapter(packet); + let id_s = app.add_stream_adapter(stream); + app.start(); + app.close(); + app.remove_packet_adapter(id_p); + app.remove_stream_adapter(id_s); + + assert_eq!(rx_p.try_recv(), Ok(Event::Registered(0))); + assert_eq!(rx_p.try_recv(), Ok(Event::Started)); + assert_eq!(rx_p.try_recv(), Ok(Event::Closed)); + assert_eq!(rx_p.try_recv(), Ok(Event::Removed)); + assert_eq!(rx_p.try_recv(), Err(mpsc::TryRecvError::Disconnected)); + + assert_eq!(rx_s.try_recv(), Ok(Event::Registered(1))); + assert_eq!(rx_s.try_recv(), Ok(Event::Started)); + assert_eq!(rx_s.try_recv(), Ok(Event::Closed)); + assert_eq!(rx_s.try_recv(), Ok(Event::Removed)); + assert_eq!(rx_s.try_recv(), Err(mpsc::TryRecvError::Disconnected)); + } +}