]> code.octet-stream.net Git - m17rt/blob - m17app/src/soundmodem.rs
Remove unneeded mutex and provide some sample error handlers
[m17rt] / m17app / src / soundmodem.rs
1 use crate::error::{M17Error, SoundmodemError};
2 use crate::tnc::{Tnc, TncError};
3 use m17core::kiss::MAX_FRAME_LEN;
4 use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator};
5 use m17core::tnc::SoftTnc;
6 use std::collections::VecDeque;
7 use std::fmt::Display;
8 use std::fs::File;
9 use std::io::{self, ErrorKind, Read, Write};
10 use std::path::PathBuf;
11 use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError};
12 use std::sync::RwLock;
13 use std::sync::{Arc, Mutex};
14 use std::time::{Duration, Instant};
15 use thiserror::Error;
16
/// Handle to a software modem whose processing happens on a background worker thread.
///
/// Bytes written to the handle are forwarded to the worker as
/// [`SoundmodemEvent::Kiss`] events; reads drain the KISS output queue that the
/// worker fills.
pub struct Soundmodem {
    /// Bounded channel used to submit events to the worker thread.
    event_tx: SyncSender<SoundmodemEvent>,
    /// Receiving end of the worker's KISS output queue, shared between clones.
    kiss_out_rx: Arc<Mutex<Receiver<Arc<[u8]>>>>,
    /// Tail of a KISS chunk that did not fit in the caller's previous read buffer.
    partial_kiss_out: Arc<Mutex<Option<PartialKissOut>>>,
}
22
23 impl Soundmodem {
24 pub fn new<I: InputSource, O: OutputSink, P: Ptt, E: ErrorHandler>(
25 input: I,
26 output: O,
27 ptt: P,
28 error: E,
29 ) -> Self {
30 let (event_tx, event_rx) = sync_channel(128);
31 let (kiss_out_tx, kiss_out_rx) = sync_channel(128);
32 spawn_soundmodem_worker(
33 event_tx.clone(),
34 event_rx,
35 kiss_out_tx,
36 Box::new(input),
37 Box::new(output),
38 Box::new(ptt),
39 Box::new(error),
40 );
41 Self {
42 event_tx,
43 kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)),
44 partial_kiss_out: Arc::new(Mutex::new(None)),
45 }
46 }
47 }
48
/// Identifies which part of the soundmodem reported an error.
#[derive(Debug, Clone, Copy)]
pub enum ErrorSource {
    Input,
    Output,
    Ptt,
}

impl Display for ErrorSource {
    /// Writes the human-readable name of the error source.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Input => "Input",
            Self::Output => "Output",
            Self::Ptt => "PTT",
        };
        f.write_str(label)
    }
}
65
/// Receives errors reported by the soundmodem's input, output and PTT components.
///
/// The handler is boxed and moved onto the soundmodem worker thread, which is
/// where `soundmodem_error` is invoked.
pub trait ErrorHandler: Send + Sync + 'static {
    /// Called with the component that failed and the error it produced.
    fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError);
}

/// Allows any suitable closure to be used directly as an error handler.
impl<F> ErrorHandler for F
where
    F: FnMut(ErrorSource, SoundmodemError) + Send + Sync + 'static,
{
    fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
        self(source, err)
    }
}
78
/// Soundmodem errors will be ignored.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullErrorHandler;

impl NullErrorHandler {
    /// Creates a handler that discards every error it is given.
    pub fn new() -> Self {
        Self
    }
}
93
94 impl ErrorHandler for NullErrorHandler {
95 fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
96 let _ = source;
97 let _ = err;
98 }
99 }
100
/// Soundmodem errors will be logged at DEBUG level via the `log` crate.
#[derive(Debug, Clone, Copy, Default)]
pub struct LogErrorHandler;

impl LogErrorHandler {
    /// Creates a handler that forwards errors to the `log` crate.
    pub fn new() -> Self {
        Self
    }
}
115
impl ErrorHandler for LogErrorHandler {
    /// Emits the error source and error message as a single DEBUG-level log record.
    fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
        log::debug!("Soundmodem error: {source} - {err}");
    }
}
121
/// Soundmodem errors will be logged to stdout.
#[derive(Debug, Clone, Copy, Default)]
pub struct StdoutErrorHandler;

