From: Thomas Karpiniec Date: Sun, 5 Jan 2025 10:10:01 +0000 (+1100) Subject: Soundmodem rx works in realtime end-to-end using file source X-Git-Url: https://code.octet-stream.net/m17rt/commitdiff_plain/ed89ed42024f30e94e27084ec2b9ee0acb038b62?ds=sidebyside Soundmodem rx works in realtime end-to-end using file source --- diff --git a/demod/src/main.rs b/demod/src/main.rs index 78ff2b0..0f7f031 100755 --- a/demod/src/main.rs +++ b/demod/src/main.rs @@ -1,114 +1,19 @@ -use cpal::traits::DeviceTrait; -use cpal::traits::HostTrait; -use cpal::traits::StreamTrait; -use cpal::{SampleFormat, SampleRate}; -use log::debug; -use m17core::{ - modem::{Demodulator, SoftDemodulator}, - protocol::{Frame, LichCollection}, -}; -use std::{fs::File, io::Read}; - -pub fn run_my_decode() { - let file = File::open("../../Data/test_vk7xt.rrc").unwrap(); - let mut input = file; - let mut baseband = vec![]; - input.read_to_end(&mut baseband).unwrap(); - - let mut lich = LichCollection::new(); - let mut codec2_data = vec![]; - let mut modem = SoftDemodulator::new(); - - for pair in baseband.chunks(2) { - let sample: i16 = i16::from_le_bytes([pair[0], pair[1]]); - if let Some(frame) = modem.demod(sample) { - debug!("Modem demodulated frame: {:?}", frame); - if let Frame::Stream(s) = frame { - for b in s.stream_data { - codec2_data.push(b); - - let valid_before = lich.valid_segments(); - lich.set_segment(s.lich_idx, s.lich_part); - let valid_after = lich.valid_segments(); - if valid_before != valid_after { - debug!("Valid lich segments: {}", lich.valid_segments()); - } - if valid_before == 5 && valid_after == 6 { - if let Some(l) = lich.try_assemble() { - debug!("Assembled complete lich: {l:?}"); - } - } - } - if s.end_of_stream { - debug!("len of codec2 data: {}", codec2_data.len()); - assert_eq!(codec2_data.len(), 1504); - - let samples = - m17codec2::decode_codec2(&codec2_data, "../../Data/speech_out.raw"); - let host = cpal::default_host(); - let def = host.default_output_device().unwrap(); - let mut configs = def.supported_output_configs().unwrap(); - let config = configs - .find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16) - .unwrap() - .with_sample_rate(SampleRate(8000)); - let mut counter = 0; - let mut index = 0; - let stream = def - .build_output_stream( - &config.into(), - move |data: &mut [i16], info: &cpal::OutputCallbackInfo| { - debug!( - "callback {:?} playback {:?}", - info.timestamp().callback, - info.timestamp().playback - ); - println!( - "iteration {counter} asked for {} samples at time {}", - data.len(), - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() - ); - counter += 1; - let qty = data.len().min(samples.len() - index); - println!("providing {qty} samples"); - data[0..qty].copy_from_slice(&samples[index..(index + qty)]); - index += qty; - }, - move |_e| { - println!("error occurred"); - }, - None, - ) - .unwrap(); - stream.play().unwrap(); - - std::thread::sleep(std::time::Duration::from_secs(10)); - } - } - } - } -} - -pub fn cpal_test() { - let host = cpal::default_host(); - for d in host.devices().unwrap() { - println!("Found card: {:?}", d.name().unwrap()); - } - let def = host.default_output_device().unwrap(); - println!("the default output device is: {}", def.name().unwrap()); - - for c in def.supported_output_configs().unwrap() { - println!("config supported: {:?}", c); - } - - println!("all supported output configs shown"); +use m17app::app::M17App; +use m17app::soundmodem::{InputRrcFile, Soundmodem}; +use m17codec2::Codec2Adapter; +use std::path::PathBuf; + +pub fn m17app_test() { + let path = PathBuf::from("../../Data/test_vk7xt.rrc"); + let source = InputRrcFile::new(path); + let soundmodem = Soundmodem::new_with_input(source); + let app = M17App::new(soundmodem); + app.add_stream_adapter(Codec2Adapter::new()); + app.start(); + std::thread::sleep(std::time::Duration::from_secs(15)); } fn main() { env_logger::init(); - run_my_decode(); - //cpal_test(); + m17app_test(); } diff --git a/m17app/src/soundmodem.rs b/m17app/src/soundmodem.rs index b6fbf46..8b7f80f 100644 --- a/m17app/src/soundmodem.rs +++ b/m17app/src/soundmodem.rs @@ -2,6 +2,8 @@ use std::io::{self, ErrorKind, Read, Write}; use crate::tnc::{Tnc, TncError}; use log::debug; +use m17core::kiss::MAX_FRAME_LEN; +use m17core::modem::{Demodulator, SoftDemodulator}; use m17core::tnc::SoftTnc; use std::fs::File; use std::path::PathBuf; @@ -10,28 +12,68 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; pub struct Soundmodem { - tnc: SoftTnc, - config: SoundmodemConfig, + event_tx: SyncSender, + kiss_out_rx: Arc>>>, + partial_kiss_out: Arc>>, } -pub struct SoundmodemConfig { - // sound cards, PTT, etc. - input: Box, +impl Soundmodem { + pub fn new_with_input(input: T) -> Self { + // must create TNC here + let (event_tx, event_rx) = sync_channel(128); + let (kiss_out_tx, kiss_out_rx) = sync_channel(128); + spawn_soundmodem_worker(event_tx.clone(), event_rx, kiss_out_tx, Box::new(input)); + Self { + event_tx, + kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)), + partial_kiss_out: Arc::new(Mutex::new(None)), + } + } +} + +struct PartialKissOut { + output: Arc<[u8]>, + idx: usize, } impl Read for Soundmodem { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.tnc - .read_kiss(buf) - .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s))) + { + let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap(); + if let Some(partial) = partial_kiss_out.as_mut() { + let remaining = partial.output.len() - partial.idx; + let to_write = remaining.min(buf.len()); + buf[0..to_write] + .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]); + if to_write == remaining { + *partial_kiss_out = None; + } else { + partial.idx += to_write; + } + return Ok(to_write); + } + } + let output = { + let rx = self.kiss_out_rx.lock().unwrap(); + rx.recv() + .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))? + }; + let to_write = output.len().min(buf.len()); + buf[0..to_write].copy_from_slice(&output[0..to_write]); + if to_write != output.len() { + *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut { + output, + idx: to_write, + }) + } + Ok(to_write) } } impl Write for Soundmodem { fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tnc - .write_kiss(buf) - .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s))) + let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into())); + Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { @@ -41,21 +83,69 @@ impl Write for Soundmodem { impl Tnc for Soundmodem { fn try_clone(&mut self) -> Result { - unimplemented!(); + Ok(Self { + event_tx: self.event_tx.clone(), + kiss_out_rx: self.kiss_out_rx.clone(), + partial_kiss_out: self.partial_kiss_out.clone(), + }) } fn start(&mut self) -> Result<(), TncError> { - unimplemented!(); + let _ = self.event_tx.send(SoundmodemEvent::Start); + Ok(()) } fn close(&mut self) -> Result<(), TncError> { - unimplemented!(); + let _ = self.event_tx.send(SoundmodemEvent::Close); + Ok(()) } } pub enum SoundmodemEvent { Kiss(Arc<[u8]>), BasebandInput(Arc<[i16]>), + Start, + Close, +} + +fn spawn_soundmodem_worker( + event_tx: SyncSender, + event_rx: Receiver, + kiss_out_tx: SyncSender>, + input: Box, +) { + std::thread::spawn(move || { + // TODO: should be able to provide a custom Demodulator for a soundmodem + let mut demod = SoftDemodulator::new(); + let mut tnc = SoftTnc::new(); + let mut buf = [0u8; MAX_FRAME_LEN]; + while let Ok(ev) = event_rx.recv() { + match ev { + SoundmodemEvent::Kiss(k) => { + let _n = tnc.write_kiss(&k); + // TODO: what does it mean if we fail to write it all? + // Probably we have to read frames for tx first - revisit this during tx + } + SoundmodemEvent::BasebandInput(b) => { + for sample in &*b { + if let Some(frame) = demod.demod(*sample) { + tnc.handle_frame(frame); + loop { + let n = tnc.read_kiss(&mut buf); + if n > 0 { + let _ = kiss_out_tx.try_send(buf[0..n].into()); + } else { + break; + } + } + } + } + } + SoundmodemEvent::Start => input.start(event_tx.clone()), + SoundmodemEvent::Close => break, + } + } + }); } pub trait InputSource: Send + Sync + 'static { @@ -82,6 +172,15 @@ pub struct InputRrcFile { end_tx: Mutex>>, } +impl InputRrcFile { + pub fn new(path: PathBuf) -> Self { + Self { + path, + end_tx: Mutex::new(None), + } + } +} + impl InputSource for InputRrcFile { fn start(&self, samples: SyncSender) { let (end_tx, end_rx) = channel(); diff --git a/m17core/src/tnc.rs b/m17core/src/tnc.rs index 26f8f4a..8b8e5a1 100644 --- a/m17core/src/tnc.rs +++ b/m17core/src/tnc.rs @@ -26,7 +26,7 @@ impl SoftTnc { } /// Process an individual `Frame` that has been decoded by the modem. - pub fn handle_frame(&mut self, frame: Frame) -> Result<(), SoftTncError> { + pub fn handle_frame(&mut self, frame: Frame) { match frame { Frame::Lsf(lsf) => { // A new LSF implies a clean slate. @@ -126,7 +126,6 @@ impl SoftTnc { } } } - Ok(()) } /// Update the number of samples that have been received by the incoming stream, as a form of timekeeping @@ -144,7 +143,7 @@ impl SoftTnc { /// /// After each frame input, this should be consumed in a loop until length 0 is returned. /// This component will never block. Upstream interface can provide blocking `read()` if desired. - pub fn read_kiss(&mut self, target_buf: &mut [u8]) -> Result { + pub fn read_kiss(&mut self, target_buf: &mut [u8]) -> usize { match self.outgoing_kiss.as_mut() { Some(outgoing) => { let n = (outgoing.kiss_frame.len - outgoing.sent).min(target_buf.len()); @@ -154,13 +153,13 @@ impl SoftTnc { if outgoing.sent == outgoing.kiss_frame.len { self.outgoing_kiss = None; } - Ok(n) + n } - None => Ok(0), + None => 0, } } - pub fn write_kiss(&mut self, buf: &[u8]) -> Result { + pub fn write_kiss(&mut self, buf: &[u8]) -> usize { let target_buf = self.kiss_buffer.buf_remaining(); let n = buf.len().min(target_buf.len()); target_buf[0..n].copy_from_slice(&buf[0..n]); @@ -168,7 +167,7 @@ impl SoftTnc { while let Some(_kiss_frame) = self.kiss_buffer.next_frame() { // TODO: handle host-to-TNC message } - Ok(n) + n } fn kiss_to_host(&mut self, kiss_frame: KissFrame) { @@ -271,10 +270,10 @@ mod tests { }; let mut tnc = SoftTnc::new(); let mut kiss = KissFrame::new_empty(); - assert_eq!(tnc.read_kiss(&mut kiss.data), Ok(0)); + assert_eq!(tnc.read_kiss(&mut kiss.data), 0); - tnc.handle_frame(Frame::Lsf(lsf)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Lsf(lsf)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM); @@ -282,16 +281,16 @@ mod tests { let n = kiss.decode_payload(&mut payload_buf).unwrap(); assert_eq!(n, 30); - tnc.handle_frame(Frame::Stream(stream1)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Stream(stream1)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM); let n = kiss.decode_payload(&mut payload_buf).unwrap(); assert_eq!(n, 26); - tnc.handle_frame(Frame::Stream(stream2)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Stream(stream2)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM); @@ -361,9 +360,9 @@ mod tests { let mut tnc = SoftTnc::new(); let mut kiss = KissFrame::new_empty(); for f in frames { - tnc.handle_frame(Frame::Stream(f)).unwrap(); + tnc.handle_frame(Frame::Stream(f)); } - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + kiss.len = tnc.read_kiss(&mut kiss.data); let mut payload_buf = [0u8; 2048]; let n = kiss.decode_payload(&mut payload_buf).unwrap(); assert_eq!(n, 30); @@ -402,10 +401,10 @@ mod tests { }; let mut tnc = SoftTnc::new(); let mut kiss = KissFrame::new_empty(); - assert_eq!(tnc.read_kiss(&mut kiss.data), Ok(0)); + assert_eq!(tnc.read_kiss(&mut kiss.data), 0); - tnc.handle_frame(Frame::Lsf(lsf)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Lsf(lsf)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM); @@ -413,16 +412,16 @@ mod tests { let n = kiss.decode_payload(&mut payload_buf).unwrap(); assert_eq!(n, 30); - tnc.handle_frame(Frame::Stream(stream1)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Stream(stream1)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM); let n = kiss.decode_payload(&mut payload_buf).unwrap(); assert_eq!(n, 26); - tnc.handle_frame(Frame::Stream(stream3)).unwrap(); - kiss.len = tnc.read_kiss(&mut kiss.data).unwrap(); + tnc.handle_frame(Frame::Stream(stream3)); + kiss.len = tnc.read_kiss(&mut kiss.data); assert_eq!(kiss.command().unwrap(), KissCommand::DataFrame); assert_eq!(kiss.port().unwrap(), PORT_STREAM);