]> code.octet-stream.net Git - m17rt/blob - m17app/src/soundmodem.rs
add RfToVoice converter for reflector tx path
[m17rt] / m17app / src / soundmodem.rs
1 use crate::error::{M17Error, SoundmodemError};
2 use crate::tnc::{Tnc, TncError};
3 use crate::util::out_buffer::OutBuffer;
4 use m17core::kiss::MAX_FRAME_LEN;
5 use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator};
6 use m17core::tnc::SoftTnc;
7 use std::collections::VecDeque;
8 use std::fmt::Display;
9 use std::fs::File;
10 use std::io::{self, Read, Write};
11 use std::path::PathBuf;
12 use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError};
13 use std::sync::RwLock;
14 use std::sync::{Arc, Mutex};
15 use std::time::{Duration, Instant};
16 use thiserror::Error;
17
18 pub struct Soundmodem {
19 event_tx: SyncSender<SoundmodemEvent>,
20 kiss_out: OutBuffer,
21 }
22
23 impl Soundmodem {
24 pub fn new<I: InputSource, O: OutputSink, P: Ptt, E: ErrorHandler>(
25 input: I,
26 output: O,
27 ptt: P,
28 error: E,
29 ) -> Self {
30 let (event_tx, event_rx) = sync_channel(128);
31 let (kiss_out_tx, kiss_out_rx) = sync_channel(128);
32 spawn_soundmodem_worker(
33 event_tx.clone(),
34 event_rx,
35 kiss_out_tx,
36 Box::new(input),
37 Box::new(output),
38 Box::new(ptt),
39 Box::new(error),
40 );
41 Self {
42 event_tx,
43 kiss_out: OutBuffer::new(kiss_out_rx),
44 }
45 }
46 }
47
/// Identifies which part of the soundmodem an error came from.
#[derive(Debug, Clone, Copy)]
pub enum ErrorSource {
    /// The baseband input source.
    Input,
    /// The baseband output sink.
    Output,
    /// The push-to-talk driver.
    Ptt,
}

