X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/b0e7a62e7d8184888184ca9d11464611ed12a97c..9058451e46e4d36264282abe381aa9b6fd2c773f:/m17app/src/soundmodem.rs?ds=inline diff --git a/m17app/src/soundmodem.rs b/m17app/src/soundmodem.rs index b6fbf46..99085a0 100644 --- a/m17app/src/soundmodem.rs +++ b/m17app/src/soundmodem.rs @@ -1,37 +1,90 @@ -use std::io::{self, ErrorKind, Read, Write}; - use crate::tnc::{Tnc, TncError}; +use cpal::traits::DeviceTrait; +use cpal::traits::HostTrait; +use cpal::traits::StreamTrait; +use cpal::{SampleFormat, SampleRate}; use log::debug; +use m17core::kiss::MAX_FRAME_LEN; +use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator}; use m17core::tnc::SoftTnc; +use std::collections::VecDeque; use std::fs::File; +use std::io::{self, ErrorKind, Read, Write}; use std::path::PathBuf; use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError}; +use std::sync::RwLock; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; pub struct Soundmodem { - tnc: SoftTnc, - config: SoundmodemConfig, + event_tx: SyncSender, + kiss_out_rx: Arc>>>, + partial_kiss_out: Arc>>, } -pub struct SoundmodemConfig { - // sound cards, PTT, etc. - input: Box, +impl Soundmodem { + pub fn new_with_input_and_output(input: I, output: O) -> Self { + // must create TNC here + let (event_tx, event_rx) = sync_channel(128); + let (kiss_out_tx, kiss_out_rx) = sync_channel(128); + spawn_soundmodem_worker( + event_tx.clone(), + event_rx, + kiss_out_tx, + Box::new(input), + Box::new(output), + ); + Self { + event_tx, + kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)), + partial_kiss_out: Arc::new(Mutex::new(None)), + } + } +} + +struct PartialKissOut { + output: Arc<[u8]>, + idx: usize, } impl Read for Soundmodem { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.tnc - .read_kiss(buf) - .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s))) + { + let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap(); + if let Some(partial) = partial_kiss_out.as_mut() { + let remaining = partial.output.len() - partial.idx; + let to_write = remaining.min(buf.len()); + buf[0..to_write] + .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]); + if to_write == remaining { + *partial_kiss_out = None; + } else { + partial.idx += to_write; + } + return Ok(to_write); + } + } + let output = { + let rx = self.kiss_out_rx.lock().unwrap(); + rx.recv() + .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))? + }; + let to_write = output.len().min(buf.len()); + buf[0..to_write].copy_from_slice(&output[0..to_write]); + if to_write != output.len() { + *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut { + output, + idx: to_write, + }) + } + Ok(to_write) } } impl Write for Soundmodem { fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.tnc - .write_kiss(buf) - .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s))) + let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into())); + Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { @@ -41,21 +94,139 @@ impl Write for Soundmodem { impl Tnc for Soundmodem { fn try_clone(&mut self) -> Result { - unimplemented!(); + Ok(Self { + event_tx: self.event_tx.clone(), + kiss_out_rx: self.kiss_out_rx.clone(), + partial_kiss_out: self.partial_kiss_out.clone(), + }) } fn start(&mut self) -> Result<(), TncError> { - unimplemented!(); + let _ = self.event_tx.send(SoundmodemEvent::Start); + Ok(()) } fn close(&mut self) -> Result<(), TncError> { - unimplemented!(); + let _ = self.event_tx.send(SoundmodemEvent::Close); + Ok(()) } } pub enum SoundmodemEvent { Kiss(Arc<[u8]>), BasebandInput(Arc<[i16]>), + Start, + Close, + DidReadFromOutputBuffer { len: usize, timestamp: Instant }, + OutputUnderrun, +} + +fn spawn_soundmodem_worker( + event_tx: SyncSender, + event_rx: Receiver, + kiss_out_tx: SyncSender>, + input: Box, + output: Box, +) { + std::thread::spawn(move || { + // TODO: should be able to provide a custom Demodulator for a soundmodem + let mut demodulator = SoftDemodulator::new(); + let mut modulator = SoftModulator::new(); + let mut tnc = SoftTnc::new(); + let mut buf = [0u8; MAX_FRAME_LEN]; + let out_buffer = Arc::new(RwLock::new(OutputBuffer::new())); + let mut out_samples = [0i16; 1024]; + let start = Instant::now(); + let mut ptt = false; + while let Ok(ev) = event_rx.recv() { + // Update clock on TNC before we do anything + let sample_time = (start.elapsed().as_nanos() / 48000) as u64; + tnc.set_now(sample_time); + + // Handle event + match ev { + SoundmodemEvent::Kiss(k) => { + let _n = tnc.write_kiss(&k); + // TODO: what does it mean if we fail to write it all? + // Probably we have to read frames for tx first - revisit this during tx + } + SoundmodemEvent::BasebandInput(b) => { + for sample in &*b { + if let Some(frame) = demodulator.demod(*sample) { + tnc.handle_frame(frame); + loop { + let n = tnc.read_kiss(&mut buf); + if n > 0 { + let _ = kiss_out_tx.try_send(buf[0..n].into()); + } else { + break; + } + } + } + } + tnc.set_data_carrier_detect(demodulator.data_carrier_detect()); + } + SoundmodemEvent::Start => { + input.start(event_tx.clone()); + output.start(event_tx.clone(), out_buffer.clone()); + } + SoundmodemEvent::Close => break, + SoundmodemEvent::DidReadFromOutputBuffer { len, timestamp } => { + let (occupied, internal_latency) = { + let out_buffer = out_buffer.read().unwrap(); + (out_buffer.samples.len(), out_buffer.latency) + }; + let internal_latency = (internal_latency.as_secs_f32() * 48000.0) as usize; + let dynamic_latency = + len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize); + modulator.update_output_buffer( + occupied, + 48000, + internal_latency + dynamic_latency, + ); + } + SoundmodemEvent::OutputUnderrun => { + // TODO: cancel transmission, send empty data frame to host + } + } + + // Update PTT state + let new_ptt = tnc.ptt(); + if new_ptt != ptt { + if new_ptt { + // turn it on + } else { + // turn it off + } + } + ptt = new_ptt; + + // Let the modulator do what it wants + while let Some(action) = modulator.run() { + match action { + ModulatorAction::SetIdle(idling) => { + out_buffer.write().unwrap().idling = idling; + } + ModulatorAction::GetNextFrame => { + modulator.provide_next_frame(tnc.read_tx_frame()); + } + ModulatorAction::ReadOutput => loop { + let n = modulator.read_output_samples(&mut out_samples); + if n == 0 { + break; + } + let mut out_buffer = out_buffer.write().unwrap(); + for s in &out_samples[0..n] { + out_buffer.samples.push_back(*s); + } + }, + ModulatorAction::TransmissionWillEnd(in_samples) => { + tnc.set_tx_end_time(in_samples); + } + } + } + } + }); } pub trait InputSource: Send + Sync + 'static { @@ -64,16 +235,69 @@ pub trait InputSource: Send + Sync + 'static { } pub struct InputSoundcard { - cpal_name: String, + // TODO: allow for inversion both here and in output + cpal_name: Option, + end_tx: Mutex>>, +} + +impl InputSoundcard { + pub fn new() -> Self { + Self { + cpal_name: None, + end_tx: Mutex::new(None), + } + } + + pub fn new_with_card(card_name: String) -> Self { + Self { + cpal_name: Some(card_name), + end_tx: Mutex::new(None), + } + } } impl InputSource for InputSoundcard { fn start(&self, samples: SyncSender) { - todo!() + let (end_tx, end_rx) = channel(); + let cpal_name = self.cpal_name.clone(); + std::thread::spawn(move || { + let host = cpal::default_host(); + let device = if let Some(name) = cpal_name.as_deref() { + host.input_devices() + .unwrap() + .find(|d| d.name().unwrap() == name) + .unwrap() + } else { + host.default_input_device().unwrap() + }; + let mut configs = device.supported_input_configs().unwrap(); + let config = configs + .find(|c| c.channels() == 1 && c.sample_format() == SampleFormat::I16) + .unwrap() + .with_sample_rate(SampleRate(48000)); + let stream = device + .build_input_stream( + &config.into(), + move |data: &[i16], _info: &cpal::InputCallbackInfo| { + debug!("input has given us {} samples", data.len()); + let out: Vec = data.iter().map(|s| *s).collect(); + let _ = samples.try_send(SoundmodemEvent::BasebandInput(out.into())); + }, + |e| { + // TODO: abort? + debug!("error occurred in soundcard input: {e:?}"); + }, + None, + ) + .unwrap(); + stream.play().unwrap(); + let _ = end_rx.recv(); + }); + *self.end_tx.lock().unwrap() = Some(end_tx); } fn close(&self) { - todo!() + let _ = self.end_tx.lock().unwrap().take(); } } @@ -82,6 +306,15 @@ pub struct InputRrcFile { end_tx: Mutex>>, } +impl InputRrcFile { + pub fn new(path: PathBuf) -> Self { + Self { + path, + end_tx: Mutex::new(None), + } + } +} + impl InputSource for InputRrcFile { fn start(&self, samples: SyncSender) { let (end_tx, end_rx) = channel(); @@ -126,3 +359,135 @@ impl InputSource for InputRrcFile { let _ = self.end_tx.lock().unwrap().take(); } } + +pub struct OutputBuffer { + idling: bool, + // TODO: something more efficient + samples: VecDeque, + latency: Duration, +} + +impl OutputBuffer { + pub fn new() -> Self { + Self { + idling: true, + samples: VecDeque::new(), + latency: Duration::ZERO, + } + } +} + +pub trait OutputSink: Send + Sync + 'static { + fn start(&self, event_tx: SyncSender, buffer: Arc>); + fn close(&self); +} + +pub struct OutputRrcFile { + path: PathBuf, + end_tx: Mutex>>, +} + +impl OutputRrcFile { + pub fn new(path: PathBuf) -> Self { + Self { + path, + end_tx: Mutex::new(None), + } + } +} + +impl OutputSink for OutputRrcFile { + fn start(&self, event_tx: SyncSender, buffer: Arc>) { + let (end_tx, end_rx) = channel(); + let path = self.path.clone(); + std::thread::spawn(move || { + // TODO: error handling + let mut file = File::create(path).unwrap(); + + // assuming 48 kHz for now + const TICK: Duration = Duration::from_millis(25); + const SAMPLES_PER_TICK: usize = 1200; + + // flattened BE i16s for writing + let mut buf = [0u8; SAMPLES_PER_TICK * 2]; + let mut next_tick = Instant::now() + TICK; + + loop { + std::thread::sleep(next_tick.duration_since(Instant::now())); + next_tick = next_tick + TICK; + if end_rx.try_recv() != Err(TryRecvError::Empty) { + break; + } + + let mut buffer = buffer.write().unwrap(); + for out in buf.chunks_mut(2) { + if let Some(s) = buffer.samples.pop_front() { + let be = s.to_be_bytes(); + out.copy_from_slice(&[be[0], be[1]]); + } else if buffer.idling { + out.copy_from_slice(&[0, 0]); + } else { + debug!("output rrc file had underrun"); + let _ = event_tx.send(SoundmodemEvent::OutputUnderrun); + break; + } + } + if let Err(e) = file.write_all(&buf) { + debug!("failed to write to rrc file: {e:?}"); + break; + } + } + }); + *self.end_tx.lock().unwrap() = Some(end_tx); + } + + fn close(&self) { + let _ = self.end_tx.lock().unwrap().take(); + } +} + +pub struct NullOutputSink { + end_tx: Mutex>>, +} + +impl NullOutputSink { + pub fn new() -> Self { + Self { + end_tx: Mutex::new(None), + } + } +} + +impl OutputSink for NullOutputSink { + fn start(&self, event_tx: SyncSender, buffer: Arc>) { + let (end_tx, end_rx) = channel(); + std::thread::spawn(move || { + // assuming 48 kHz for now + const TICK: Duration = Duration::from_millis(25); + const SAMPLES_PER_TICK: usize = 1200; + let mut next_tick = Instant::now() + TICK; + + loop { + std::thread::sleep(next_tick.duration_since(Instant::now())); + next_tick = next_tick + TICK; + if end_rx.try_recv() != Err(TryRecvError::Empty) { + break; + } + + let mut buffer = buffer.write().unwrap(); + for _ in 0..SAMPLES_PER_TICK { + if !buffer.samples.pop_front().is_some() && !buffer.idling { + debug!("null output had underrun"); + let _ = event_tx.send(SoundmodemEvent::OutputUnderrun); + break; + } + } + } + }); + *self.end_tx.lock().unwrap() = Some(end_tx); + } + + fn close(&self) { + let _ = self.end_tx.lock().unwrap().take(); + } +}