impl StdoutErrorHandler {
    /// Creates a handler that prints errors to standard output.
    pub fn new() -> Self {
        Self
    }
}
136
impl ErrorHandler for StdoutErrorHandler {
    /// Prints the error source and error message on one line of standard output.
    fn soundmodem_error(&mut self, source: ErrorSource, err: SoundmodemError) {
        println!("Soundmodem error: {source} - {err}");
    }
}
142
/// Handle given to input and output implementations so they can report
/// runtime errors back to the soundmodem worker.
#[derive(Clone)]
pub struct SoundmodemErrorSender {
    /// Component on whose behalf errors are reported.
    source: ErrorSource,
    /// Event queue of the soundmodem worker.
    event_tx: SyncSender<SoundmodemEvent>,
}

impl SoundmodemErrorSender {
    /// Queues `err` as a [`SoundmodemEvent::RuntimeError`] tagged with this sender's source.
    ///
    /// This uses the blocking `send` of a bounded channel, so the call waits
    /// while the event queue is full. A failed send (worker gone) is ignored.
    pub fn send_error<E: Into<SoundmodemError>>(&self, err: E) {
        let _ = self
            .event_tx
            .send(SoundmodemEvent::RuntimeError(self.source, err.into()));
    }
}
156
/// A KISS output chunk that has only been partly copied out to a reader.
struct PartialKissOut {
    /// The complete chunk as received from the worker.
    output: Arc<[u8]>,
    /// Index of the first byte in `output` that has not yet been returned.
    idx: usize,
}
161
162 impl Read for Soundmodem {
163 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
164 {
165 let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap();
166 if let Some(partial) = partial_kiss_out.as_mut() {
167 let remaining = partial.output.len() - partial.idx;
168 let to_write = remaining.min(buf.len());
169 buf[0..to_write]
170 .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]);
171 if to_write == remaining {
172 *partial_kiss_out = None;
173 } else {
174 partial.idx += to_write;
175 }
176 return Ok(to_write);
177 }
178 }
179 let output = {
180 let rx = self.kiss_out_rx.lock().unwrap();
181 rx.recv()
182 .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))?
183 };
184 let to_write = output.len().min(buf.len());
185 buf[0..to_write].copy_from_slice(&output[0..to_write]);
186 if to_write != output.len() {
187 *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut {
188 output,
189 idx: to_write,
190 })
191 }
192 Ok(to_write)
193 }
194 }
195
196 impl Write for Soundmodem {
197 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
198 let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into()));
199 Ok(buf.len())
200 }
201
202 fn flush(&mut self) -> std::io::Result<()> {
203 Ok(())
204 }
205 }
206
207 impl Tnc for Soundmodem {
208 fn try_clone(&mut self) -> Result<Self, TncError> {
209 Ok(Self {
210 event_tx: self.event_tx.clone(),
211 kiss_out_rx: self.kiss_out_rx.clone(),
212 partial_kiss_out: self.partial_kiss_out.clone(),
213 })
214 }
215
216 fn start(&mut self) {
217 let _ = self.event_tx.send(SoundmodemEvent::Start);
218 }
219
220 fn close(&mut self) {
221 let _ = self.event_tx.send(SoundmodemEvent::Close);
222 }
223 }
224
/// Events processed one at a time by the soundmodem worker thread.
pub enum SoundmodemEvent {
    /// KISS bytes written by the host, to be passed to the TNC.
    Kiss(Arc<[u8]>),
    /// Baseband samples from the input source, to be demodulated.
    BasebandInput(Arc<[i16]>),
    /// Start the input source and output sink.
    Start,
    /// Close the input and output, release PTT and stop the worker.
    Close,
    /// An output sink took `len` samples from the output buffer at `timestamp`.
    DidReadFromOutputBuffer { len: usize, timestamp: Instant },
    /// An output sink ran out of samples while the output buffer was not idling.
    OutputUnderrun,
    /// An error reported by one of the soundmodem's components.
    RuntimeError(ErrorSource, SoundmodemError),
}
234
235 fn spawn_soundmodem_worker(
236 event_tx: SyncSender<SoundmodemEvent>,
237 event_rx: Receiver<SoundmodemEvent>,
238 kiss_out_tx: SyncSender<Arc<[u8]>>,
239 input: Box<dyn InputSource>,
240 output: Box<dyn OutputSink>,
241 mut ptt_driver: Box<dyn Ptt>,
242 mut error_handler: Box<dyn ErrorHandler>,
243 ) {
244 std::thread::spawn(move || {
245 // TODO: should be able to provide a custom Demodulator for a soundmodem
246 let mut demodulator = SoftDemodulator::new();
247 let mut modulator = SoftModulator::new();
248 let mut tnc = SoftTnc::new();
249 let mut buf = [0u8; MAX_FRAME_LEN];
250 let out_buffer = Arc::new(RwLock::new(OutputBuffer::new()));
251 let mut out_samples = [0i16; 1024];
252 let start = Instant::now();
253 let mut ptt = false;
254 while let Ok(ev) = event_rx.recv() {
255 // Update clock on TNC before we do anything
256 let sample_time = start.elapsed();
257 let secs = sample_time.as_secs();
258 let nanos = sample_time.subsec_nanos();
259 // Accurate to within approx 1 sample
260 let now_samples = 48000 * secs + (nanos as u64 / 20833);
261 tnc.set_now(now_samples);
262
263 // Handle event
264 match ev {
265 SoundmodemEvent::Kiss(k) => {
266 let _n = tnc.write_kiss(&k);
267 // TODO: what does it mean if we fail to write it all?
268 // Probably we have to read frames for tx first - revisit this during tx
269 }
270 SoundmodemEvent::BasebandInput(b) => {
271 for sample in &*b {
272 if let Some(frame) = demodulator.demod(*sample) {
273 tnc.handle_frame(frame);
274 loop {
275 let n = tnc.read_kiss(&mut buf);
276 if n > 0 {
277 let _ = kiss_out_tx.try_send(buf[0..n].into());
278 } else {
279 break;
280 }
281 }
282 }
283 }
284 tnc.set_data_carrier_detect(demodulator.data_carrier_detect());
285 }
286 SoundmodemEvent::Start => {
287 let input_errors = SoundmodemErrorSender {
288 source: ErrorSource::Input,
289 event_tx: event_tx.clone(),
290 };
291 input.start(event_tx.clone(), input_errors);
292 let output_errors = SoundmodemErrorSender {
293 source: ErrorSource::Output,
294 event_tx: event_tx.clone(),
295 };
296 output.start(event_tx.clone(), out_buffer.clone(), output_errors);
297 }
298 SoundmodemEvent::Close => {
299 input.close();
300 output.close();
301 if let Err(e) = ptt_driver.ptt_off() {
302 error_handler.soundmodem_error(ErrorSource::Ptt, e);
303 }
304 break;
305 }
306 SoundmodemEvent::DidReadFromOutputBuffer { len, timestamp } => {
307 let (occupied, internal_latency) = {
308 let out_buffer = out_buffer.read().unwrap();
309 (out_buffer.samples.len(), out_buffer.latency)
310 };
311 let internal_latency = (internal_latency.as_secs_f32() * 48000.0) as usize;
312 let dynamic_latency =
313 len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize);
314 modulator.update_output_buffer(
315 occupied,
316 48000,
317 internal_latency + dynamic_latency,
318 );
319 }
320 SoundmodemEvent::OutputUnderrun => {
321 log::debug!("output underrun");
322 // TODO: cancel transmission, send empty data frame to host
323 }
324 SoundmodemEvent::RuntimeError(source, err) => {
325 error_handler.soundmodem_error(source, err);
326 }
327 }
328
329 // Update PTT state
330 let new_ptt = tnc.ptt();
331 if new_ptt != ptt {
332 if new_ptt {
333 if let Err(e) = ptt_driver.ptt_on() {
334 error_handler.soundmodem_error(ErrorSource::Ptt, e);
335 }
336 } else if let Err(e) = ptt_driver.ptt_off() {
337 error_handler.soundmodem_error(ErrorSource::Ptt, e);
338 }
339 }
340 ptt = new_ptt;
341
342 // Let the modulator do what it wants
343 while let Some(action) = modulator.run() {
344 match action {
345 ModulatorAction::SetIdle(idling) => {
346 out_buffer.write().unwrap().idling = idling;
347 }
348 ModulatorAction::GetNextFrame => {
349 modulator.provide_next_frame(tnc.read_tx_frame());
350 }
351 ModulatorAction::ReadOutput => loop {
352 let n = modulator.read_output_samples(&mut out_samples);
353 if n == 0 {
354 break;
355 }
356 let mut out_buffer = out_buffer.write().unwrap();
357 for s in &out_samples[0..n] {
358 out_buffer.samples.push_back(*s);
359 }
360 },
361 ModulatorAction::TransmissionWillEnd(in_samples) => {
362 tnc.set_tx_end_time(in_samples);
363 }
364 }
365 }
366 }
367 });
368 }
369
/// A source of baseband samples for the soundmodem.
pub trait InputSource: Send + Sync + 'static {
    /// Begins delivering input. `samples` is the soundmodem's event queue and
    /// `errors` is the channel for reporting runtime failures.
    fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender);
    /// Stops delivering input.
    fn close(&self);
}
374
375 pub struct InputRrcFile {
376 baseband: Arc<[u8]>,
377 end_tx: Mutex<Option<Sender<()>>>,
378 }
379
380 impl InputRrcFile {
381 pub fn new(path: PathBuf) -> Result<Self, M17Error> {
382 let mut file = File::open(&path).map_err(|_| M17Error::InvalidRrcPath(path.clone()))?;
383 let mut baseband = vec![];
384 file.read_to_end(&mut baseband)
385 .map_err(|_| M17Error::RrcReadFailed(path))?;
386 Ok(Self {
387 baseband: baseband.into(),
388 end_tx: Mutex::new(None),
389 })
390 }
391 }
392
393 impl InputSource for InputRrcFile {
394 fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender) {
395 let (end_tx, end_rx) = channel();
396 let baseband = self.baseband.clone();
397 std::thread::spawn(move || {
398 // assuming 48 kHz for now
399 const TICK: Duration = Duration::from_millis(25);
400 const SAMPLES_PER_TICK: usize = 1200;
401
402 let mut next_tick = Instant::now() + TICK;
403 let mut buf = [0i16; SAMPLES_PER_TICK];
404 let mut idx = 0;
405
406 for sample in baseband
407 .chunks(2)
408 .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
409 {
410 buf[idx] = sample;
411 idx += 1;
412 if idx == SAMPLES_PER_TICK {
413 if samples
414 .try_send(SoundmodemEvent::BasebandInput(buf.into()))
415 .is_err()
416 {
417 errors.send_error(InputRrcError::Overflow);
418 }
419 next_tick += TICK;
420 idx = 0;
421 std::thread::sleep(next_tick.duration_since(Instant::now()));
422 }
423 if end_rx.try_recv() != Err(TryRecvError::Empty) {
424 break;
425 }
426 }
427 });
428 *self.end_tx.lock().unwrap() = Some(end_tx);
429 }
430
431 fn close(&self) {
432 let _ = self.end_tx.lock().unwrap().take();
433 }
434 }
435
/// Errors reported by [`InputRrcFile`] while it is running.
#[derive(Debug, Error)]
pub enum InputRrcError {
    /// The soundmodem event queue rejected a block of samples.
    #[error("overflow occurred feeding sample to soundmodem")]
    Overflow,
}
441
/// Input source that feeds the soundmodem nothing but silence.
pub struct NullInputSource {
    /// Held while the generator thread runs; dropping it tells the thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl NullInputSource {
    /// Creates an input source that has not been started yet.
    pub fn new() -> Self {
        let end_tx = Mutex::default();
        Self { end_tx }
    }
}
453
454 impl InputSource for NullInputSource {
455 fn start(&self, samples: SyncSender<SoundmodemEvent>, errors: SoundmodemErrorSender) {
456 let (end_tx, end_rx) = channel();
457 std::thread::spawn(move || {
458 // assuming 48 kHz for now
459 const TICK: Duration = Duration::from_millis(25);
460 const SAMPLES_PER_TICK: usize = 1200;
461 let mut next_tick = Instant::now() + TICK;
462
463 loop {
464 std::thread::sleep(next_tick.duration_since(Instant::now()));
465 next_tick += TICK;
466 if end_rx.try_recv() != Err(TryRecvError::Empty) {
467 break;
468 }
469 if samples
470 .try_send(SoundmodemEvent::BasebandInput(
471 [0i16; SAMPLES_PER_TICK].into(),
472 ))
473 .is_err()
474 {
475 errors.send_error(NullInputError::Overflow);
476 }
477 }
478 });
479 *self.end_tx.lock().unwrap() = Some(end_tx);
480 }
481
482 fn close(&self) {
483 let _ = self.end_tx.lock().unwrap().take();
484 }
485 }
486
/// Errors reported by [`NullInputSource`] while it is running.
#[derive(Debug, Error)]
pub enum NullInputError {
    /// The soundmodem event queue rejected a block of samples.
    #[error("overflow occurred feeding sample to soundmodem")]
    Overflow,
}
492
impl Default for NullInputSource {
    /// Equivalent to [`NullInputSource::new`].
    fn default() -> Self {
        Self::new()
    }
}
498
/// Sample queue shared between the soundmodem worker (producer) and an
/// output sink (consumer).
pub struct OutputBuffer {
    /// Whether the modulator is currently idle; sinks use this to decide
    /// whether an empty queue counts as an underrun.
    pub idling: bool,
    // TODO: something more efficient
    /// Modulated samples waiting to be consumed by the sink.
    pub samples: VecDeque<i16>,
    /// Latency figure that the worker reads and converts to a sample count.
    pub latency: Duration,
}