impl Display for ErrorSource {
    /// Writes a short human-readable label for the source.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            ErrorSource::Input => "Input",
            ErrorSource::Output => "Output",
            ErrorSource::Ptt => "PTT",
        };
        f.write_str(label)
    }
}
64
65 pub trait ErrorHandler: Send + Sync + 'static {
66 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError);
67 }
68
69 impl<F> ErrorHandler for F
70 where
71 F: FnMut(ErrorSource, SoundmodemError) + Send + Sync + 'static,
72 {
73 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
74 self(source, err)
75 }
76 }
77
78 /// Soundmodem errors will be ignored.
79 pub struct NullErrorHandler;
80
81 impl NullErrorHandler {
82 pub fn new() -> Self {
83 Self {}
84 }
85 }
86
87 impl Default for NullErrorHandler {
88 fn default() -> Self {
89 Self::new()
90 }
91 }
92
93 impl ErrorHandler for NullErrorHandler {
94 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
95 let _ = source;
96 let _ = err;
97 }
98 }
99
100 /// Soundmodem errors will be logged at DEBUG level via the `log` crate.
101 pub struct LogErrorHandler;
102
103 impl LogErrorHandler {
104 pub fn new() -> Self {
105 Self {}
106 }
107 }
108
109 impl Default for LogErrorHandler {
110 fn default() -> Self {
111 Self::new()
112 }
113 }
114
115 impl ErrorHandler for LogErrorHandler {
116 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
117 log::debug!("Soundmodem error: {source} - {err}");
118 }
119 }
120
121 /// Soundmodem errors will be logged to stdout.
122 pub struct StdoutErrorHandler;
123
124 impl StdoutErrorHandler {
125 pub fn new() -> Self {
126 Self {}
127 }
128 }
129
130 impl Default for StdoutErrorHandler {
131 fn default() -> Self {
132 Self::new()
133 }
134 }
135
136 impl ErrorHandler for StdoutErrorHandler {
137 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
138 println!("Soundmodem error: {source} - {err}");
139 }
140 }
141
142 #[derive(Clone)]
143 pub struct SoundmodemErrorSender {
144 source: ErrorSource,
145 event_tx: SyncSender<SoundmodemEvent>,
146 }
147
148 impl SoundmodemErrorSender {
149 pub fn send_error<E: Into<SoundmodemError>>(&self, err: E) {
150 let _ = self
151 .event_tx
152 .send(SoundmodemEvent::RuntimeError(self.source, err.into()));
153 }
154 }
155
156 impl Read for Soundmodem {
157 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
158 self.kiss_out.read(buf)
159 }
160 }
161
162 impl Write for Soundmodem {
163 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
164 let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into()));
165 Ok(buf.len())
166 }
167
168 fn flush(&mut self) -> std::io::Result<()> {
169 Ok(())
170 }
171 }
172
173 impl Tnc for Soundmodem {
174 fn try_clone(&mut self) -> Result<Self, TncError> {
175 Ok(Self {
176 event_tx: self.event_tx.clone(),
177 kiss_out: self.kiss_out.clone(),
178 })
179 }
180
181 fn start(&mut self) {
182 let _ = self.event_tx.send(SoundmodemEvent::Start);
183 }
184
185 fn close(&mut self) {
186 let _ = self.event_tx.send(SoundmodemEvent::Close);
187 }
188 }
189
190 pub enum SoundmodemEvent {
191 Kiss(Arc<[u8]>),
192 BasebandInput(Arc<[i16]>),
193 Start,
194 Close,
195 DidReadFromOutputBuffer { len: usize, timestamp: Instant },
196 OutputUnderrun,
197 RuntimeError(ErrorSource, SoundmodemError),
198 }
199
200 fn spawn_soundmodem_worker(
201 event_tx: SyncSender<SoundmodemEvent>,
202 event_rx: Receiver<SoundmodemEvent>,
203 kiss_out_tx: SyncSender<Arc<[u8]>>,
204 input: Box<dyn InputSource>,
205 output: Box<dyn OutputSink>,
206 mut ptt_driver: Box<dyn Ptt>,
207 mut error_handler: Box<dyn ErrorHandler>,
208 ) {
209 std::thread::spawn(move || {
210 // TODO: should be able to provide a custom Demodulator for a soundmodem
211 let mut demodulator = SoftDemodulator::new();
212 let mut modulator = SoftModulator::new();
213 let mut tnc = SoftTnc::new();
214 let mut buf = [0u8; MAX_FRAME_LEN];
215 let out_buffer = Arc::new(RwLock::new(OutputBuffer::new()));
216 let mut out_samples = [0i16; 1024];
217 let start = Instant::now();
218 let mut ptt = false;
219 while let Ok(ev) = event_rx.recv() {
220 // Update clock on TNC before we do anything
221 let sample_time = start.elapsed();
222 let secs = sample_time.as_secs();
223 let nanos = sample_time.subsec_nanos();
224 // Accurate to within approx 1 sample
225 let now_samples = 48000 * secs + (nanos as u64 / 20833);
226 tnc.set_now(now_samples);
227
228 // Handle event
229 match ev {
230 SoundmodemEvent::Kiss(k) => {
231 let _n = tnc.write_kiss(&k);
232 // TODO: what does it mean if we fail to write it all?
233 // Probably we have to read frames for tx first - revisit this during tx
234 }
235 SoundmodemEvent::BasebandInput(b) => {
236 for sample in &*b {
237 if let Some(frame) = demodulator.demod(*sample) {
238 tnc.handle_frame(frame);
239 loop {
240 let n = tnc.read_kiss(&mut buf);
241 if n > 0 {
242 let _ = kiss_out_tx.try_send(buf[0..n].into());
243 } else {
244 break;
245 }
246 }
247 }
248 }
249 tnc.set_data_carrier_detect(demodulator.data_carrier_detect());
250 }
251 SoundmodemEvent::Start => {
252 let input_errors = SoundmodemErrorSender {
253 source: ErrorSource::Input,
254 event_tx: event_tx.clone(),
255 };
256 input.start(event_tx.clone(), input_errors);
257 let output_errors = SoundmodemErrorSender {
258 source: ErrorSource::Output,
259 event_tx: event_tx.clone(),
260 };
261 output.start(event_tx.clone(), out_buffer.clone(), output_errors);
262 }
263 SoundmodemEvent::Close => {
264 input.close();
265 output.close();
266 if let Err(e) = ptt_driver.ptt_off() {
267 error_handler.soundmodem_error(ErrorSource::Ptt, e);
268 }
269 break;
270 }
271 SoundmodemEvent::DidReadFromOutputBuffer { len, timestamp } => {
272 let (occupied, internal_latency) = {
273 let out_buffer = out_buffer.read().unwrap();
274 (out_buffer.samples.len(), out_buffer.latency)
275 };
276 let internal_latency = (internal_latency.as_secs_f32() * 48000.0) as usize;
277 let dynamic_latency =
278 len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize);
279 modulator.update_output_buffer(
280 occupied,
281 48000,
282 internal_latency + dynamic_latency,
283 );
284 }
285 SoundmodemEvent::OutputUnderrun => {
286 log::debug!("output underrun");
287 // TODO: cancel transmission, send empty data frame to host
288 }
289 SoundmodemEvent::RuntimeError(source, err) => {
290 error_handler.soundmodem_error(source, err);
291 }
292 }
293
294 // Update PTT state
295 let new_ptt = tnc.ptt();
296 if new_ptt != ptt {
297 if new_ptt {
298 if let Err(e) = ptt_driver.ptt_on() {
299 error_handler.soundmodem_error(ErrorSource::Ptt, e);
300 }
301 } else if let Err(e) = ptt_driver.ptt_off() {
302 error_handler.soundmodem_error(ErrorSource::Ptt, e);
303 }
304 }
305 ptt = new_ptt;
306
307 // Let the modulator do what it wants
308 while let Some(action) = modulator.run() {
309 match action {
310 ModulatorAction::SetIdle(idling) => {
311 out_buffer.write().unwrap().idling = idling;
312 }
313 ModulatorAction::GetNextFrame => {
314 modulator.provide_next_frame(tnc.read_tx_frame());
315 }
316 ModulatorAction::ReadOutput => loop {
317 let n = modulator.read_output_samples(&mut out_samples);
318 if n == 0 {
319 break;
320 }
321 let mut out_buffer = out_buffer.write().unwrap();
322 for s in &out_samples[0..n] {
323 out_buffer.samples.push_back(*s);
324 }
325 },
326 ModulatorAction::TransmissionWillEnd(in_samples) => {
327 tnc.set_tx_end_time(in_samples);
328 }
329 }
330 }
331 }
332 });
333 }
334
335 pub trait InputSource: Send + Sync + 'static {
336 fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender);
337 fn close(&self);
338 }
339
340 pub struct InputRrcFile {
341 baseband: Arc<[u8]>,
342 end_tx: Mutex<Option<Sender<()>>>,
343 }
344
345 impl InputRrcFile {
346 pub fn new(path: PathBuf) -> Result<Self, M17Error> {
347 let mut file = File::open(&path).map_err(|_| M17Error::InvalidRrcPath(path.clone()))?;
348 let mut baseband = vec![];
349 file.read_to_end(&mut baseband)
350 .map_err(|_| M17Error::RrcReadFailed(path))?;
351 Ok(Self {
352 baseband: baseband.into(),
353 end_tx: Mutex::new(None),
354 })
355 }
356 }
357
358 impl InputSource for InputRrcFile {
359 fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender) {
360 let (end_tx, end_rx) = channel();
361 let baseband = self.baseband.clone();
362 std::thread::spawn(move || {
363 // assuming 48 kHz for now
364 const TICK: Duration = Duration::from_millis(25);
365 const SAMPLES_PER_TICK: usize = 1200;
366
367 let mut next_tick = Instant::now() + TICK;
368 let mut buf = [0i16; SAMPLES_PER_TICK];
369 let mut idx = 0;
370
371 for sample in baseband
372 .chunks(2)
373 .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
374 {
375 buf[idx] = sample;
376 idx += 1;
377 if idx == SAMPLES_PER_TICK {
378 if samples
379 .try_send(SoundmodemEvent::BasebandInput(buf.into()))
380 .is_err()
381 {
382 errors.send_error(InputRrcError::Overflow);
383 }
384 next_tick += TICK;
385 idx = 0;
386 std::thread::sleep(next_tick.duration_since(Instant::now()));
387 }
388 if end_rx.try_recv() != Err(TryRecvError::Empty) {
389 break;
390 }
391 }
392 });
393 *self.end_tx.lock().unwrap() = Some(end_tx);
394 }
395
396 fn close(&self) {
397 let _ = self.end_tx.lock().unwrap().take();
398 }
399 }
400
401 #[derive(Debug, Error)]
402 pub enum InputRrcError {
403 #[error("overflow occurred feeding sample to soundmodem")]
404 Overflow,
405 }
406
/// Input source that supplies nothing but zero-valued samples.
pub struct NullInputSource {
    /// Held while the feeder thread runs; dropping it tells that thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl NullInputSource {
    /// Creates a source that has not been started yet.
    pub fn new() -> Self {
        let end_tx = Mutex::new(None);
        Self { end_tx }
    }
}
418
419 impl InputSource for NullInputSource {
420 fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender) {
421 let (end_tx, end_rx) = channel();
422 std::thread::spawn(move || {
423 // assuming 48 kHz for now
424 const TICK: Duration = Duration::from_millis(25);
425 const SAMPLES_PER_TICK: usize = 1200;
426 let mut next_tick = Instant::now() + TICK;
427
428 loop {
429 std::thread::sleep(next_tick.duration_since(Instant::now()));
430 next_tick += TICK;
431 if end_rx.try_recv() != Err(TryRecvError::Empty) {
432 break;
433 }
434 if samples
435 .try_send(SoundmodemEvent::BasebandInput(
436 [0i16; SAMPLES_PER_TICK].into(),
437 ))
438 .is_err()
439 {
440 errors.send_error(NullInputError::Overflow);
441 }
442 }
443 });
444 *self.end_tx.lock().unwrap() = Some(end_tx);
445 }
446
447 fn close(&self) {
448 let _ = self.end_tx.lock().unwrap().take();
449 }
450 }
451
452 #[derive(Debug, Error)]
453 pub enum NullInputError {
454 #[error("overflow occurred feeding sample to soundmodem")]
455 Overflow,
456 }
457
458 impl Default for NullInputSource {
459 fn default() -> Self {
460 Self::new()
461 }
462 }
463
/// Queue of modulated samples shared between the soundmodem worker and an output sink.
pub struct OutputBuffer {
    /// Whether the modulator is idling, as last set by the soundmodem worker.
    pub idling: bool,
    // TODO: something more efficient
    /// Samples waiting to be consumed, oldest first.
    pub samples: VecDeque<i16>,
    /// Latency figure that the worker adds when reporting output buffer state to the
    /// modulator.
    pub latency: Duration,
}

