From f5cbae9d09cc58d8b549c36111c7a57a16b9a822 Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Tue, 27 May 2025 19:15:31 +1000 Subject: [PATCH] Add codec2 tx adapter --- m17codec2/src/error.rs | 15 ++ m17codec2/src/rx.rs | 9 +- m17codec2/src/tx.rs | 400 ++++++++++++++++++++++++++++++++++ tools/m17rt-demod/src/main.rs | 4 +- tools/m17rt-mod/src/main.rs | 2 +- 5 files changed, 420 insertions(+), 10 deletions(-) diff --git a/m17codec2/src/error.rs b/m17codec2/src/error.rs index ccf94b9..7fa6583 100644 --- a/m17codec2/src/error.rs +++ b/m17codec2/src/error.rs @@ -2,6 +2,9 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum M17Codec2Error { + #[error("tried to start adapter more than once")] + RepeatStart, + #[error("selected card '{0}' does not exist or is in use")] CardUnavailable(String), @@ -19,4 +22,16 @@ pub enum M17Codec2Error { #[error("selected card '{0}' was unable to play an output stream: '{1}'")] OutputStreamPlayError(String, #[source] cpal::PlayStreamError), + + #[error("selected card '{0}' failed to list available input configs: '{1}'")] + InputConfigsUnavailable(String, #[source] cpal::SupportedStreamConfigsError), + + #[error("selected card '{0}' did not offer a compatible input config type, either due to hardware limitations or because it is currently in use")] + SupportedInputUnavailable(String), + + #[error("selected card '{0}' was unable to build an input stream: '{1}'")] + InputStreamBuildError(String, #[source] cpal::BuildStreamError), + + #[error("selected card '{0}' was unable to play an input stream: '{1}'")] + InputStreamPlayError(String, #[source] cpal::PlayStreamError), } diff --git a/m17codec2/src/rx.rs b/m17codec2/src/rx.rs index c6d3cdc..7c45610 100644 --- a/m17codec2/src/rx.rs +++ b/m17codec2/src/rx.rs @@ -50,7 +50,6 @@ impl Codec2RxAdapter { pub fn new() -> Self { Self { state: Arc::new(Mutex::new(AdapterState { - tx: None, out_buf: VecDeque::new(), codec2: Codec2::new(Codec2Mode::MODE_3200), end_tx: None, @@ -72,7 +71,6 @@ impl Default for Codec2RxAdapter { } struct AdapterState { - tx: Option, /// Circular buffer of output samples for playback out_buf: VecDeque, codec2: Codec2, @@ -81,9 +79,7 @@ struct AdapterState { } impl StreamAdapter for Codec2RxAdapter { - fn start(&self, handle: TxHandle) -> Result<(), AdapterError> { - self.state.lock().unwrap().tx = Some(handle); - + fn start(&self, _handle: TxHandle) -> Result<(), AdapterError> { let (end_tx, end_rx) = channel(); let (setup_tx, setup_rx) = channel(); let state = self.state.clone(); @@ -92,7 +88,7 @@ impl StreamAdapter for Codec2RxAdapter { self.state.lock().unwrap().end_tx = Some(end_tx); // Propagate any errors arising in the thread let sample_rate = setup_rx.recv()??; - debug!("selected codec2 output sample rate {sample_rate}"); + debug!("selected codec2 speaker sample rate {sample_rate}"); if sample_rate != 8000 { let params = SincInterpolationParameters { sinc_len: 256, @@ -110,7 +106,6 @@ impl StreamAdapter for Codec2RxAdapter { fn close(&self) -> Result<(), AdapterError> { let mut state = self.state.lock().unwrap(); - state.tx = None; state.end_tx = None; Ok(()) } diff --git a/m17codec2/src/tx.rs b/m17codec2/src/tx.rs index b824268..c0eb596 100644 --- a/m17codec2/src/tx.rs +++ b/m17codec2/src/tx.rs @@ -1,12 +1,29 @@ use codec2::{Codec2, Codec2Mode}; +use cpal::traits::DeviceTrait; +use cpal::traits::HostTrait; +use cpal::traits::StreamTrait; +use cpal::SampleFormat; +use cpal::SampleRate; +use log::debug; +use m17app::adapter::StreamAdapter; use m17app::app::TxHandle; +use m17app::error::AdapterError; use m17app::link_setup::LinkSetup; use m17app::link_setup::M17Address; use m17app::StreamFrame; +use rubato::Resampler; +use rubato::SincFixedOut; +use rubato::SincInterpolationParameters; use std::path::PathBuf; +use std::sync::mpsc; +use std::sync::mpsc::channel; +use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use std::time::Instant; +use crate::M17Codec2Error; + /// Transmits a wave file as an M17 stream pub struct WavePlayer; @@ -74,3 +91,386 @@ impl WavePlayer { } } } + +/// Control transmissions into a Codec2TxAdapter +#[derive(Clone)] +pub struct Ptt { + tx: mpsc::Sender, +} + +impl Ptt { + pub fn set_ptt(&self, ptt: bool) { + let _ = self.tx.send(if ptt { Event::PttOn } else { Event::PttOff }); + } +} + +/// Use a microphone and local PTT to transmit Codec2 voice data into an M17 channel. +pub struct Codec2TxAdapter { + input_card: Option, + event_tx: mpsc::Sender, + event_rx: Mutex>>, + source: M17Address, + destination: M17Address, +} + +impl Codec2TxAdapter { + pub fn new(source: M17Address, destination: M17Address) -> Self { + let (event_tx, event_rx) = mpsc::channel(); + Self { + input_card: None, + event_tx, + event_rx: Mutex::new(Some(event_rx)), + source, + destination, + } + } + + pub fn set_input_card>(&mut self, card_name: S) { + self.input_card = Some(card_name.into()); + } + + pub fn ptt(&self) -> Ptt { + Ptt { + tx: self.event_tx.clone(), + } + } +} + +enum Event { + PttOn, + PttOff, + MicSamples(Arc<[i16]>), + Close, +} + +impl StreamAdapter for Codec2TxAdapter { + fn start(&self, handle: TxHandle) -> Result<(), AdapterError> { + let Some(event_rx) = self.event_rx.lock().unwrap().take() else { + return Err(M17Codec2Error::RepeatStart.into()); + }; + let event_tx = self.event_tx.clone(); + let (setup_tx, setup_rx) = channel(); + let input_card = self.input_card.clone(); + let from = self.source.clone(); + let to = self.destination.clone(); + std::thread::spawn(move || { + stream_thread(event_tx, event_rx, setup_tx, input_card, handle, from, to) + }); + let sample_rate = setup_rx.recv()??; + debug!("selected codec2 microphone sample rate {sample_rate}"); + + Ok(()) + } + + fn close(&self) -> Result<(), AdapterError> { + let _ = self.event_tx.send(Event::Close); + Ok(()) + } + + fn stream_began(&self, _link_setup: LinkSetup) { + // not interested in incoming transmissions + } + + fn stream_data(&self, _frame_number: u16, _is_final: bool, _data: Arc<[u8; 16]>) { + // not interested in incoming transmissions + + // the only reason this is an adapter at all is for future "transmission aborted" feedback + // when that's implemented by m17app + } +} + +fn stream_thread( + event_tx: mpsc::Sender, + event_rx: mpsc::Receiver, + setup_tx: mpsc::Sender>, + input_card: Option, + handle: TxHandle, + source: M17Address, + destination: M17Address, +) { + let host = cpal::default_host(); + let device = if let Some(input_card) = input_card { + // TODO: more error handling for unwraps + match host + .input_devices() + .unwrap() + .find(|d| d.name().unwrap() == input_card) + { + Some(d) => d, + None => { + let _ = setup_tx.send(Err(M17Codec2Error::CardUnavailable(input_card).into())); + return; + } + } + } else { + match host.default_input_device() { + Some(d) => d, + None => { + let _ = setup_tx.send(Err(M17Codec2Error::DefaultCardUnavailable.into())); + return; + } + } + }; + let card_name = device.name().unwrap(); + let mut configs = match device.supported_input_configs() { + Ok(c) => c, + Err(e) => { + let _ = setup_tx.send(Err( + M17Codec2Error::InputConfigsUnavailable(card_name, e).into() + )); + return; + } + }; + // TODO: rank these by most favourable, same for rx + let config = match configs.find(|c| { + (c.channels() == 1 || c.channels() == 2) && c.sample_format() == SampleFormat::I16 + }) { + Some(c) => c, + None => { + let _ = setup_tx.send(Err( + M17Codec2Error::SupportedInputUnavailable(card_name).into() + )); + return; + } + }; + + let target_sample_rate = + if config.min_sample_rate().0 <= 8000 && config.max_sample_rate().0 >= 8000 { + 8000 + } else { + config.min_sample_rate().0 + }; + let channels = config.channels(); + + let mut acc: Box = if target_sample_rate != 8000 { + Box::new(ResamplingAccumulator::new(target_sample_rate as f64)) + } else { + Box::new(DirectAccumulator::new()) + }; + + let config = config.with_sample_rate(SampleRate(target_sample_rate)); + let stream = match device.build_input_stream( + &config.into(), + move |data: &[i16], _info: &cpal::InputCallbackInfo| { + let mut samples = vec![]; + for d in data.chunks(channels as usize) { + // if we were given multi-channel input we'll pick the first channel + // TODO: configurable? + samples.push(d[0]); + } + let _ = event_tx.send(Event::MicSamples(samples.into())); + }, + |e| { + // abort here? + debug!("error occurred in codec2 recording: {e:?}"); + }, + None, + ) { + Ok(s) => s, + Err(e) => { + let _ = setup_tx.send(Err( + M17Codec2Error::InputStreamBuildError(card_name, e).into() + )); + return; + } + }; + + let _ = setup_tx.send(Ok(target_sample_rate)); + let mut state = State::Idle; + let mut codec2 = Codec2::new(Codec2Mode::MODE_3200); + let link_setup = LinkSetup::new_voice(&source, &destination); + let mut lich_idx = 0; + let mut frame_number = 0; + + // Now the main loop + while let Ok(ev) = event_rx.recv() { + match ev { + Event::PttOn => { + match state { + State::Idle => { + match stream.play() { + Ok(()) => (), + Err(_e) => { + // TODO: report M17Codec2Error::InputStreamPlayError(card_name, e).into() + break; + } + } + acc.reset(); + codec2 = Codec2::new(Codec2Mode::MODE_3200); + state = State::StartTransmitting; + } + State::StartTransmitting => {} + State::Transmitting => {} + State::Ending => state = State::EndingWithPttRestart, + State::EndingWithPttRestart => {} + } + } + Event::PttOff => match state { + State::Idle => {} + State::StartTransmitting => state = State::Idle, + State::Transmitting => state = State::Ending, + State::Ending => {} + State::EndingWithPttRestart => state = State::Ending, + }, + Event::MicSamples(samples) => { + match state { + State::Idle => {} + State::StartTransmitting + | State::Transmitting + | State::Ending + | State::EndingWithPttRestart => { + acc.handle_samples(&samples); + while let Some(frame) = acc.try_next_frame() { + let mut stream_data = [0u8; 16]; + codec2.encode(&mut stream_data[0..8], &frame[0..160]); + codec2.encode(&mut stream_data[8..16], &frame[160..320]); + + if state == State::StartTransmitting { + handle.transmit_stream_start(&link_setup); + lich_idx = 0; + frame_number = 0; + state = State::Transmitting; + } + + let end_of_stream = state != State::Transmitting; + handle.transmit_stream_next(&StreamFrame { + lich_idx, + lich_part: link_setup.lich_part(lich_idx), + frame_number, + end_of_stream, + stream_data, + }); + frame_number += 1; + lich_idx = (lich_idx + 1) % 6; + + if end_of_stream { + break; + } + } + + if state == State::Ending { + // when finished sending final stream frame + let _ = stream.pause(); + state = State::Idle; + } + + if state == State::EndingWithPttRestart { + acc.reset(); + codec2 = Codec2::new(Codec2Mode::MODE_3200); + state = State::StartTransmitting; + } + } + } + } + Event::Close => { + // assume PTT etc. will clean up itself responsibly on close + break; + } + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum State { + /// Waiting for PTT + Idle, + /// PTT engaged but we are collecting the first full frame of audio data before starting the stream + StartTransmitting, + /// Streaming voice frames + Transmitting, + /// PTT disengaged; we are collecting the next frame of audio to use as a final frame + Ending, + /// PTT was re-enaged while ending; we will send final frame then go back to StartTransmitting + EndingWithPttRestart, +} + +fn resampler_params() -> SincInterpolationParameters { + SincInterpolationParameters { + sinc_len: 256, + f_cutoff: 0.95, + oversampling_factor: 128, + interpolation: rubato::SincInterpolationType::Cubic, + window: rubato::WindowFunction::BlackmanHarris2, + } +} + +trait Accumulator { + fn handle_samples(&mut self, samples: &[i16]); + /// Return 320 samples, enough for two Codec2 frames + fn try_next_frame(&mut self) -> Option>; + fn reset(&mut self); +} + +struct DirectAccumulator { + buffer: Vec, +} + +impl DirectAccumulator { + fn new() -> Self { + Self { buffer: Vec::new() } + } +} + +impl Accumulator for DirectAccumulator { + fn handle_samples(&mut self, samples: &[i16]) { + self.buffer.extend_from_slice(samples); + } + + fn try_next_frame(&mut self) -> Option> { + if self.buffer.len() >= 320 { + let part = self.buffer.split_off(320); + Some(std::mem::replace(&mut self.buffer, part)) + } else { + None + } + } + + fn reset(&mut self) { + self.buffer.clear(); + } +} + +struct ResamplingAccumulator { + input_rate: f64, + buffer: Vec, + resampler: SincFixedOut, +} + +impl ResamplingAccumulator { + fn new(input_rate: f64) -> Self { + Self { + input_rate, + buffer: Vec::new(), + resampler: make_resampler(input_rate), + } + } +} + +impl Accumulator for ResamplingAccumulator { + fn handle_samples(&mut self, samples: &[i16]) { + self.buffer.extend_from_slice(samples); + } + + fn try_next_frame(&mut self) -> Option> { + let required = self.resampler.input_frames_next(); + if self.buffer.len() >= required { + let mut part = self.buffer.split_off(required); + std::mem::swap(&mut self.buffer, &mut part); + let samples_f: Vec = part.iter().map(|s| *s as f32 / 16384.0f32).collect(); + let out = self.resampler.process(&[samples_f], None).unwrap(); + Some(out[0].iter().map(|s| (*s * 16383.0f32) as i16).collect()) + } else { + None + } + } + + fn reset(&mut self) { + self.buffer.clear(); + self.resampler = make_resampler(self.input_rate); + } +} + +fn make_resampler(input_rate: f64) -> SincFixedOut { + // want 320 samples at a time to create 2x Codec2 frames per M17 Voice frame + SincFixedOut::new(8000f64 / input_rate, 1.0, resampler_params(), 320, 1).unwrap() +} diff --git a/tools/m17rt-demod/src/main.rs b/tools/m17rt-demod/src/main.rs index 0da1501..3f7c479 100644 --- a/tools/m17rt-demod/src/main.rs +++ b/tools/m17rt-demod/src/main.rs @@ -1,7 +1,7 @@ use m17app::app::M17App; use m17app::soundcard::Soundcard; use m17app::soundmodem::{NullErrorHandler, NullOutputSink, NullPtt, Soundmodem}; -use m17codec2::Codec2Adapter; +use m17codec2::rx::Codec2RxAdapter; pub fn demod_test() { let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap(); @@ -12,7 +12,7 @@ pub fn demod_test() { NullErrorHandler::new(), ); let app = M17App::new(soundmodem); - app.add_stream_adapter(Codec2Adapter::new()).unwrap(); + app.add_stream_adapter(Codec2RxAdapter::new()).unwrap(); app.start().unwrap(); loop { diff --git a/tools/m17rt-mod/src/main.rs b/tools/m17rt-mod/src/main.rs index 104d83e..328a7da 100644 --- a/tools/m17rt-mod/src/main.rs +++ b/tools/m17rt-mod/src/main.rs @@ -3,7 +3,7 @@ use m17app::link_setup::M17Address; use m17app::serial::{PttPin, SerialPtt}; use m17app::soundcard::Soundcard; use m17app::soundmodem::{NullErrorHandler, Soundmodem}; -use m17codec2::WavePlayer; +use m17codec2::tx::WavePlayer; use std::path::PathBuf; pub fn mod_test() { -- 2.39.5