]> code.octet-stream.net Git - m17rt/commitdiff
Soundmodem rx works in realtime end-to-end using file source
authorThomas Karpiniec <tom.karpiniec@outlook.com>
Sun, 5 Jan 2025 10:10:01 +0000 (21:10 +1100)
committerThomas Karpiniec <tom.karpiniec@outlook.com>
Sun, 5 Jan 2025 10:10:01 +0000 (21:10 +1100)
demod/src/main.rs
m17app/src/soundmodem.rs
m17core/src/tnc.rs

index 78ff2b0732086a4ea0911b57ecd70274b095d19e..0f7f031d236a1adeaae1c4c8f49ef24c58983559 100755 (executable)
-use cpal::traits::DeviceTrait;
-use cpal::traits::HostTrait;
-use cpal::traits::StreamTrait;
-use cpal::{SampleFormat, SampleRate};
-use log::debug;
-use m17core::{
-    modem::{Demodulator, SoftDemodulator},
-    protocol::{Frame, LichCollection},
-};
-use std::{fs::File, io::Read};
-
-pub fn run_my_decode() {
-    let file = File::open("../../Data/test_vk7xt.rrc").unwrap();
-    let mut input = file;
-    let mut baseband = vec![];
-    input.read_to_end(&mut baseband).unwrap();
-
-    let mut lich = LichCollection::new();
-    let mut codec2_data = vec![];
-    let mut modem = SoftDemodulator::new();
-
-    for pair in baseband.chunks(2) {
-        let sample: i16 = i16::from_le_bytes([pair[0], pair[1]]);
-        if let Some(frame) = modem.demod(sample) {
-            debug!("Modem demodulated frame: {:?}", frame);
-            if let Frame::Stream(s) = frame {
-                for b in s.stream_data {
-                    codec2_data.push(b);
-
-                    let valid_before = lich.valid_segments();
-                    lich.set_segment(s.lich_idx, s.lich_part);
-                    let valid_after = lich.valid_segments();
-                    if valid_before != valid_after {
-                        debug!("Valid lich segments: {}", lich.valid_segments());
-                    }
-                    if valid_before == 5 && valid_after == 6 {
-                        if let Some(l) = lich.try_assemble() {
-                            debug!("Assembled complete lich: {l:?}");
-                        }
-                    }
-                }
-                if s.end_of_stream {
-                    debug!("len of codec2 data: {}", codec2_data.len());
-                    assert_eq!(codec2_data.len(), 1504);
-
-                    let samples =
-                        m17codec2::decode_codec2(&codec2_data, "../../Data/speech_out.raw");
-                    let host = cpal::default_host();
-                    let def = host.default_output_device().unwrap();
-                    let mut configs = def.supported_output_configs().unwrap();
-                    let config = configs
-                        .find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16)
-                        .unwrap()
-                        .with_sample_rate(SampleRate(8000));
-                    let mut counter = 0;
-                    let mut index = 0;
-                    let stream = def
-                        .build_output_stream(
-                            &config.into(),
-                            move |data: &mut [i16], info: &cpal::OutputCallbackInfo| {
-                                debug!(
-                                    "callback {:?} playback {:?}",
-                                    info.timestamp().callback,
-                                    info.timestamp().playback
-                                );
-                                println!(
-                                    "iteration {counter} asked for {} samples at time {}",
-                                    data.len(),
-                                    std::time::SystemTime::now()
-                                        .duration_since(std::time::UNIX_EPOCH)
-                                        .unwrap()
-                                        .as_millis()
-                                );
-                                counter += 1;
-                                let qty = data.len().min(samples.len() - index);
-                                println!("providing {qty} samples");
-                                data[0..qty].copy_from_slice(&samples[index..(index + qty)]);
-                                index += qty;
-                            },
-                            move |_e| {
-                                println!("error occurred");
-                            },
-                            None,
-                        )
-                        .unwrap();
-                    stream.play().unwrap();
-
-                    std::thread::sleep(std::time::Duration::from_secs(10));
-                }
-            }
-        }
-    }
-}
-
-pub fn cpal_test() {
-    let host = cpal::default_host();
-    for d in host.devices().unwrap() {
-        println!("Found card: {:?}", d.name().unwrap());
-    }
-    let def = host.default_output_device().unwrap();
-    println!("the default output device is: {}", def.name().unwrap());
-
-    for c in def.supported_output_configs().unwrap() {
-        println!("config supported: {:?}", c);
-    }
-
-    println!("all supported output configs shown");
+use m17app::app::M17App;
+use m17app::soundmodem::{InputRrcFile, Soundmodem};
+use m17codec2::Codec2Adapter;
+use std::path::PathBuf;
+
+pub fn m17app_test() {
+    let path = PathBuf::from("../../Data/test_vk7xt.rrc");
+    let source = InputRrcFile::new(path);
+    let soundmodem = Soundmodem::new_with_input(source);
+    let app = M17App::new(soundmodem);
+    app.add_stream_adapter(Codec2Adapter::new());
+    app.start();
+    std::thread::sleep(std::time::Duration::from_secs(15));
 }
 
 fn main() {
     env_logger::init();
-    run_my_decode();
-    //cpal_test();
+    m17app_test();
 }
index b6fbf4692917d18851abb3ba0d419e5f0a3c5698..8b7f80f74aa4765aba9e74bd3667375c87b91c33 100644 (file)
@@ -2,6 +2,8 @@ use std::io::{self, ErrorKind, Read, Write};
 
 use crate::tnc::{Tnc, TncError};
 use log::debug;
+use m17core::kiss::MAX_FRAME_LEN;
+use m17core::modem::{Demodulator, SoftDemodulator};
 use m17core::tnc::SoftTnc;
 use std::fs::File;
 use std::path::PathBuf;
@@ -10,28 +12,68 @@ use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
 pub struct Soundmodem {
-    tnc: SoftTnc,
-    config: SoundmodemConfig,
+    event_tx: SyncSender<SoundmodemEvent>,
+    kiss_out_rx: Arc<Mutex<Receiver<Arc<[u8]>>>>,
+    partial_kiss_out: Arc<Mutex<Option<PartialKissOut>>>,
 }
 
-pub struct SoundmodemConfig {
-    // sound cards, PTT, etc.
-    input: Box<dyn InputSource>,
+impl Soundmodem {
+    pub fn new_with_input<T: InputSource>(input: T) -> Self {
+        // must create TNC here
+        let (event_tx, event_rx) = sync_channel(128);
+        let (kiss_out_tx, kiss_out_rx) = sync_channel(128);
+        spawn_soundmodem_worker(event_tx.clone(), event_rx, kiss_out_tx, Box::new(input));
+        Self {
+            event_tx,
+            kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)),
+            partial_kiss_out: Arc::new(Mutex::new(None)),
+        }
+    }
+}
+
+struct PartialKissOut {
+    output: Arc<[u8]>,
+    idx: usize,
 }
 
 impl Read for Soundmodem {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.tnc
-            .read_kiss(buf)
-            .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))
+        {
+            let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap();
+            if let Some(partial) = partial_kiss_out.as_mut() {
+                let remaining = partial.output.len() - partial.idx;
+                let to_write = remaining.min(buf.len());
+                buf[0..to_write]
+                    .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]);
+                if to_write == remaining {
+                    *partial_kiss_out = None;
+                } else {
+                    partial.idx += to_write;
+                }
+                return Ok(to_write);
+            }
+        }
+        let output = {
+            let rx = self.kiss_out_rx.lock().unwrap();
+            rx.recv()
+                .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))?
+        };
+        let to_write = output.len().min(buf.len());
+        buf[0..to_write].copy_from_slice(&output[0..to_write]);
+        if to_write != output.len() {
+            *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut {
+                output,
+                idx: to_write,
+            })
+        }
+        Ok(to_write)
     }
 }
 
 impl Write for Soundmodem {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        self.tnc
-            .write_kiss(buf)
-            .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))
+        let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into()));
+        Ok(buf.len())
     }
 
     fn flush(&mut self) -> std::io::Result<()> {
@@ -41,21 +83,69 @@ impl Write for Soundmodem {
 
 impl Tnc for Soundmodem {
     fn try_clone(&mut self) -> Result<Self, TncError> {
-        unimplemented!();
+        Ok(Self {
+            event_tx: self.event_tx.clone(),
+            kiss_out_rx: self.kiss_out_rx.clone(),
+            partial_kiss_out: self.partial_kiss_out.clone(),
+        })
     }
 
     fn start(&mut self) -> Result<(), TncError> {
-        unimplemented!();
+        let _ = self.event_tx.send(SoundmodemEvent::Start);
+        Ok(())
     }
 
     fn close(&mut self) -> Result<(), TncError> {
-        unimplemented!();
+        let _ = self.event_tx.send(SoundmodemEvent::Close);
+        Ok(())
     }
 }
 
 pub enum SoundmodemEvent {
     Kiss(Arc<[u8]>),
     BasebandInput(Arc<[i16]>),
+    Start,
+    Close,
+}
+
+fn spawn_soundmodem_worker(
+    event_tx: SyncSender<SoundmodemEvent>,
+    event_rx: Receiver<SoundmodemEvent>,
+    kiss_out_tx: SyncSender<Arc<[u8]>>,
+    input: Box<dyn InputSource>,
+) {
+    std::thread::spawn(move || {
+        // TODO: should be able to provide a custom Demodulator for a soundmodem
+        let mut demod = SoftDemodulator::new();
+        let mut tnc = SoftTnc::new();
+        let mut buf = [0u8; MAX_FRAME_LEN];
+        while let Ok(ev) = event_rx.recv() {
+            match ev {
+                SoundmodemEvent::Kiss(k) => {
+                    let _n = tnc.write_kiss(&k);
+                    // TODO: what does it mean if we fail to write it all?
+                    // Probably we have to read frames for tx first - revisit this during tx
+                }
+                SoundmodemEvent::BasebandInput(b) => {
+                    for sample in &*b {
+                        if let Some(frame) = demod.demod(*sample) {
+                            tnc.handle_frame(frame);
+                            loop {
+                                let n = tnc.read_kiss(&mut buf);
+                                if n > 0 {
+                                    let _ = kiss_out_tx.try_send(buf[0..n].into());
+                                } else {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+                SoundmodemEvent::Start => input.start(event_tx.clone()),
+                SoundmodemEvent::Close => break,
+            }
+        }
+    });
 }
 
 pub trait InputSource: Send + Sync + 'static {
@@ -82,6 +172,15 @@ pub struct InputRrcFile {
     end_tx: Mutex<Option<Sender<()>>>,
 }
 
+impl InputRrcFile {
+    pub fn new(path: PathBuf) -> Self {
+        Self {
+            path,
+            end_tx: Mutex::new(None),
+        }
+    }
+}
+
 impl InputSource for InputRrcFile {
     fn start(&self, samples: SyncSender<SoundmodemEvent>) {
         let (end_tx, end_rx) = channel();
index 26f8f4ab7e6764be7c374d31f8c8897022af7062..8b8e5a1c5bf90be43079927b9196a3377c4c9a88 100644 (file)
@@ -26,7 +26,7 @@ impl SoftTnc {
     }
 
     /// Process an individual `Frame` that has been decoded by the modem.
-    pub fn handle_frame(&mut self, frame: Frame) -> Result<(), SoftTncError> {
+    pub fn handle_frame(&mut self, frame: Frame) {
         match frame {
             Frame::Lsf(lsf) => {
                 // A new LSF implies a clean slate.
@@ -126,7 +126,6 @@ impl SoftTnc {
                 }
             }
         }
-        Ok(())
     }
 
     /// Update the number of samples that have been received by the incoming stream, as a form of timekeeping
@@ -144,7 +143,7 @@ impl SoftTnc {
     ///
     /// After each frame input, this should be consumed in a loop until length 0 is returned.
     /// This component will never block. Upstream interface can provide blocking `read()` if desired.
-    pub fn read_kiss(&mut self, target_buf: &mut [u8]) -> Result<usize, SoftTncError> {
+    pub fn read_kiss(&mut self, target_buf: &mut [u8]) -> usize {
         match self.outgoing_kiss.as_mut() {
             Some(outgoing) => {
                 let n = (outgoing.kiss_frame.len - outgoing.sent).min(target_buf.len());
@@ -154,13 +153,13 @@ impl SoftTnc {
                 if outgoing.sent == outgoing.kiss_frame.len {
                     self.outgoing_kiss = None;
                 }
-                Ok(n)
+                n
             }
-            None => Ok(0),
+            None => 0,
         }
     }
 
-    pub fn write_kiss(&mut self, buf: &[u8]) -> Result<usize, SoftTncError> {
+    pub fn write_kiss(&mut self, buf: &[u8]) -> usize {
         let target_buf = self.kiss_buffer.buf_remaining();
         let n = buf.len().min(target_buf.len());
         target_buf[0..n].copy_from_slice(&buf[0..n]);
@@ -168,7 +167,7 @@ impl SoftTnc {
         while let Some(_kiss_frame) = self.kiss_buffer.next_frame() {
             // TODO: handle host-to-TNC message
         }
-        Ok(n)
+        n
     }
 
     fn kiss_to_host(&mut self, kiss_frame: KissFrame) {
@@ -271,10 +270,10 @@ mod tests {
         };
         let mut tnc = SoftTnc::new();
         let mut kiss = KissFrame::new_empty();
-        assert_eq!(tnc.read_kiss(&mut kiss.data), Ok(0));
+        assert_eq!(tnc.read_kiss(&mut kiss.data), 0);
 
-        tnc.handle_frame(Frame::Lsf(lsf)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Lsf(lsf));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);
 
@@ -282,16 +281,16 @@ mod tests {
         let n = kiss.decode_payload(&mut payload_buf).unwrap();
         assert_eq!(n, 30);
 
-        tnc.handle_frame(Frame::Stream(stream1)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Stream(stream1));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);
 
         let n = kiss.decode_payload(&mut payload_buf).unwrap();
         assert_eq!(n, 26);
 
-        tnc.handle_frame(Frame::Stream(stream2)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Stream(stream2));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);
 
@@ -361,9 +360,9 @@ mod tests {
         let mut tnc = SoftTnc::new();
         let mut kiss = KissFrame::new_empty();
         for f in frames {
-            tnc.handle_frame(Frame::Stream(f)).unwrap();
+            tnc.handle_frame(Frame::Stream(f));
         }
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         let mut payload_buf = [0u8; 2048];
         let n = kiss.decode_payload(&mut payload_buf).unwrap();
         assert_eq!(n, 30);
@@ -402,10 +401,10 @@ mod tests {
         };
         let mut tnc = SoftTnc::new();
         let mut kiss = KissFrame::new_empty();
-        assert_eq!(tnc.read_kiss(&mut kiss.data), Ok(0));
+        assert_eq!(tnc.read_kiss(&mut kiss.data), 0);
 
-        tnc.handle_frame(Frame::Lsf(lsf)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Lsf(lsf));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);
 
@@ -413,16 +412,16 @@ mod tests {
         let n = kiss.decode_payload(&mut payload_buf).unwrap();
         assert_eq!(n, 30);
 
-        tnc.handle_frame(Frame::Stream(stream1)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Stream(stream1));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);
 
         let n = kiss.decode_payload(&mut payload_buf).unwrap();
         assert_eq!(n, 26);
 
-        tnc.handle_frame(Frame::Stream(stream3)).unwrap();
-        kiss.len = tnc.read_kiss(&mut kiss.data).unwrap();
+        tnc.handle_frame(Frame::Stream(stream3));
+        kiss.len = tnc.read_kiss(&mut kiss.data);
         assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame);
         assert_eq!(kiss.port().unwrap(), PORT_STREAM);