From 54fc21708c9121409560731f90b91e59699f4ddc Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Mon, 3 Feb 2025 12:02:23 +1100 Subject: [PATCH] Simplify adapter lifecycle and introduce a lot of error propagation --- Cargo.lock | 1 + m17app/src/adapter.rs | 40 ++++----- m17app/src/app.rs | 150 +++++++++++++++++++++---------- m17app/src/error.rs | 33 ++++++- m17codec2/Cargo.toml | 2 +- m17codec2/src/lib.rs | 122 +++++++++++++++++++------ tools/m17rt-demod/src/main.rs | 4 +- tools/m17rt-mod/src/main.rs | 4 +- tools/m17rt-rxpacket/src/main.rs | 4 +- tools/m17rt-txpacket/src/main.rs | 4 +- 10 files changed, 253 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a2bcd0..e9719ee 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,6 +471,7 @@ dependencies = [ "log", "m17app", "m17core", + "thiserror 2.0.11", ] [[package]] diff --git a/m17app/src/adapter.rs b/m17app/src/adapter.rs index 81fdb5b..0e23924 100644 --- a/m17app/src/adapter.rs +++ b/m17app/src/adapter.rs @@ -1,4 +1,4 @@ -use crate::{app::TxHandle, link_setup::LinkSetup}; +use crate::{app::TxHandle, error::AdapterError, link_setup::LinkSetup}; use m17core::protocol::PacketType; use std::sync::Arc; @@ -9,20 +9,17 @@ use std::sync::Arc; /// a `TxHandle` when the adapter is first added to the app. This means the adapter can transmit as /// well as receive. pub trait PacketAdapter: Send + Sync + 'static { - /// This adapter was added to an `M17App`. - fn adapter_registered(&self, id: usize, handle: TxHandle) { - let _ = id; + /// TNC is online. New packets may now be received and it is okay to transmit. + fn start(&self, handle: TxHandle) -> Result<(), AdapterError> { let _ = handle; + Ok(()) } - /// This adapter was removed from an `M17App`. - fn adapter_removed(&self) {} - - /// The TNC has been started and incoming packets may now arrive. - fn tnc_started(&self) {} - - /// The TNC has been shut down. There will be no more tx/rx. - fn tnc_closed(&self) {} + /// This adapter or the whole TNC has been shut down. There will be no more tx/rx. This is a + /// permanent operation. + fn close(&self) -> Result<(), AdapterError> { + Ok(()) + } /// A packet has been received and assembled by the radio. fn packet_received(&self, link_setup: LinkSetup, packet_type: PacketType, content: Arc<[u8]>) { @@ -40,20 +37,17 @@ pub trait PacketAdapter: Send + Sync + 'static { /// There are also some lifecycle callbacks, one of which will provide a `TxHandle` when the adapter /// is first added to the app. This means the adapter can transmit as well as receive. pub trait StreamAdapter: Send + Sync + 'static { - /// This adapter was added to an `M17App`. - fn adapter_registered(&self, id: usize, handle: TxHandle) { - let _ = id; + /// TNC is online. New streams may now be received and it is okay to transmit. + fn start(&self, handle: TxHandle) -> Result<(), AdapterError> { let _ = handle; + Ok(()) } - /// This adapter was removed from an `M17App`. - fn adapter_removed(&self) {} - - /// The TNC has been started and incoming streams may now arrive. - fn tnc_started(&self) {} - - /// The TNC has been shut down. There will be no more tx/rx. - fn tnc_closed(&self) {} + /// This adapter or the whole TNC has been shut down. There will be no more tx/rx. This is a + /// permanent operation. + fn close(&self) -> Result<(), AdapterError> { + Ok(()) + } /// A new incoming stream has begun. /// diff --git a/m17app/src/app.rs b/m17app/src/app.rs index d7c3fb2..dc2138d 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -1,5 +1,5 @@ use crate::adapter::{PacketAdapter, StreamAdapter}; -use crate::error::M17Error; +use crate::error::{M17Error, M17Errors}; use crate::link_setup::LinkSetup; use crate::tnc::Tnc; use crate::{LsfFrame, PacketType, StreamFrame}; @@ -11,9 +11,17 @@ use std::collections::HashMap; use std::sync::mpsc; use std::sync::{Arc, RwLock}; +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +enum Lifecycle { + Setup, + Started, + Closed, +} + pub struct M17App { adapters: Arc>, event_tx: mpsc::SyncSender, + lifecycle: RwLock, } impl M17App { @@ -26,41 +34,62 @@ impl M17App { Self { adapters: listeners, event_tx, + lifecycle: RwLock::new(Lifecycle::Setup), } } - pub fn add_packet_adapter(&self, adapter: P) -> usize { + pub fn add_packet_adapter( + &self, + adapter: P, + ) -> Result { let adapter = Arc::new(adapter); let mut adapters = self.adapters.write().unwrap(); let id = adapters.next; adapters.next += 1; adapters.packet.insert(id, adapter.clone()); drop(adapters); - adapter.adapter_registered(id, self.tx()); - id + if self.lifecycle() == Lifecycle::Started { + adapter + .start(self.tx()) + .map_err(|e| M17Error::Adapter(id, e))?; + } + Ok(id) } - pub fn add_stream_adapter(&self, adapter: S) -> usize { + pub fn add_stream_adapter( + &self, + adapter: S, + ) -> Result { let adapter = Arc::new(adapter); let mut adapters = self.adapters.write().unwrap(); let id = adapters.next; adapters.next += 1; adapters.stream.insert(id, adapter.clone()); drop(adapters); - adapter.adapter_registered(id, self.tx()); - id + if self.lifecycle() == Lifecycle::Started { + adapter + .start(self.tx()) + .map_err(|e| M17Error::Adapter(id, e))?; + } + Ok(id) } - pub fn remove_packet_adapter(&self, id: usize) { + pub fn remove_packet_adapter(&self, id: usize) -> Result<(), M17Error> { if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) { - a.adapter_removed(); + if self.lifecycle() == Lifecycle::Started { + a.close().map_err(|e| M17Error::Adapter(id, e))?; + } } + Ok(()) } - pub fn remove_stream_adapter(&self, id: usize) { + pub fn remove_stream_adapter(&self, id: usize) -> Result<(), M17Error> { if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) { - a.adapter_removed(); + if self.lifecycle() == Lifecycle::Started { + a.close().map_err(|e| M17Error::Adapter(id, e))?; + } } + Ok(()) } /// Create a handle that can be used to transmit data on the TNC @@ -70,32 +99,68 @@ impl M17App { } } - pub fn start(&self) { + pub fn start(&self) -> Result<(), M17Errors> { + if self.lifecycle() != Lifecycle::Setup { + return Err(M17Errors(vec![M17Error::InvalidStart])); + } + self.set_lifecycle(Lifecycle::Started); + let mut errs = vec![]; { let adapters = self.adapters.read().unwrap(); - for (_, p) in &adapters.packet { - p.tnc_started(); + for (i, p) in &adapters.packet { + if let Err(e) = p.start(self.tx()) { + errs.push(M17Error::Adapter(*i, e)); + } } - for (_, s) in &adapters.stream { - s.tnc_started(); + for (i, s) in &adapters.stream { + if let Err(e) = s.start(self.tx()) { + errs.push(M17Error::Adapter(*i, e)); + } } } let _ = self.event_tx.send(TncControlEvent::Start); + if errs.is_empty() { + Ok(()) + } else { + Err(M17Errors(errs)) + } } - pub fn close(&self) { + pub fn close(&self) -> Result<(), M17Errors> { + if self.lifecycle() != Lifecycle::Started { + return Err(M17Errors(vec![M17Error::InvalidClose])); + } + self.set_lifecycle(Lifecycle::Closed); + let mut errs = vec![]; { let adapters = self.adapters.read().unwrap(); - for (_, p) in &adapters.packet { - p.tnc_closed(); + for (i, p) in &adapters.packet { + if let Err(e) = p.close() { + errs.push(M17Error::Adapter(*i, e)); + } } - for (_, s) in &adapters.stream { - s.tnc_closed(); + for (i, s) in &adapters.stream { + if let Err(e) = s.close() { + errs.push(M17Error::Adapter(*i, e)); + } } } // TODO: blocking function to indicate TNC has finished closing // then we could call this in a signal handler to ensure PTT is dropped before quit let _ = self.event_tx.send(TncControlEvent::Close); + if errs.is_empty() { + Ok(()) + } else { + Err(M17Errors(errs)) + } + } + + fn lifecycle(&self) -> Lifecycle { + *self.lifecycle.read().unwrap() + } + + fn set_lifecycle(&self, lifecycle: Lifecycle) { + *self.lifecycle.write().unwrap() = lifecycle; } } @@ -313,6 +378,7 @@ fn spawn_writer(mut tnc: T, event_rx: mpsc::Receiver) { #[cfg(test)] mod tests { + use crate::error::AdapterError; use crate::{link_setup::M17Address, test_util::NullTnc}; use super::*; @@ -325,47 +391,39 @@ mod tests { PacketType::Raw, &[0u8; 100], ); - assert_eq!(res, Ok(())); + assert!(matches!(res, Ok(()))); let res = app.tx().transmit_packet( &LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast()), PacketType::Raw, &[0u8; 900], ); - assert_eq!( + assert!(matches!( res, Err(M17Error::PacketTooLarge { provided: 900, capacity: 822 }) - ); + )); } #[test] fn adapter_lifecycle() { #[derive(Debug, PartialEq)] enum Event { - Registered(usize), - Removed, Started, Closed, } macro_rules! event_impl { ($target:ty, $trait:ty) => { impl $trait for $target { - fn adapter_registered(&self, id: usize, _handle: TxHandle) { - self.0.send(Event::Registered(id)).unwrap(); - } - - fn adapter_removed(&self) { - self.0.send(Event::Removed).unwrap(); - } - - fn tnc_started(&self) { - self.0.send(Event::Started).unwrap(); + fn start(&self, _handle: TxHandle) -> Result<(), AdapterError> { + self.0.send(Event::Started)?; + Ok(()) } - fn tnc_closed(&self) { - self.0.send(Event::Closed).unwrap(); + fn close(&self) -> Result<(), AdapterError> { + self.0.send(Event::Closed)?; + Ok(()) } } }; @@ -381,23 +439,19 @@ mod tests { let packet = FakePacket(tx_p); let stream = FakeStream(tx_s); - let id_p = app.add_packet_adapter(packet); - let id_s = app.add_stream_adapter(stream); - app.start(); - app.close(); - app.remove_packet_adapter(id_p); - app.remove_stream_adapter(id_s); + let id_p = app.add_packet_adapter(packet).unwrap(); + let id_s = app.add_stream_adapter(stream).unwrap(); + app.start().unwrap(); + app.close().unwrap(); + app.remove_packet_adapter(id_p).unwrap(); + app.remove_stream_adapter(id_s).unwrap(); - assert_eq!(rx_p.try_recv(), Ok(Event::Registered(0))); assert_eq!(rx_p.try_recv(), Ok(Event::Started)); assert_eq!(rx_p.try_recv(), Ok(Event::Closed)); - assert_eq!(rx_p.try_recv(), Ok(Event::Removed)); assert_eq!(rx_p.try_recv(), Err(mpsc::TryRecvError::Disconnected)); - assert_eq!(rx_s.try_recv(), Ok(Event::Registered(1))); assert_eq!(rx_s.try_recv(), Ok(Event::Started)); assert_eq!(rx_s.try_recv(), Ok(Event::Closed)); - assert_eq!(rx_s.try_recv(), Ok(Event::Removed)); assert_eq!(rx_s.try_recv(), Err(mpsc::TryRecvError::Disconnected)); } } diff --git a/m17app/src/error.rs b/m17app/src/error.rs index c1bcac1..aeb58a5 100644 --- a/m17app/src/error.rs +++ b/m17app/src/error.rs @@ -1,8 +1,9 @@ -use std::path::PathBuf; +use std::{fmt::Display, path::PathBuf}; use thiserror::Error; -#[derive(Debug, Error, PartialEq, Eq, Clone)] +/// Errors originating from the M17 Rust Toolkit core +#[derive(Debug, Error)] pub enum M17Error { #[error("given callsign contains at least one character invalid in M17: {0}")] InvalidCallsignCharacters(char), @@ -29,4 +30,32 @@ pub enum M17Error { #[error("failed to read from RRC file: {0}")] RrcReadFailed(PathBuf), + + #[error("tried to start app more than once")] + InvalidStart, + + #[error("tried to close app that is not started")] + InvalidClose, + + #[error("adapter error for id {0}: {1}")] + Adapter(usize, #[source] AdapterError), +} + +pub type AdapterError = Box; + +/// Iterator over potentially multiple errors +#[derive(Debug, Error)] +pub struct M17Errors(pub(crate) Vec); +impl Iterator for M17Errors { + type Item = M17Error; + + fn next(&mut self) -> Option { + self.0.pop() + } +} + +impl Display for M17Errors { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } } diff --git a/m17codec2/Cargo.toml b/m17codec2/Cargo.toml index 01b032f..4b8014c 100755 --- a/m17codec2/Cargo.toml +++ b/m17codec2/Cargo.toml @@ -19,4 +19,4 @@ hound = "3.5.1" m17core = { version = "0.1", path = "../m17core" } m17app = { version = "0.1", path = "../m17app" } log = "0.4.22" - +thiserror = "2.0.11" diff --git a/m17codec2/src/lib.rs b/m17codec2/src/lib.rs index e4bfbac..c338f6a 100755 --- a/m17codec2/src/lib.rs +++ b/m17codec2/src/lib.rs @@ -8,6 +8,7 @@ use cpal::{Sample, SampleFormat, SampleRate}; use log::debug; use m17app::adapter::StreamAdapter; use m17app::app::TxHandle; +use m17app::error::AdapterError; use m17app::link_setup::LinkSetup; use m17app::link_setup::M17Address; use m17app::StreamFrame; @@ -22,6 +23,7 @@ use std::sync::{ }; use std::time::Duration; use std::time::Instant; +use thiserror::Error; pub fn decode_codec2>(data: &[u8], out_path: P) -> Vec { let codec2 = Codec2::new(Codec2Mode::MODE_3200); @@ -79,26 +81,26 @@ struct AdapterState { } impl StreamAdapter for Codec2Adapter { - fn adapter_registered(&self, _id: usize, handle: TxHandle) { + fn start(&self, handle: TxHandle) -> Result<(), AdapterError> { self.state.lock().unwrap().tx = Some(handle); let (end_tx, end_rx) = channel(); + let (setup_tx, setup_rx) = channel(); let state = self.state.clone(); let output_card = self.output_card.clone(); - std::thread::spawn(move || stream_thread(end_rx, state, output_card)); + std::thread::spawn(move || stream_thread(end_rx, setup_tx, state, output_card)); self.state.lock().unwrap().end_tx = Some(end_tx); + // Propagate any errors arising in the thread + Ok(setup_rx.recv()??) } - fn adapter_removed(&self) { + fn close(&self) -> Result<(), AdapterError> { let mut state = self.state.lock().unwrap(); state.tx = None; state.end_tx = None; + Ok(()) } - fn tnc_started(&self) {} - - fn tnc_closed(&self) {} - fn stream_began(&self, _link_setup: LinkSetup) { // for now we will assume: // - unencrypted @@ -133,34 +135,78 @@ fn output_cb(data: &mut [i16], state: &Mutex) { } /// Create and manage the stream from a dedicated thread since it's `!Send` -fn stream_thread(end: Receiver<()>, state: Arc>, output_card: String) { +fn stream_thread( + end: Receiver<()>, + setup_tx: Sender>, + state: Arc>, + output_card: String, +) { let host = cpal::default_host(); - let device = host + let device = match host .output_devices() .unwrap() .find(|d| d.name().unwrap() == output_card) - .unwrap(); - let mut configs = device.supported_output_configs().unwrap(); + { + Some(d) => d, + None => { + let _ = setup_tx.send(Err(M17Codec2Error::CardUnavailable(output_card).into())); + return; + } + }; + let mut configs = match device.supported_output_configs() { + Ok(c) => c, + Err(e) => { + let _ = setup_tx.send(Err(M17Codec2Error::OutputConfigsUnavailable( + output_card, + e, + ) + .into())); + return; + } + }; // TODO: channels == 1 doesn't work on a Raspberry Pi // make this configurable and support interleaving LRLR stereo samples if using 2 channels - let config = configs - .find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16) - .unwrap() - .with_sample_rate(SampleRate(8000)); - let stream = device - .build_output_stream( - &config.into(), - move |data: &mut [i16], _info: &cpal::OutputCallbackInfo| { - output_cb(data, &state); - }, - |e| { - // trigger end_tx here? always more edge cases - debug!("error occurred in codec2 playback: {e:?}"); - }, - None, - ) - .unwrap(); - stream.play().unwrap(); + let config = match configs.find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16) + { + Some(c) => c, + None => { + let _ = setup_tx.send(Err( + M17Codec2Error::SupportedOutputUnavailable(output_card).into() + )); + return; + } + }; + + let config = config.with_sample_rate(SampleRate(8000)); + let stream = match device.build_output_stream( + &config.into(), + move |data: &mut [i16], _info: &cpal::OutputCallbackInfo| { + output_cb(data, &state); + }, + |e| { + // trigger end_tx here? always more edge cases + debug!("error occurred in codec2 playback: {e:?}"); + }, + None, + ) { + Ok(s) => s, + Err(e) => { + let _ = setup_tx.send(Err( + M17Codec2Error::OutputStreamBuildError(output_card, e).into() + )); + return; + } + }; + match stream.play() { + Ok(()) => (), + Err(e) => { + let _ = setup_tx.send(Err( + M17Codec2Error::OutputStreamPlayError(output_card, e).into() + )); + return; + } + } + let _ = setup_tx.send(Ok(())); let _ = end.recv(); // it seems concrete impls of Stream have a Drop implementation that will handle termination } @@ -232,3 +278,21 @@ impl WavePlayer { } } } + +#[derive(Debug, Error)] +pub enum M17Codec2Error { + #[error("selected card '{0}' does not exist or is in use")] + CardUnavailable(String), + + #[error("selected card '{0}' failed to list available output configs: '{1}'")] + OutputConfigsUnavailable(String, #[source] cpal::SupportedStreamConfigsError), + + #[error("selected card '{0}' did not offer a compatible output config type, either due to hardware limitations or because it is currently in use")] + SupportedOutputUnavailable(String), + + #[error("selected card '{0}' was unable to build an output stream: '{1}'")] + OutputStreamBuildError(String, #[source] cpal::BuildStreamError), + + #[error("selected card '{0}' was unable to play an output stream: '{1}'")] + OutputStreamPlayError(String, #[source] cpal::PlayStreamError), +} diff --git a/tools/m17rt-demod/src/main.rs b/tools/m17rt-demod/src/main.rs index af5fafd..bcc5468 100755 --- a/tools/m17rt-demod/src/main.rs +++ b/tools/m17rt-demod/src/main.rs @@ -7,8 +7,8 @@ pub fn demod_test() { let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap(); let soundmodem = Soundmodem::new(soundcard.input(), NullOutputSink::new(), NullPtt::new()); let app = M17App::new(soundmodem); - app.add_stream_adapter(Codec2Adapter::new()); - app.start(); + app.add_stream_adapter(Codec2Adapter::new()).unwrap(); + app.start().unwrap(); loop { std::thread::park(); diff --git a/tools/m17rt-mod/src/main.rs b/tools/m17rt-mod/src/main.rs index b5708e9..26b7aba 100644 --- a/tools/m17rt-mod/src/main.rs +++ b/tools/m17rt-mod/src/main.rs @@ -12,7 +12,7 @@ pub fn mod_test() { let ptt = SerialPtt::new("/dev/ttyUSB0", PttPin::Rts); let soundmodem = Soundmodem::new(soundcard.input(), soundcard.output(), ptt); let app = M17App::new(soundmodem); - app.start(); + app.start().unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); println!("Beginning playback..."); WavePlayer::play( @@ -24,7 +24,7 @@ pub fn mod_test() { ); println!("Playback complete."); std::thread::sleep(std::time::Duration::from_secs(1)); - app.close(); + app.close().unwrap(); } fn main() { diff --git a/tools/m17rt-rxpacket/src/main.rs b/tools/m17rt-rxpacket/src/main.rs index 012cc46..11ee69e 100755 --- a/tools/m17rt-rxpacket/src/main.rs +++ b/tools/m17rt-rxpacket/src/main.rs @@ -10,8 +10,8 @@ fn main() { let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap(); let soundmodem = Soundmodem::new(soundcard.input(), NullOutputSink::new(), NullPtt::new()); let app = M17App::new(soundmodem); - app.add_packet_adapter(PacketPrinter); - app.start(); + app.add_packet_adapter(PacketPrinter).unwrap(); + app.start().unwrap(); loop { std::thread::park(); diff --git a/tools/m17rt-txpacket/src/main.rs b/tools/m17rt-txpacket/src/main.rs index 3e72e23..37f86d9 100644 --- a/tools/m17rt-txpacket/src/main.rs +++ b/tools/m17rt-txpacket/src/main.rs @@ -12,7 +12,7 @@ fn main() { let soundmodem = Soundmodem::new(soundcard.input(), soundcard.output(), ptt); let app = M17App::new(soundmodem); - app.start(); + app.start().unwrap(); println!("Transmitting packet..."); let source = M17Address::from_callsign("VK7XT-1").unwrap(); @@ -24,5 +24,5 @@ fn main() { .unwrap(); std::thread::sleep(std::time::Duration::from_secs(1)); - app.close(); + app.close().unwrap(); } -- 2.39.5