impl OutputBuffer {
    /// Creates an empty buffer that is idling and has zero latency.
    pub fn new() -> Self {
        let samples = VecDeque::new();
        Self {
            idling: true,
            samples,
            latency: Duration::ZERO,
        }
    }
}

impl Default for OutputBuffer {
    /// Equivalent to [`OutputBuffer::new`].
    fn default() -> Self {
        OutputBuffer::new()
    }
}
521
/// A consumer of modulated baseband samples produced by the soundmodem.
pub trait OutputSink: Send + Sync + 'static {
    /// Begins consuming samples from `buffer`.
    ///
    /// `event_tx` is the soundmodem's event queue and `errors` is the channel
    /// for reporting runtime failures.
    fn start(
        &self,
        event_tx: SyncSender<SoundmodemEvent>,
        buffer: Arc<RwLock<OutputBuffer>>,
        errors: SoundmodemErrorSender,
    );
    /// Stops consuming samples.
    fn close(&self);
}
531
/// Output sink that writes modulated samples to a file.
pub struct OutputRrcFile {
    /// Destination file, created when the sink is started.
    path: PathBuf,
    /// Held while the writer thread runs; dropping it tells the thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl OutputRrcFile {
    /// Creates a sink for `path`. The file is not touched until `start` is called.
    pub fn new(path: PathBuf) -> Self {
        let end_tx = Mutex::default();
        Self { path, end_tx }
    }
}
545
546 impl OutputSink for OutputRrcFile {
547 fn start(
548 &self,
549 event_tx: SyncSender<SoundmodemEvent>,
550 buffer: Arc<RwLock<OutputBuffer>>,
551 errors: SoundmodemErrorSender,
552 ) {
553 let (end_tx, end_rx) = channel();
554 let mut file = match File::create(self.path.clone()) {
555 Ok(f) => f,
556 Err(e) => {
557 errors.send_error(OutputRrcError::Open(e));
558 return;
559 }
560 };
561 std::thread::spawn(move || {
562 // assuming 48 kHz for now
563 const TICK: Duration = Duration::from_millis(25);
564 const SAMPLES_PER_TICK: usize = 1200;
565
566 // flattened BE i16s for writing
567 let mut buf = [0u8; SAMPLES_PER_TICK * 2];
568 let mut next_tick = Instant::now() + TICK;
569
570 loop {
571 std::thread::sleep(next_tick.duration_since(Instant::now()));
572 next_tick += TICK;
573 if end_rx.try_recv() != Err(TryRecvError::Empty) {
574 break;
575 }
576 // For now only write deliberately modulated (non-idling) samples
577 // Multiple transmissions will get smooshed together
578 let mut buf_used = 0;
579
580 let mut buffer = buffer.write().unwrap();
581 for out in buf.chunks_mut(2) {
582 if let Some(s) = buffer.samples.pop_front() {
583 let be = s.to_le_bytes();
584 out.copy_from_slice(&[be[0], be[1]]);
585 buf_used += 2;
586 } else if !buffer.idling {
587 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
588 break;
589 }
590 }
591 if let Err(e) = file.write_all(&buf[0..buf_used]) {
592 errors.send_error(OutputRrcError::WriteError(e));
593 break;
594 }
595 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
596 len: buf_used / 2,
597 timestamp: Instant::now(),
598 });
599 }
600 });
601 *self.end_tx.lock().unwrap() = Some(end_tx);
602 }
603
604 fn close(&self) {
605 let _ = self.end_tx.lock().unwrap().take();
606 }
607 }
608
/// Errors reported by [`OutputRrcFile`].
#[derive(Debug, Error)]
pub enum OutputRrcError {
    /// The output file could not be created.
    #[error("unable to open rrc file for writing: {0}")]
    Open(#[source] std::io::Error),

    /// Writing samples to the output file failed.
    #[error("error writing to output file: {0}")]
    WriteError(#[source] std::io::Error),
}
617
/// Output sink that consumes samples and throws them away.
pub struct NullOutputSink {
    /// Held while the consumer thread runs; dropping it tells the thread to stop.
    end_tx: Mutex<Option<Sender<()>>>,
}

impl NullOutputSink {
    /// Creates a sink that has not been started yet.
    pub fn new() -> Self {
        let end_tx = Mutex::default();
        Self { end_tx }
    }
}

impl Default for NullOutputSink {
    /// Equivalent to [`NullOutputSink::new`].
    fn default() -> Self {
        NullOutputSink::new()
    }
}
635
636 impl OutputSink for NullOutputSink {
637 fn start(
638 &self,
639 event_tx: SyncSender<SoundmodemEvent>,
640 buffer: Arc<RwLock<OutputBuffer>>,
641 _errors: SoundmodemErrorSender,
642 ) {
643 let (end_tx, end_rx) = channel();
644 std::thread::spawn(move || {
645 // assuming 48 kHz for now
646 const TICK: Duration = Duration::from_millis(25);
647 const SAMPLES_PER_TICK: usize = 1200;
648 let mut next_tick = Instant::now() + TICK;
649
650 loop {
651 std::thread::sleep(next_tick.duration_since(Instant::now()));
652 next_tick += TICK;
653 if end_rx.try_recv() != Err(TryRecvError::Empty) {
654 break;
655 }
656
657 let mut buffer = buffer.write().unwrap();
658 let mut taken = 0;
659 for _ in 0..SAMPLES_PER_TICK {
660 if buffer.samples.pop_front().is_none() {
661 if !buffer.idling {
662 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
663 break;
664 }
665 } else {
666 taken += 1;
667 }
668 }
669 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
670 len: taken,
671 timestamp: Instant::now(),
672 });
673 }
674 });
675 *self.end_tx.lock().unwrap() = Some(end_tx);
676 }
677
678 fn close(&self) {
679 let _ = self.end_tx.lock().unwrap().take();
680 }
681 }
682
/// Driver for a push-to-talk control.
///
/// The soundmodem worker calls `ptt_on` / `ptt_off` when the TNC's PTT state
/// changes, and calls `ptt_off` once more when it is closed.
pub trait Ptt: Send + 'static {
    /// Engages PTT.
    fn ptt_on(&mut self) -> Result<(), SoundmodemError>;
    /// Releases PTT.
    fn ptt_off(&mut self) -> Result<(), SoundmodemError>;
}
687
/// There is no PTT because this TNC will never make transmissions on a real radio.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullPtt;

impl NullPtt {
    /// Creates a PTT driver that does nothing.
    pub fn new() -> Self {
        Self
    }
}
702
impl Ptt for NullPtt {
    /// Does nothing and always succeeds.
    fn ptt_on(&mut self) -> Result<(), SoundmodemError> {
        Ok(())
    }

    /// Does nothing and always succeeds.
    fn ptt_off(&mut self) -> Result<(), SoundmodemError> {
        Ok(())
    }
}