code.octet-stream.net Git - m17rt/commitdiff
Add codec2 tx adapter
author Thomas Karpiniec <tom.karpiniec@outlook.com>
Tue, 27 May 2025 09:15:31 +0000 (19:15 +1000)
committer Thomas Karpiniec <tom.karpiniec@outlook.com>
Tue, 27 May 2025 09:15:31 +0000 (19:15 +1000)
m17codec2/src/error.rs
m17codec2/src/rx.rs
m17codec2/src/tx.rs
tools/m17rt-demod/src/main.rs
tools/m17rt-mod/src/main.rs

index ccf94b9894226905a2d538b3aa7aecac9d285531..7fa6583f1f3af20521d5895829a7bfcef30af7ef 100644 (file)
@@ -2,6 +2,9 @@ use thiserror::Error;
 
 #[derive(Debug, Error)]
 pub enum M17Codec2Error {
+    #[error("tried to start adapter more than once")]
+    RepeatStart,
+
     #[error("selected card '{0}' does not exist or is in use")]
     CardUnavailable(String),
 
@@ -19,4 +22,16 @@ pub enum M17Codec2Error {
 
     #[error("selected card '{0}' was unable to play an output stream: '{1}'")]
     OutputStreamPlayError(String, #[source] cpal::PlayStreamError),
+
+    #[error("selected card '{0}' failed to list available input configs: '{1}'")]
+    InputConfigsUnavailable(String, #[source] cpal::SupportedStreamConfigsError),
+
+    #[error("selected card '{0}' did not offer a compatible input config type, either due to hardware limitations or because it is currently in use")]
+    SupportedInputUnavailable(String),
+
+    #[error("selected card '{0}' was unable to build an input stream: '{1}'")]
+    InputStreamBuildError(String, #[source] cpal::BuildStreamError),
+
+    #[error("selected card '{0}' was unable to play an input stream: '{1}'")]
+    InputStreamPlayError(String, #[source] cpal::PlayStreamError),
 }
index c6d3cdc2504d82b5e023b5c75a8fdcb2d1653180..7c456109fc85ce703ecd9a9e30c867f184d30246 100644 (file)
@@ -50,7 +50,6 @@ impl Codec2RxAdapter {
     pub fn new() -> Self {
         Self {
             state: Arc::new(Mutex::new(AdapterState {
-                tx: None,
                 out_buf: VecDeque::new(),
                 codec2: Codec2::new(Codec2Mode::MODE_3200),
                 end_tx: None,
@@ -72,7 +71,6 @@ impl Default for Codec2RxAdapter {
 }
 
 struct AdapterState {
-    tx: Option<TxHandle>,
     /// Circular buffer of output samples for playback
     out_buf: VecDeque<i16>,
     codec2: Codec2,
@@ -81,9 +79,7 @@ struct AdapterState {
 }
 
 impl StreamAdapter for Codec2RxAdapter {
-    fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
-        self.state.lock().unwrap().tx = Some(handle);
-
+    fn start(&self, _handle: TxHandle) -> Result<(), AdapterError> {
         let (end_tx, end_rx) = channel();
         let (setup_tx, setup_rx) = channel();
         let state = self.state.clone();
@@ -92,7 +88,7 @@ impl StreamAdapter for Codec2RxAdapter {
         self.state.lock().unwrap().end_tx = Some(end_tx);
         // Propagate any errors arising in the thread
         let sample_rate = setup_rx.recv()??;
-        debug!("selected codec2 output sample rate {sample_rate}");
+        debug!("selected codec2 speaker sample rate {sample_rate}");
         if sample_rate != 8000 {
             let params = SincInterpolationParameters {
                 sinc_len: 256,
@@ -110,7 +106,6 @@ impl StreamAdapter for Codec2RxAdapter {
 
     fn close(&self) -> Result<(), AdapterError> {
         let mut state = self.state.lock().unwrap();
-        state.tx = None;
         state.end_tx = None;
         Ok(())
     }
index b824268f600c40b40a5da22e50d7d82acde46ef4..c0eb5968fcdd31217ba3dbac6fec5ee0019ed1fe 100644 (file)
@@ -1,12 +1,29 @@
 use codec2::{Codec2, Codec2Mode};
+use cpal::traits::DeviceTrait;
+use cpal::traits::HostTrait;
+use cpal::traits::StreamTrait;
+use cpal::SampleFormat;
+use cpal::SampleRate;
+use log::debug;
+use m17app::adapter::StreamAdapter;
 use m17app::app::TxHandle;
+use m17app::error::AdapterError;
 use m17app::link_setup::LinkSetup;
 use m17app::link_setup::M17Address;
 use m17app::StreamFrame;
+use rubato::Resampler;
+use rubato::SincFixedOut;
+use rubato::SincInterpolationParameters;
 use std::path::PathBuf;
+use std::sync::mpsc;
+use std::sync::mpsc::channel;
+use std::sync::Arc;
+use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
 
+use crate::M17Codec2Error;
+
 /// Transmits a wave file as an M17 stream
 pub struct WavePlayer;
 
@@ -74,3 +91,386 @@ impl WavePlayer {
         }
     }
 }
+
+/// Cloneable push-to-talk handle for a `Codec2TxAdapter`.
+///
+/// Each handle holds a sender into the adapter's event channel, so it can be
+/// cloned and used independently of the adapter itself.
+#[derive(Clone)]
+pub struct Ptt {
+    tx: mpsc::Sender<Event>,
+}
+
+impl Ptt {
+    /// Engage (`true`) or release (`false`) PTT.
+    ///
+    /// A failed send (the receiving end has gone away) is deliberately ignored.
+    pub fn set_ptt(&self, ptt: bool) {
+        let event = match ptt {
+            true => Event::PttOn,
+            false => Event::PttOff,
+        };
+        let _ = self.tx.send(event);
+    }
+}
+
+/// Stream adapter that captures microphone audio and, under control of a local PTT,
+/// transmits it as Codec2 voice data into an M17 channel.
+pub struct Codec2TxAdapter {
+    /// Name of the input card to open; `None` selects the host's default input device
+    input_card: Option<String>,
+    /// Sender side of the event channel, cloned for `Ptt` handles and the worker thread
+    event_tx: mpsc::Sender<Event>,
+    /// Receiver side of the event channel, taken by the first call to `start`
+    event_rx: Mutex<Option<mpsc::Receiver<Event>>>,
+    source: M17Address,
+    destination: M17Address,
+}
+
+impl Codec2TxAdapter {
+    /// Create an adapter that will transmit from `source` to `destination`.
+    ///
+    /// No input card is selected initially.
+    pub fn new(source: M17Address, destination: M17Address) -> Self {
+        let (event_tx, event_rx) = channel::<Event>();
+        let event_rx = Mutex::new(Some(event_rx));
+        Self {
+            input_card: None,
+            event_tx,
+            event_rx,
+            source,
+            destination,
+        }
+    }
+
+    /// Choose the input card by name instead of using the default input device.
+    pub fn set_input_card<S: Into<String>>(&mut self, card_name: S) {
+        let name: String = card_name.into();
+        self.input_card = Some(name);
+    }
+
+    /// Obtain a `Ptt` handle that feeds PTT events to this adapter.
+    pub fn ptt(&self) -> Ptt {
+        let tx = self.event_tx.clone();
+        Ptt { tx }
+    }
+}
+
+/// Messages consumed by the transmit worker's event loop in `stream_thread`.
+enum Event {
+    /// PTT was engaged (sent by `Ptt::set_ptt(true)`)
+    PttOn,
+    /// PTT was released (sent by `Ptt::set_ptt(false)`)
+    PttOff,
+    /// A batch of single-channel samples delivered by the input stream callback
+    MicSamples(Arc<[i16]>),
+    /// The adapter was closed; the worker loop exits when it sees this
+    Close,
+}
+
+impl StreamAdapter for Codec2TxAdapter {
+    /// Spawn the worker thread and wait for it to finish opening the microphone.
+    ///
+    /// Fails with `M17Codec2Error::RepeatStart` if the event receiver has already
+    /// been handed to a worker by an earlier call, and propagates any setup error
+    /// the worker reports.
+    fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
+        // The receiver can only be given away once; a second start finds `None`.
+        let taken = self.event_rx.lock().unwrap().take();
+        let event_rx = match taken {
+            Some(rx) => rx,
+            None => return Err(M17Codec2Error::RepeatStart.into()),
+        };
+
+        let (setup_tx, setup_rx) = channel();
+        let mic_tx = self.event_tx.clone();
+        let card = self.input_card.clone();
+        let source = self.source.clone();
+        let destination = self.destination.clone();
+        std::thread::spawn(move || {
+            stream_thread(mic_tx, event_rx, setup_tx, card, handle, source, destination)
+        });
+
+        // Outer `?`: the worker went away without reporting. Inner `?`: it reported an error.
+        let sample_rate = setup_rx.recv()??;
+        debug!("selected codec2 microphone sample rate {sample_rate}");
+
+        Ok(())
+    }
+
+    /// Ask the worker thread to exit; a failed send is ignored.
+    fn close(&self) -> Result<(), AdapterError> {
+        self.event_tx.send(Event::Close).ok();
+        Ok(())
+    }
+
+    fn stream_began(&self, _link_setup: LinkSetup) {
+        // incoming transmissions are of no interest to a transmit-only adapter
+    }
+
+    fn stream_data(&self, _frame_number: u16, _is_final: bool, _data: Arc<[u8; 16]>) {
+        // incoming transmissions are of no interest to a transmit-only adapter
+
+        // this type is only an adapter so that it can receive "transmission aborted"
+        // feedback in future, once m17app implements that
+    }
+}
+
+/// Worker thread behind `Codec2TxAdapter`: opens the microphone, then turns PTT and
+/// audio events into M17 voice stream frames.
+///
+/// Setup picks the input device (`input_card` by name, otherwise the host default),
+/// takes the first supported input config with one or two channels of `i16` samples,
+/// and builds an input stream at 8000 Hz when the config allows it, or at the config's
+/// minimum rate with resampling otherwise. The result of setup is reported on
+/// `setup_tx`: `Ok(sample_rate)` once the stream is built, or the error that stopped it.
+///
+/// After setup the thread loops over `event_rx` until it receives `Event::Close`, the
+/// channel is closed, or the input stream fails to start.
+///
+/// * `event_tx` - moved into the audio callback, which posts `Event::MicSamples`
+/// * `event_rx` - source of PTT, audio and close events
+/// * `setup_tx` - one-shot channel for the outcome of setup
+/// * `input_card` - optional name of the input device to use
+/// * `handle` - used to transmit the stream start and each stream frame
+/// * `source`, `destination` - addresses used to build the voice link setup
+fn stream_thread(
+    event_tx: mpsc::Sender<Event>,
+    event_rx: mpsc::Receiver<Event>,
+    setup_tx: mpsc::Sender<Result<u32, AdapterError>>,
+    input_card: Option<String>,
+    handle: TxHandle,
+    source: M17Address,
+    destination: M17Address,
+) {
+    let host = cpal::default_host();
+    let device = if let Some(input_card) = input_card {
+        // TODO: more error handling for unwraps
+        match host
+            .input_devices()
+            .unwrap()
+            .find(|d| d.name().unwrap() == input_card)
+        {
+            Some(d) => d,
+            None => {
+                let _ = setup_tx.send(Err(M17Codec2Error::CardUnavailable(input_card).into()));
+                return;
+            }
+        }
+    } else {
+        match host.default_input_device() {
+            Some(d) => d,
+            None => {
+                let _ = setup_tx.send(Err(M17Codec2Error::DefaultCardUnavailable.into()));
+                return;
+            }
+        }
+    };
+    let card_name = device.name().unwrap();
+    let mut configs = match device.supported_input_configs() {
+        Ok(c) => c,
+        Err(e) => {
+            let _ = setup_tx.send(Err(
+                M17Codec2Error::InputConfigsUnavailable(card_name, e).into()
+            ));
+            return;
+        }
+    };
+    // TODO: rank these by most favourable, same for rx
+    let config = match configs.find(|c| {
+        (c.channels() == 1 || c.channels() == 2) && c.sample_format() == SampleFormat::I16
+    }) {
+        Some(c) => c,
+        None => {
+            let _ = setup_tx.send(Err(
+                M17Codec2Error::SupportedInputUnavailable(card_name).into()
+            ));
+            return;
+        }
+    };
+
+    // Capture at 8000 Hz directly when the card allows it; otherwise fall back to the
+    // lowest supported rate and resample down to 8000 Hz.
+    let target_sample_rate =
+        if config.min_sample_rate().0 <= 8000 && config.max_sample_rate().0 >= 8000 {
+            8000
+        } else {
+            config.min_sample_rate().0
+        };
+    let channels = config.channels();
+
+    let mut acc: Box<dyn Accumulator> = if target_sample_rate != 8000 {
+        Box::new(ResamplingAccumulator::new(target_sample_rate as f64))
+    } else {
+        Box::new(DirectAccumulator::new())
+    };
+
+    let config = config.with_sample_rate(SampleRate(target_sample_rate));
+    let stream = match device.build_input_stream(
+        &config.into(),
+        move |data: &[i16], _info: &cpal::InputCallbackInfo| {
+            let mut samples = vec![];
+            for d in data.chunks(channels as usize) {
+                // if we were given multi-channel input we'll pick the first channel
+                // TODO: configurable?
+                samples.push(d[0]);
+            }
+            let _ = event_tx.send(Event::MicSamples(samples.into()));
+        },
+        |e| {
+            // abort here?
+            debug!("error occurred in codec2 recording: {e:?}");
+        },
+        None,
+    ) {
+        Ok(s) => s,
+        Err(e) => {
+            let _ = setup_tx.send(Err(
+                M17Codec2Error::InputStreamBuildError(card_name, e).into()
+            ));
+            return;
+        }
+    };
+
+    let _ = setup_tx.send(Ok(target_sample_rate));
+    let mut state = State::Idle;
+    let mut codec2 = Codec2::new(Codec2Mode::MODE_3200);
+    let link_setup = LinkSetup::new_voice(&source, &destination);
+    let mut lich_idx = 0;
+    // NOTE(review): this counter is only ever incremented during a transmission; confirm
+    // whether a very long transmission needs it to wrap rather than overflow.
+    let mut frame_number = 0;
+
+    // Now the main loop
+    while let Ok(ev) = event_rx.recv() {
+        match ev {
+            Event::PttOn => {
+                match state {
+                    State::Idle => {
+                        match stream.play() {
+                            Ok(()) => (),
+                            Err(e) => {
+                                // TODO: report M17Codec2Error::InputStreamPlayError(card_name, e).into()
+                                // Until there is a channel to report this on, at least
+                                // leave a trace of why the worker stopped.
+                                debug!(
+                                    "selected card '{card_name}' was unable to play an input stream: {e:?}"
+                                );
+                                break;
+                            }
+                        }
+                        acc.reset();
+                        codec2 = Codec2::new(Codec2Mode::MODE_3200);
+                        state = State::StartTransmitting;
+                    }
+                    State::StartTransmitting => {}
+                    State::Transmitting => {}
+                    State::Ending => state = State::EndingWithPttRestart,
+                    State::EndingWithPttRestart => {}
+                }
+            }
+            Event::PttOff => match state {
+                State::Idle => {}
+                State::StartTransmitting => {
+                    // Nothing has been transmitted yet, so just abandon the attempt. Stop
+                    // capturing too, as the Ending -> Idle transition does, so the
+                    // microphone is not left running while idle.
+                    let _ = stream.pause();
+                    state = State::Idle;
+                }
+                State::Transmitting => state = State::Ending,
+                State::Ending => {}
+                State::EndingWithPttRestart => state = State::Ending,
+            },
+            Event::MicSamples(samples) => {
+                match state {
+                    State::Idle => {}
+                    State::StartTransmitting
+                    | State::Transmitting
+                    | State::Ending
+                    | State::EndingWithPttRestart => {
+                        acc.handle_samples(&samples);
+                        // Set once the end-of-stream frame has actually been handed over.
+                        let mut sent_final_frame = false;
+                        while let Some(frame) = acc.try_next_frame() {
+                            // One M17 voice frame carries two 8-byte Codec2 frames,
+                            // each encoded from 160 samples.
+                            let mut stream_data = [0u8; 16];
+                            codec2.encode(&mut stream_data[0..8], &frame[0..160]);
+                            codec2.encode(&mut stream_data[8..16], &frame[160..320]);
+
+                            if state == State::StartTransmitting {
+                                handle.transmit_stream_start(&link_setup);
+                                lich_idx = 0;
+                                frame_number = 0;
+                                state = State::Transmitting;
+                            }
+
+                            let end_of_stream = state != State::Transmitting;
+                            handle.transmit_stream_next(&StreamFrame {
+                                lich_idx,
+                                lich_part: link_setup.lich_part(lich_idx),
+                                frame_number,
+                                end_of_stream,
+                                stream_data,
+                            });
+                            frame_number += 1;
+                            lich_idx = (lich_idx + 1) % 6;
+
+                            if end_of_stream {
+                                sent_final_frame = true;
+                                break;
+                            }
+                        }
+
+                        // Only leave the ending states once the final frame has really
+                        // gone out. A batch of samples too small to complete a frame must
+                        // not end the transmission without its end-of-stream frame.
+                        if sent_final_frame {
+                            if state == State::Ending {
+                                // when finished sending final stream frame
+                                let _ = stream.pause();
+                                state = State::Idle;
+                            } else if state == State::EndingWithPttRestart {
+                                acc.reset();
+                                codec2 = Codec2::new(Codec2Mode::MODE_3200);
+                                state = State::StartTransmitting;
+                            }
+                        }
+                    }
+                }
+            }
+            Event::Close => {
+                // assume PTT etc. will clean up itself responsibly on close
+                break;
+            }
+        }
+    }
+}
+
+/// Transmit state machine driven by the event loop in `stream_thread`.
+#[derive(Debug, PartialEq, Eq)]
+enum State {
+    /// Waiting for PTT
+    Idle,
+    /// PTT engaged but we are collecting the first full frame of audio data before starting the stream
+    StartTransmitting,
+    /// Streaming voice frames
+    Transmitting,
+    /// PTT disengaged; we are collecting the next frame of audio to use as a final frame
+    Ending,
+    /// PTT was re-engaged while ending; we will send final frame then go back to StartTransmitting
+    EndingWithPttRestart,
+}
+
+/// Sinc interpolation settings used when building the microphone resampler.
+fn resampler_params() -> SincInterpolationParameters {
+    use rubato::{SincInterpolationType, WindowFunction};
+
+    SincInterpolationParameters {
+        window: WindowFunction::BlackmanHarris2,
+        interpolation: SincInterpolationType::Cubic,
+        oversampling_factor: 128,
+        f_cutoff: 0.95,
+        sinc_len: 256,
+    }
+}
+
+/// Buffers captured microphone samples and hands them back in frame-sized chunks
+/// ready for Codec2 encoding.
+trait Accumulator {
+    /// Append newly captured samples to the internal buffer
+    fn handle_samples(&mut self, samples: &[i16]);
+    /// Return 320 samples, enough for two Codec2 frames
+    ///
+    /// Returns `None` while not enough input has been buffered to produce a frame.
+    fn try_next_frame(&mut self) -> Option<Vec<i16>>;
+    /// Discard buffered state so that a new transmission starts from scratch
+    fn reset(&mut self);
+}
+
+/// Accumulator for input that is already at 8000 Hz: samples pass through unchanged
+/// and are only regrouped into 320-sample frames.
+struct DirectAccumulator {
+    /// Samples received but not yet returned as part of a frame
+    buffer: Vec<i16>,
+}
+
+impl DirectAccumulator {
+    /// Create an accumulator with an empty buffer.
+    fn new() -> Self {
+        DirectAccumulator { buffer: vec![] }
+    }
+}
+
+impl Accumulator for DirectAccumulator {
+    fn handle_samples(&mut self, samples: &[i16]) {
+        self.buffer.extend(samples.iter().copied());
+    }
+
+    fn try_next_frame(&mut self) -> Option<Vec<i16>> {
+        if self.buffer.len() < 320 {
+            return None;
+        }
+        // Remove the oldest 320 samples; anything newer stays buffered for the next frame.
+        Some(self.buffer.drain(..320).collect())
+    }
+
+    fn reset(&mut self) {
+        self.buffer.truncate(0);
+    }
+}
+
+/// Accumulator for input captured at a rate other than 8000 Hz: buffered samples are
+/// run through a resampler that emits 320 output samples per call.
+struct ResamplingAccumulator {
+    /// Capture rate in Hz, kept so the resampler can be rebuilt on reset
+    input_rate: f64,
+    /// Samples received but not yet fed to the resampler
+    buffer: Vec<i16>,
+    resampler: SincFixedOut<f32>,
+}
+
+impl ResamplingAccumulator {
+    /// Create an accumulator for audio captured at `input_rate` Hz.
+    fn new(input_rate: f64) -> Self {
+        let resampler = make_resampler(input_rate);
+        ResamplingAccumulator {
+            input_rate,
+            buffer: vec![],
+            resampler,
+        }
+    }
+}
+
+impl Accumulator for ResamplingAccumulator {
+    fn handle_samples(&mut self, samples: &[i16]) {
+        self.buffer.extend(samples.iter().copied());
+    }
+
+    fn try_next_frame(&mut self) -> Option<Vec<i16>> {
+        // The resampler states how many input samples its next output chunk needs.
+        let needed = self.resampler.input_frames_next();
+        if self.buffer.len() < needed {
+            return None;
+        }
+        let input: Vec<f32> = self
+            .buffer
+            .drain(..needed)
+            .map(|s| s as f32 / 16384.0f32)
+            .collect();
+        let resampled = self.resampler.process(&[input], None).unwrap();
+        let frame: Vec<i16> = resampled[0]
+            .iter()
+            .map(|s| (*s * 16383.0f32) as i16)
+            .collect();
+        Some(frame)
+    }
+
+    fn reset(&mut self) {
+        self.buffer.truncate(0);
+        // Rebuild the resampler rather than reuse it, so no state carries over.
+        self.resampler = make_resampler(self.input_rate);
+    }
+}
+
+/// Build a single-channel resampler converting `input_rate` Hz audio to 8000 Hz.
+fn make_resampler(input_rate: f64) -> SincFixedOut<f32> {
+    let ratio = 8000f64 / input_rate;
+    // output chunks of 320 samples: 2x Codec2 frames per M17 Voice frame
+    SincFixedOut::new(ratio, 1.0, resampler_params(), 320, 1).unwrap()
+}
index 0da1501c4c69a13511d12b58ce1c4569eeaa451f..3f7c47988ccac44ae3773b6431e03caa0db07594 100644 (file)
@@ -1,7 +1,7 @@
 use m17app::app::M17App;
 use m17app::soundcard::Soundcard;
 use m17app::soundmodem::{NullErrorHandler, NullOutputSink, NullPtt, Soundmodem};
-use m17codec2::Codec2Adapter;
+use m17codec2::rx::Codec2RxAdapter;
 
 pub fn demod_test() {
     let soundcard = Soundcard::new("plughw:CARD=Device,DEV=0").unwrap();
@@ -12,7 +12,7 @@ pub fn demod_test() {
         NullErrorHandler::new(),
     );
     let app = M17App::new(soundmodem);
-    app.add_stream_adapter(Codec2Adapter::new()).unwrap();
+    app.add_stream_adapter(Codec2RxAdapter::new()).unwrap();
     app.start().unwrap();
 
     loop {
index 104d83e45ac8893354ef9051259c133dcad7d490..328a7dac65c1480328b8f0e45af1a34a956b9fc2 100644 (file)
@@ -3,7 +3,7 @@ use m17app::link_setup::M17Address;
 use m17app::serial::{PttPin, SerialPtt};
 use m17app::soundcard::Soundcard;
 use m17app::soundmodem::{NullErrorHandler, Soundmodem};
-use m17codec2::WavePlayer;
+use m17codec2::tx::WavePlayer;
 use std::path::PathBuf;
 
 pub fn mod_test() {