"log",
"m17app",
"m17core",
+ "thiserror 2.0.11",
]
[[package]]
-use crate::{app::TxHandle, link_setup::LinkSetup};
+use crate::{app::TxHandle, error::AdapterError, link_setup::LinkSetup};
use m17core::protocol::PacketType;
use std::sync::Arc;
/// a `TxHandle` when the adapter is first added to the app. This means the adapter can transmit as
/// well as receive.
pub trait PacketAdapter: Send + Sync + 'static {
- /// This adapter was added to an `M17App`.
- fn adapter_registered(&self, id: usize, handle: TxHandle) {
- let _ = id;
+ /// TNC is online. New packets may now be received and it is okay to transmit.
+ fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
let _ = handle;
+ Ok(())
}
- /// This adapter was removed from an `M17App`.
- fn adapter_removed(&self) {}
-
- /// The TNC has been started and incoming packets may now arrive.
- fn tnc_started(&self) {}
-
- /// The TNC has been shut down. There will be no more tx/rx.
- fn tnc_closed(&self) {}
+ /// This adapter or the whole TNC has been shut down. There will be no more tx/rx. This is a
+ /// permanent operation.
+ fn close(&self) -> Result<(), AdapterError> {
+ Ok(())
+ }
/// A packet has been received and assembled by the radio.
fn packet_received(&self, link_setup: LinkSetup, packet_type: PacketType, content: Arc<[u8]>) {
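-/// There are also some lifecycle callbacks, one of which will provide a `TxHandle` when the adapter
-/// is first added to the app. This means the adapter can transmit as well as receive.
+/// There are also some lifecycle callbacks. `start` provides a `TxHandle` once the TNC is online,
+/// which means the adapter can transmit as well as receive.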
pub trait StreamAdapter: Send + Sync + 'static {
- /// This adapter was added to an `M17App`.
- fn adapter_registered(&self, id: usize, handle: TxHandle) {
- let _ = id;
+ /// TNC is online. New streams may now be received and it is okay to transmit.
+ fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
let _ = handle;
+ Ok(())
}
- /// This adapter was removed from an `M17App`.
- fn adapter_removed(&self) {}
-
- /// The TNC has been started and incoming streams may now arrive.
- fn tnc_started(&self) {}
-
- /// The TNC has been shut down. There will be no more tx/rx.
- fn tnc_closed(&self) {}
+ /// This adapter or the whole TNC has been shut down. There will be no more tx/rx. This is a
+ /// permanent operation.
+ fn close(&self) -> Result<(), AdapterError> {
+ Ok(())
+ }
/// A new incoming stream has begun.
///
use crate::adapter::{PacketAdapter, StreamAdapter};
-use crate::error::M17Error;
+use crate::error::{M17Error, M17Errors};
use crate::link_setup::LinkSetup;
use crate::tnc::Tnc;
use crate::{LsfFrame, PacketType, StreamFrame};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
+enum Lifecycle { // coarse app state; `start()` is only accepted in `Setup`, `close()` only in `Started`
+ Setup, // initial value of `M17App::lifecycle`
+ Started, // set by `start()`; adapters added or removed in this state are started/closed immediately
+ Closed, // set by `close()`; both `start()` and `close()` are rejected from here on
+}
+
pub struct M17App {
adapters: Arc<RwLock<Adapters>>,
event_tx: mpsc::SyncSender<TncControlEvent>,
+ lifecycle: RwLock<Lifecycle>, // current app state; initialised to `Setup`, accessed via `lifecycle()` / `set_lifecycle()`
}
impl M17App {
Self {
adapters: listeners,
event_tx,
+ lifecycle: RwLock::new(Lifecycle::Setup),
}
}
- pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
+ pub fn add_packet_adapter<P: PacketAdapter + 'static>(
+ &self,
+ adapter: P,
+ ) -> Result<usize, M17Error> { // Ok carries the id assigned to the adapter
let adapter = Arc::new(adapter);
let mut adapters = self.adapters.write().unwrap();
let id = adapters.next;
adapters.next += 1;
adapters.packet.insert(id, adapter.clone());
drop(adapters);
- adapter.adapter_registered(id, self.tx());
- id
+ if self.lifecycle() == Lifecycle::Started { // app already running: start this adapter now; otherwise `M17App::start` starts it later
+ adapter
+ .start(self.tx())
+ .map_err(|e| M17Error::Adapter(id, e))?; // NOTE(review): on failure the adapter stays in `adapters.packet`; its id is only recoverable from the error
+ }
+ Ok(id)
}
- pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
+ pub fn add_stream_adapter<S: StreamAdapter + 'static>(
+ &self,
+ adapter: S,
+ ) -> Result<usize, M17Error> { // Ok carries the id assigned to the adapter
let adapter = Arc::new(adapter);
let mut adapters = self.adapters.write().unwrap();
let id = adapters.next;
adapters.next += 1;
adapters.stream.insert(id, adapter.clone());
drop(adapters);
- adapter.adapter_registered(id, self.tx());
- id
+ if self.lifecycle() == Lifecycle::Started { // app already running: start this adapter now; otherwise `M17App::start` starts it later
+ adapter
+ .start(self.tx())
+ .map_err(|e| M17Error::Adapter(id, e))?; // NOTE(review): on failure the adapter stays in `adapters.stream`; its id is only recoverable from the error
+ }
+ Ok(id)
}
- pub fn remove_packet_adapter(&self, id: usize) {
- if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
- a.adapter_removed();
- }
+ /// Unregister the packet adapter with the given `id`.
+ ///
+ /// If the app is currently started, the adapter's `close()` is called after it has been taken
+ /// out of the registry. An unknown `id` is a no-op.
+ ///
+ /// # Errors
+ ///
+ /// Returns `M17Error::Adapter(id, _)` if the adapter's `close()` fails. The adapter has already
+ /// been removed at that point.
+ pub fn remove_packet_adapter(&self, id: usize) -> Result<(), M17Error> {
+ // Remove in a separate statement so the write guard on `adapters` is released before
+ // calling into the adapter, mirroring the explicit `drop(adapters)` in `add_packet_adapter`.
+ // With the guard as an `if let` scrutinee temporary it would be held across `close()`.
+ let removed = self.adapters.write().unwrap().packet.remove(&id);
+ if let Some(a) = removed {
+ if self.lifecycle() == Lifecycle::Started {
+ a.close().map_err(|e| M17Error::Adapter(id, e))?;
+ }
+ }
+ Ok(())
}
- pub fn remove_stream_adapter(&self, id: usize) {
- if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
- a.adapter_removed();
- }
+ /// Unregister the stream adapter with the given `id`.
+ ///
+ /// If the app is currently started, the adapter's `close()` is called after it has been taken
+ /// out of the registry. An unknown `id` is a no-op.
+ ///
+ /// # Errors
+ ///
+ /// Returns `M17Error::Adapter(id, _)` if the adapter's `close()` fails. The adapter has already
+ /// been removed at that point.
+ pub fn remove_stream_adapter(&self, id: usize) -> Result<(), M17Error> {
+ // Remove in a separate statement so the write guard on `adapters` is released before
+ // calling into the adapter, mirroring the explicit `drop(adapters)` in `add_stream_adapter`.
+ // With the guard as an `if let` scrutinee temporary it would be held across `close()`.
+ let removed = self.adapters.write().unwrap().stream.remove(&id);
+ if let Some(a) = removed {
+ if self.lifecycle() == Lifecycle::Started {
+ a.close().map_err(|e| M17Error::Adapter(id, e))?;
+ }
+ }
+ Ok(())
}
/// Create a handle that can be used to transmit data on the TNC
}
}
- pub fn start(&self) {
+ pub fn start(&self) -> Result<(), M17Errors> { // Err holds `InvalidStart`, or one `Adapter` error per adapter whose `start` failed
+ if self.lifecycle() != Lifecycle::Setup { // NOTE(review): this check and the set below lock separately, so concurrent callers could both pass — confirm callers are serialised
+ return Err(M17Errors(vec![M17Error::InvalidStart]));
+ }
+ self.set_lifecycle(Lifecycle::Started); // flipped before adapters run, so the app stays `Started` even if some adapters fail
+ let mut errs = vec![]; // every adapter is attempted; failures are collected rather than aborting at the first
{
let adapters = self.adapters.read().unwrap();
- for (_, p) in &adapters.packet {
- p.tnc_started();
+ for (i, p) in &adapters.packet {
+ if let Err(e) = p.start(self.tx()) {
+ errs.push(M17Error::Adapter(*i, e));
+ }
}
- for (_, s) in &adapters.stream {
- s.tnc_started();
+ for (i, s) in &adapters.stream {
+ if let Err(e) = s.start(self.tx()) {
+ errs.push(M17Error::Adapter(*i, e));
+ }
}
}
let _ = self.event_tx.send(TncControlEvent::Start);
+ if errs.is_empty() { // the Start event above has been sent regardless of adapter errors
+ Ok(())
+ } else {
+ Err(M17Errors(errs))
+ }
}
- pub fn close(&self) {
+ pub fn close(&self) -> Result<(), M17Errors> { // Err holds `InvalidClose`, or one `Adapter` error per adapter whose `close` failed
+ if self.lifecycle() != Lifecycle::Started { // NOTE(review): this check and the set below lock separately, so concurrent callers could both pass — confirm callers are serialised
+ return Err(M17Errors(vec![M17Error::InvalidClose]));
+ }
+ self.set_lifecycle(Lifecycle::Closed); // flipped before adapters run, so the app ends up `Closed` even if some adapters fail
+ let mut errs = vec![]; // every adapter is attempted; failures are collected rather than aborting at the first
{
let adapters = self.adapters.read().unwrap();
- for (_, p) in &adapters.packet {
- p.tnc_closed();
+ for (i, p) in &adapters.packet {
+ if let Err(e) = p.close() {
+ errs.push(M17Error::Adapter(*i, e));
+ }
}
- for (_, s) in &adapters.stream {
- s.tnc_closed();
+ for (i, s) in &adapters.stream {
+ if let Err(e) = s.close() {
+ errs.push(M17Error::Adapter(*i, e));
+ }
}
}
// TODO: blocking function to indicate TNC has finished closing
// then we could call this in a signal handler to ensure PTT is dropped before quit
let _ = self.event_tx.send(TncControlEvent::Close);
+ if errs.is_empty() { // the Close event above has been sent regardless of adapter errors
+ Ok(())
+ } else {
+ Err(M17Errors(errs))
+ }
+ }
+
+ fn lifecycle(&self) -> Lifecycle { // returns a copy of the current state (`Lifecycle` is `Copy`)
+ *self.lifecycle.read().unwrap() // panics if the lock is poisoned
+ }
+
+ fn set_lifecycle(&self, lifecycle: Lifecycle) { // unconditional overwrite; callers check the allowed transition first
+ *self.lifecycle.write().unwrap() = lifecycle; // panics if the lock is poisoned
 }
}
#[cfg(test)]
mod tests {
+ use crate::error::AdapterError;
use crate::{link_setup::M17Address, test_util::NullTnc};
use super::*;
PacketType::Raw,
&[0u8; 100],
);
- assert_eq!(res, Ok(()));
+ assert!(matches!(res, Ok(())));
let res = app.tx().transmit_packet(
&LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast()),
PacketType::Raw,
&[0u8; 900],
);
- assert_eq!(
+ assert!(matches!(
res,
Err(M17Error::PacketTooLarge {
provided: 900,
capacity: 822
})
- );
+ ));
}
#[test]
fn adapter_lifecycle() {
#[derive(Debug, PartialEq)]
enum Event {
- Registered(usize),
- Removed,
Started,
Closed,
}
macro_rules! event_impl {
($target:ty, $trait:ty) => {
impl $trait for $target {
- fn adapter_registered(&self, id: usize, _handle: TxHandle) {
- self.0.send(Event::Registered(id)).unwrap();
- }
-
- fn adapter_removed(&self) {
- self.0.send(Event::Removed).unwrap();
- }
-
- fn tnc_started(&self) {
- self.0.send(Event::Started).unwrap();
+ fn start(&self, _handle: TxHandle) -> Result<(), AdapterError> {
+ self.0.send(Event::Started)?;
+ Ok(())
}
- fn tnc_closed(&self) {
- self.0.send(Event::Closed).unwrap();
+ fn close(&self) -> Result<(), AdapterError> {
+ self.0.send(Event::Closed)?;
+ Ok(())
}
}
};
let packet = FakePacket(tx_p);
let stream = FakeStream(tx_s);
- let id_p = app.add_packet_adapter(packet);
- let id_s = app.add_stream_adapter(stream);
- app.start();
- app.close();
- app.remove_packet_adapter(id_p);
- app.remove_stream_adapter(id_s);
+ let id_p = app.add_packet_adapter(packet).unwrap();
+ let id_s = app.add_stream_adapter(stream).unwrap();
+ app.start().unwrap();
+ app.close().unwrap();
+ app.remove_packet_adapter(id_p).unwrap();
+ app.remove_stream_adapter(id_s).unwrap();
- assert_eq!(rx_p.try_recv(), Ok(Event::Registered(0)));
assert_eq!(rx_p.try_recv(), Ok(Event::Started));
assert_eq!(rx_p.try_recv(), Ok(Event::Closed));
- assert_eq!(rx_p.try_recv(), Ok(Event::Removed));
assert_eq!(rx_p.try_recv(), Err(mpsc::TryRecvError::Disconnected));
- assert_eq!(rx_s.try_recv(), Ok(Event::Registered(1)));
assert_eq!(rx_s.try_recv(), Ok(Event::Started));
assert_eq!(rx_s.try_recv(), Ok(Event::Closed));
- assert_eq!(rx_s.try_recv(), Ok(Event::Removed));
assert_eq!(rx_s.try_recv(), Err(mpsc::TryRecvError::Disconnected));
}
}
-use std::path::PathBuf;
+use std::{fmt::Display, path::PathBuf};
use thiserror::Error;
-#[derive(Debug, Error, PartialEq, Eq, Clone)]
+/// Errors originating from the M17 Rust Toolkit core
+#[derive(Debug, Error)]
pub enum M17Error {
#[error("given callsign contains at least one character invalid in M17: {0}")]
InvalidCallsignCharacters(char),
#[error("failed to read from RRC file: {0}")]
RrcReadFailed(PathBuf),
+
+ #[error("tried to start app more than once")]
+ InvalidStart,
+
+ #[error("tried to close app that is not started")]
+ InvalidClose,
+
+ #[error("adapter error for id {0}: {1}")]
+ Adapter(usize, #[source] AdapterError),
+}
+
+pub type AdapterError = Box<dyn std::error::Error + Sync + Send + 'static>;
+
+/// Iterator over potentially multiple errors
+#[derive(Debug, Error)]
+pub struct M17Errors(pub(crate) Vec<M17Error>); // newtype over the collected errors; the field is crate-visible only
+impl Iterator for M17Errors {
+ type Item = M17Error;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.pop() // NOTE(review): pop() takes from the back, so errors are yielded in reverse of the order they were pushed — confirm intended
+ }
+}
+
+impl Display for M17Errors {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self.0) // prints the Debug rendering of the whole Vec, not each error's Display message
+ }
 }
m17core = { version = "0.1", path = "../m17core" }
m17app = { version = "0.1", path = "../m17app" }
log = "0.4.22"
-
+thiserror = "2.0.11"
use log::debug;
use m17app::adapter::StreamAdapter;
use m17app::app::TxHandle;
+use m17app::error::AdapterError;
use m17app::link_setup::LinkSetup;
use m17app::link_setup::M17Address;
use m17app::StreamFrame;
};
use std::time::Duration;
use std::time::Instant;
+use thiserror::Error;
pub fn decode_codec2<P: AsRef<Path>>(data: &[u8], out_path: P) -> Vec<i16> {
let codec2 = Codec2::new(Codec2Mode::MODE_3200);
}
impl StreamAdapter for Codec2Adapter {
- fn adapter_registered(&self, _id: usize, handle: TxHandle) {
+ fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
self.state.lock().unwrap().tx = Some(handle);
let (end_tx, end_rx) = channel();
+ let (setup_tx, setup_rx) = channel(); // channel on which the audio thread reports whether its setup succeeded
let state = self.state.clone();
let output_card = self.output_card.clone();
- std::thread::spawn(move || stream_thread(end_rx, state, output_card));
+ std::thread::spawn(move || stream_thread(end_rx, setup_tx, state, output_card)); // audio setup runs on its own thread; the outcome comes back over `setup_tx`
self.state.lock().unwrap().end_tx = Some(end_tx);
+ // Propagate any errors arising in the thread
+ Ok(setup_rx.recv()??) // blocks until reported; first `?` = thread ended without reporting, second `?` = reported setup error. NOTE(review): `tx`/`end_tx` stay set in state on failure
}
- fn adapter_removed(&self) {
+ fn close(&self) -> Result<(), AdapterError> { // always returns Ok; only clears the stored handles
let mut state = self.state.lock().unwrap();
state.tx = None;
state.end_tx = None;
+ Ok(()) // presumably dropping `end_tx` above is what unblocks `end.recv()` in `stream_thread` — verify no other sender clone exists
}
- fn tnc_started(&self) {}
-
- fn tnc_closed(&self) {}
-
fn stream_began(&self, _link_setup: LinkSetup) {
// for now we will assume:
// - unencrypted
}
/// Create and manage the stream from a dedicated thread since it's `!Send`
-fn stream_thread(end: Receiver<()>, state: Arc<Mutex<AdapterState>>, output_card: String) {
+fn stream_thread(
+ end: Receiver<()>,
+ setup_tx: Sender<Result<(), AdapterError>>, // gets one message: Ok once the stream is playing, or the first setup failure
+ state: Arc<Mutex<AdapterState>>,
+ output_card: String,
+) {
let host = cpal::default_host();
- let device = host
+ let device = match host // NOTE(review): the two unwraps below (device enumeration, device name) still panic instead of reporting through `setup_tx`
.output_devices()
.unwrap()
.find(|d| d.name().unwrap() == output_card)
- .unwrap();
- let mut configs = device.supported_output_configs().unwrap();
+ {
+ Some(d) => d,
+ None => {
+ let _ = setup_tx.send(Err(M17Codec2Error::CardUnavailable(output_card).into()));
+ return;
+ }
+ };
+ let mut configs = match device.supported_output_configs() {
+ Ok(c) => c,
+ Err(e) => {
+ let _ = setup_tx.send(Err(M17Codec2Error::OutputConfigsUnavailable(
+ output_card,
+ e,
+ )
+ .into()));
+ return;
+ }
+ };
// TODO: channels == 1 doesn't work on a Raspberry Pi
// make this configurable and support interleaving LRLR stereo samples if using 2 channels
- let config = configs
- .find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16)
- .unwrap()
- .with_sample_rate(SampleRate(8000));
- let stream = device
- .build_output_stream(
- &config.into(),
- move |data: &mut [i16], _info: &cpal::OutputCallbackInfo| {
- output_cb(data, &state);
- },
- |e| {
- // trigger end_tx here? always more edge cases
- debug!("error occurred in codec2 playback: {e:?}");
- },
- None,
- )
- .unwrap();
- stream.play().unwrap();
+ let config = match configs.find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16)
+ {
+ Some(c) => c,
+ None => {
+ let _ = setup_tx.send(Err(
+ M17Codec2Error::SupportedOutputUnavailable(output_card).into()
+ ));
+ return;
+ }
+ };
+
+ let config = config.with_sample_rate(SampleRate(8000));
+ let stream = match device.build_output_stream(
+ &config.into(),
+ move |data: &mut [i16], _info: &cpal::OutputCallbackInfo| {
+ output_cb(data, &state);
+ },
+ |e| {
+ // trigger end_tx here? always more edge cases
+ debug!("error occurred in codec2 playback: {e:?}");
+ },
+ None,
+ ) {
+ Ok(s) => s,
+ Err(e) => {
+ let _ = setup_tx.send(Err(
+ M17Codec2Error::OutputStreamBuildError(output_card, e).into()
+ ));
+ return;
+ }
+ };
+ match stream.play() {
+ Ok(()) => (),
+ Err(e) => {
+ let _ = setup_tx.send(Err(
+ M17Codec2Error::OutputStreamPlayError(output_card, e).into()
+ ));
+ return;
+ }
+ }
+ let _ = setup_tx.send(Ok(())); // setup succeeded; this is what `Codec2Adapter::start` is blocked waiting for
let _ = end.recv();
// it seems concrete impls of Stream have a Drop implementation that will handle termination
}
}
}
}
+
+#[derive(Debug, Error)]
+pub enum M17Codec2Error { // setup failures sent back by `stream_thread`; the `String` field is the output card name it was given
+ #[error("selected card '{0}' does not exist or is in use")]
+ CardUnavailable(String),
+
+ #[error("selected card '{0}' failed to list available output configs: '{1}'")]
+ OutputConfigsUnavailable(String, #[source] cpal::SupportedStreamConfigsError),
+
+ #[error("selected card '{0}' did not offer a compatible output config type, either due to hardware limitations or because it is currently in use")]
+ SupportedOutputUnavailable(String),
+
+ #[error("selected card '{0}' was unable to build an output stream: '{1}'")]
+ OutputStreamBuildError(String, #[source] cpal::BuildStreamError),
+
+ #[error("selected card '{0}' was unable to play an output stream: '{1}'")]
+ OutputStreamPlayError(String, #[source] cpal::PlayStreamError),
+}
let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap();
let soundmodem = Soundmodem::new(soundcard.input(), NullOutputSink::new(), NullPtt::new());
let app = M17App::new(soundmodem);
- app.add_stream_adapter(Codec2Adapter::new());
- app.start();
+ app.add_stream_adapter(Codec2Adapter::new()).unwrap();
+ app.start().unwrap();
loop {
std::thread::park();
let ptt = SerialPtt::new("/dev/ttyUSB0", PttPin::Rts);
let soundmodem = Soundmodem::new(soundcard.input(), soundcard.output(), ptt);
let app = M17App::new(soundmodem);
- app.start();
+ app.start().unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
println!("Beginning playback...");
WavePlayer::play(
);
println!("Playback complete.");
std::thread::sleep(std::time::Duration::from_secs(1));
- app.close();
+ app.close().unwrap();
}
fn main() {
let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap();
let soundmodem = Soundmodem::new(soundcard.input(), NullOutputSink::new(), NullPtt::new());
let app = M17App::new(soundmodem);
- app.add_packet_adapter(PacketPrinter);
- app.start();
+ app.add_packet_adapter(PacketPrinter).unwrap();
+ app.start().unwrap();
loop {
std::thread::park();
let soundmodem = Soundmodem::new(soundcard.input(), soundcard.output(), ptt);
let app = M17App::new(soundmodem);
- app.start();
+ app.start().unwrap();
println!("Transmitting packet...");
let source = M17Address::from_callsign("VK7XT-1").unwrap();
.unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
- app.close();
+ app.close().unwrap();
}