impl OutputBuffer {
    /// Creates an empty buffer that is idling and has zero latency.
    pub fn new() -> Self {
        let samples = VecDeque::new();
        let latency = Duration::ZERO;
        Self {
            idling: true,
            samples,
            latency,
        }
    }
}

impl Default for OutputBuffer {
    /// Equivalent to [`OutputBuffer::new`].
    fn default() -> Self {
        OutputBuffer::new()
    }
}
486
487 pub trait OutputSink: Send + Sync + 'static {
488 fn start(
489 &self,
490 event_tx: SyncSender<SoundmodemEvent>,
491 buffer: Arc<RwLock<OutputBuffer>>,
492 errors: SoundmodemErrorSender,
493 );
494 fn close(&self);
495 }
496
/// Output sink that writes modulated samples to a file.
pub struct OutputRrcFile {
    /// Location of the file, which is created when the sink is started.
    path: PathBuf,
    /// Held while the writer thread runs; dropping it tells that thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl OutputRrcFile {
    /// Creates a sink for `path`. The file is not touched until the sink is started.
    pub fn new(path: PathBuf) -> Self {
        let end_tx = Mutex::new(None);
        Self { path, end_tx }
    }
}
510
511 impl OutputSink for OutputRrcFile {
512 fn start(
513 &self,
514 event_tx: SyncSender<SoundmodemEvent>,
515 buffer: Arc<RwLock<OutputBuffer>>,
516 errors: SoundmodemErrorSender,
517 ) {
518 let (end_tx, end_rx) = channel();
519 let mut file = match File::create(self.path.clone()) {
520 Ok(f) => f,
521 Err(e) => {
522 errors.send_error(OutputRrcError::Open(e));
523 return;
524 }
525 };
526 std::thread::spawn(move || {
527 // assuming 48 kHz for now
528 const TICK: Duration = Duration::from_millis(25);
529 const SAMPLES_PER_TICK: usize = 1200;
530
531 // flattened BE i16s for writing
532 let mut buf = [0u8; SAMPLES_PER_TICK * 2];
533 let mut next_tick = Instant::now() + TICK;
534
535 loop {
536 std::thread::sleep(next_tick.duration_since(Instant::now()));
537 next_tick += TICK;
538 if end_rx.try_recv() != Err(TryRecvError::Empty) {
539 break;
540 }
541 // For now only write deliberately modulated (non-idling) samples
542 // Multiple transmissions will get smooshed together
543 let mut buf_used = 0;
544
545 let mut buffer = buffer.write().unwrap();
546 for out in buf.chunks_mut(2) {
547 if let Some(s) = buffer.samples.pop_front() {
548 let be = s.to_le_bytes();
549 out.copy_from_slice(&[be[0], be[1]]);
550 buf_used += 2;
551 } else if !buffer.idling {
552 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
553 break;
554 }
555 }
556 if let Err(e) = file.write_all(&buf[0..buf_used]) {
557 errors.send_error(OutputRrcError::WriteError(e));
558 break;
559 }
560 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
561 len: buf_used / 2,
562 timestamp: Instant::now(),
563 });
564 }
565 });
566 *self.end_tx.lock().unwrap() = Some(end_tx);
567 }
568
569 fn close(&self) {
570 let _ = self.end_tx.lock().unwrap().take();
571 }
572 }
573
574 #[derive(Debug, Error)]
575 pub enum OutputRrcError {
576 #[error("unable to open rrc file for writing: {0}")]
577 Open(#[source] std::io::Error),
578
579 #[error("error writing to output file: {0}")]
580 WriteError(#[source] std::io::Error),
581 }
582
/// Output sink that takes samples from the output buffer and throws them away.
pub struct NullOutputSink {
    /// Held while the consumer thread runs; dropping it tells that thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl NullOutputSink {
    /// Creates a sink that has not been started yet.
    pub fn new() -> Self {
        let end_tx = Mutex::new(None);
        Self { end_tx }
    }
}

impl Default for NullOutputSink {
    /// Equivalent to [`NullOutputSink::new`].
    fn default() -> Self {
        NullOutputSink::new()
    }
}
600
601 impl OutputSink for NullOutputSink {
602 fn start(
603 &self,
604 event_tx: SyncSender<SoundmodemEvent>,
605 buffer: Arc<RwLock<OutputBuffer>>,
606 _errors: SoundmodemErrorSender,
607 ) {
608 let (end_tx, end_rx) = channel();
609 std::thread::spawn(move || {
610 // assuming 48 kHz for now
611 const TICK: Duration = Duration::from_millis(25);
612 const SAMPLES_PER_TICK: usize = 1200;
613 let mut next_tick = Instant::now() + TICK;
614
615 loop {
616 std::thread::sleep(next_tick.duration_since(Instant::now()));
617 next_tick += TICK;
618 if end_rx.try_recv() != Err(TryRecvError::Empty) {
619 break;
620 }
621
622 let mut buffer = buffer.write().unwrap();
623 let mut taken = 0;
624 for _ in 0..SAMPLES_PER_TICK {
625 if buffer.samples.pop_front().is_none() {
626 if !buffer.idling {
627 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
628 break;
629 }
630 } else {
631 taken += 1;
632 }
633 }
634 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
635 len: taken,
636 timestamp: Instant::now(),
637 });
638 }
639 });
640 *self.end_tx.lock().unwrap() = Some(end_tx);
641 }
642
643 fn close(&self) {
644 let _ = self.end_tx.lock().unwrap().take();
645 }
646 }
647
648 pub trait Ptt: Send + 'static {
649 fn ptt_on(&mut self) -> Result<(), SoundmodemError>;
650 fn ptt_off(&mut self) -> Result<(), SoundmodemError>;
651 }
652
653 /// There is no PTT because this TNC will never make transmissions on a real radio.
654 pub struct NullPtt;
655
656 impl NullPtt {
657 pub fn new() -> Self {
658 Self
659 }
660 }
661
662 impl Default for NullPtt {
663 fn default() -> Self {
664 Self::new()
665 }
666 }
667
668 impl Ptt for NullPtt {
669 fn ptt_on(&mut self) -> Result<(), SoundmodemError> {
670 Ok(())
671 }
672
673 fn ptt_off(&mut self) -> Result<(), SoundmodemError> {
674 Ok(())
675 }
676 }