]>
code.octet-stream.net Git - m17rt/blob - m17app/src/soundmodem.rs
1 use crate::error
::{M17Error
, SoundmodemError
};
2 use crate::tnc
::{Tnc
, TncError
};
3 use m17core
::kiss
::MAX_FRAME_LEN
;
4 use m17core
::modem
::{Demodulator
, Modulator
, ModulatorAction
, SoftDemodulator
, SoftModulator
};
5 use m17core
::tnc
::SoftTnc
;
6 use std
::collections
::VecDeque
;
9 use std
::io
::{self, ErrorKind
, Read
, Write
};
10 use std
::path
::PathBuf
;
11 use std
::sync
::mpsc
::{channel
, sync_channel
, Receiver
, Sender
, SyncSender
, TryRecvError
};
12 use std
::sync
::RwLock
;
13 use std
::sync
::{Arc
, Mutex
};
14 use std
::time
::{Duration
, Instant
};
17 pub struct Soundmodem
{
18 event_tx
: SyncSender
<SoundmodemEvent
>,
19 kiss_out_rx
: Arc
<Mutex
<Receiver
<Arc
<[u8]>>>>,
20 partial_kiss_out
: Arc
<Mutex
<Option
<PartialKissOut
>>>,
24 pub fn new
<I
: InputSource
, O
: OutputSink
, P
: Ptt
, E
: ErrorHandler
>(
30 let (event_tx
, event_rx
) = sync_channel(128);
31 let (kiss_out_tx
, kiss_out_rx
) = sync_channel(128);
32 spawn_soundmodem_worker(
43 kiss_out_rx
: Arc
::new(Mutex
::new(kiss_out_rx
)),
44 partial_kiss_out
: Arc
::new(Mutex
::new(None
)),
49 #[derive(Debug, Clone, Copy)]
50 pub enum ErrorSource
{
56 impl Display
for ErrorSource
{
57 fn fmt(&self, f
: &mut std
::fmt
::Formatter
<'_
>) -> std
::fmt
::Result
{
59 Self::Input
=> write
!(f
, "Input"),
60 Self::Output
=> write
!(f
, "Output"),
61 Self::Ptt
=> write
!(f
, "PTT"),
66 pub trait ErrorHandler
: Send
+ Sync
+ '
static {
67 fn soundmodem_error(&mut self, source
: ErrorSource
, err
: SoundmodemError
);
70 impl<F
> ErrorHandler
for F
72 F
: FnMut(ErrorSource
, SoundmodemError
) + Send
+ Sync
+ '
static,
74 fn soundmodem_error(&mut self, source
: ErrorSource
, err
: SoundmodemError
) {
79 /// Soundmodem errors will be ignored.
80 pub struct NullErrorHandler
;
82 impl NullErrorHandler
{
83 pub fn new() -> Self {
88 impl Default
for NullErrorHandler
{
89 fn default() -> Self {
94 impl ErrorHandler
for NullErrorHandler
{
95 fn soundmodem_error(&mut self, source
: ErrorSource
, err
: SoundmodemError
) {
101 /// Soundmodem errors will be logged at DEBUG level via the `log` crate.
102 pub struct LogErrorHandler
;
104 impl LogErrorHandler
{
105 pub fn new() -> Self {
110 impl Default
for LogErrorHandler
{
111 fn default() -> Self {
116 impl ErrorHandler
for LogErrorHandler
{
117 fn soundmodem_error(&mut self, source
: ErrorSource
, err
: SoundmodemError
) {
118 log
::debug
!("Soundmodem error: {source} - {err}");
122 /// Soundmodem errors will be logged to stdout.
123 pub struct StdoutErrorHandler
;
125 impl StdoutErrorHandler
{
126 pub fn new() -> Self {
131 impl Default
for StdoutErrorHandler
{
132 fn default() -> Self {
137 impl ErrorHandler
for StdoutErrorHandler
{
138 fn soundmodem_error(&mut self, source
: ErrorSource
, err
: SoundmodemError
) {
139 println
!("Soundmodem error: {source} - {err}");
144 pub struct SoundmodemErrorSender
{
146 event_tx
: SyncSender
<SoundmodemEvent
>,
149 impl SoundmodemErrorSender
{
150 pub fn send_error
<E
: Into
<SoundmodemError
>>(&self, err
: E
) {
153 .send(SoundmodemEvent
::RuntimeError(self.source
, err
.into
()));
157 struct PartialKissOut
{
162 impl Read
for Soundmodem
{
163 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
165 let mut partial_kiss_out
= self.partial_kiss_out
.lock().unwrap
();
166 if let Some(partial
) = partial_kiss_out
.as_mut() {
167 let remaining
= partial
.output
.len() - partial
.idx
;
168 let to_write
= remaining
.min(buf
.len());
170 .copy_from_slice(&partial
.output
[partial
.idx
..(partial
.idx
+ to_write
)]);
171 if to_write
== remaining
{
172 *partial_kiss_out
= None
;
174 partial
.idx
+= to_write
;
180 let rx
= self.kiss_out_rx
.lock().unwrap
();
182 .map_err(|s
| io
::Error
::new(ErrorKind
::Other
, format
!("{:?}", s
)))?
184 let to_write
= output
.len().min(buf
.len());
185 buf
[0..to_write
].copy_from_slice(&output
[0..to_write
]);
186 if to_write
!= output
.len() {
187 *self.partial_kiss_out
.lock().unwrap
() = Some(PartialKissOut
{
196 impl Write
for Soundmodem
{
197 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
198 let _
= self.event_tx
.try_send(SoundmodemEvent
::Kiss(buf
.into
()));
202 fn flush(&mut self) -> std
::io
::Result
<()> {
207 impl Tnc
for Soundmodem
{
208 fn try_clone(&mut self) -> Result
<Self, TncError
> {
210 event_tx
: self.event_tx
.clone(),
211 kiss_out_rx
: self.kiss_out_rx
.clone(),
212 partial_kiss_out
: self.partial_kiss_out
.clone(),
216 fn start(&mut self) {
217 let _
= self.event_tx
.send(SoundmodemEvent
::Start
);
220 fn close(&mut self) {
221 let _
= self.event_tx
.send(SoundmodemEvent
::Close
);
225 pub enum SoundmodemEvent
{
227 BasebandInput(Arc
<[i16]>),
230 DidReadFromOutputBuffer
{ len
: usize, timestamp
: Instant
},
232 RuntimeError(ErrorSource
, SoundmodemError
),
235 fn spawn_soundmodem_worker(
236 event_tx
: SyncSender
<SoundmodemEvent
>,
237 event_rx
: Receiver
<SoundmodemEvent
>,
238 kiss_out_tx
: SyncSender
<Arc
<[u8]>>,
239 input
: Box
<dyn InputSource
>,
240 output
: Box
<dyn OutputSink
>,
241 mut ptt_driver
: Box
<dyn Ptt
>,
242 mut error_handler
: Box
<dyn ErrorHandler
>,
244 std
::thread
::spawn(move || {
245 // TODO: should be able to provide a custom Demodulator for a soundmodem
246 let mut demodulator
= SoftDemodulator
::new();
247 let mut modulator
= SoftModulator
::new();
248 let mut tnc
= SoftTnc
::new();
249 let mut buf
= [0u8; MAX_FRAME_LEN
];
250 let out_buffer
= Arc
::new(RwLock
::new(OutputBuffer
::new()));
251 let mut out_samples
= [0i16; 1024];
252 let start
= Instant
::now();
254 while let Ok(ev
) = event_rx
.recv() {
255 // Update clock on TNC before we do anything
256 let sample_time
= start
.elapsed();
257 let secs
= sample_time
.as_secs();
258 let nanos
= sample_time
.subsec_nanos();
259 // Accurate to within approx 1 sample
260 let now_samples
= 48000 * secs
+ (nanos
as u64 / 20833);
261 tnc
.set_now(now_samples
);
265 SoundmodemEvent
::Kiss(k
) => {
266 let _n
= tnc
.write_kiss(&k
);
267 // TODO: what does it mean if we fail to write it all?
268 // Probably we have to read frames for tx first - revisit this during tx
270 SoundmodemEvent
::BasebandInput(b
) => {
272 if let Some(frame
) = demodulator
.demod(*sample
) {
273 tnc
.handle_frame(frame
);
275 let n
= tnc
.read_kiss(&mut buf
);
277 let _
= kiss_out_tx
.try_send(buf
[0..n
].into
());
284 tnc
.set_data_carrier_detect(demodulator
.data_carrier_detect());
286 SoundmodemEvent
::Start
=> {
287 let input_errors
= SoundmodemErrorSender
{
288 source
: ErrorSource
::Input
,
289 event_tx
: event_tx
.clone(),
291 input
.start(event_tx
.clone(), input_errors
);
292 let output_errors
= SoundmodemErrorSender
{
293 source
: ErrorSource
::Output
,
294 event_tx
: event_tx
.clone(),
296 output
.start(event_tx
.clone(), out_buffer
.clone(), output_errors
);
298 SoundmodemEvent
::Close
=> {
301 if let Err(e
) = ptt_driver
.ptt_off() {
302 error_handler
.soundmodem_error(ErrorSource
::Ptt
, e
);
306 SoundmodemEvent
::DidReadFromOutputBuffer
{ len
, timestamp
} => {
307 let (occupied
, internal_latency
) = {
308 let out_buffer
= out_buffer
.read().unwrap
();
309 (out_buffer
.samples
.len(), out_buffer
.latency
)
311 let internal_latency
= (internal_latency
.as_secs_f32() * 48000.0) as usize;
312 let dynamic_latency
=
313 len
.saturating_sub((timestamp
.elapsed().as_secs_f32() * 48000.0) as usize);
314 modulator
.update
_o
utp
ut
_b
uffer
(
317 internal_latency
+ dynamic_latency
,
320 SoundmodemEvent
::OutputUnderrun
=> {
321 log
::debug
!("output underrun");
322 // TODO: cancel transmission, send empty data frame to host
324 SoundmodemEvent
::RuntimeError(source
, err
) => {
325 error_handler
.soundmodem_error(source
, err
);
330 let new_ptt
= tnc
.ptt();
333 if let Err(e
) = ptt_driver
.ptt_on() {
334 error_handler
.soundmodem_error(ErrorSource
::Ptt
, e
);
336 } else if let Err(e
) = ptt_driver
.ptt_off() {
337 error_handler
.soundmodem_error(ErrorSource
::Ptt
, e
);
342 // Let the modulator do what it wants
343 while let Some(action
) = modulator
.run() {
345 ModulatorAction
::SetIdle(idling
) => {
346 out_buffer
.write().unwrap
().idl
ing
= idling
;
348 ModulatorAction
::GetNextFrame
=> {
349 modulator
.provide_next_frame(tnc
.read_tx_frame());
351 ModulatorAction
::ReadOutput
=> loop {
352 let n
= modulator
.read_output_samples(&mut out_samples
);
356 let mut out_buffer
= out_buffer
.write().unwrap
();
357 for s
in &out_samples
[0..n
] {
358 out_buffer
.samples
.push_back(*s
);
361 ModulatorAction
::TransmissionWillEnd(in_samples
) => {
362 tnc
.set_tx_end_time(in_samples
);
370 pub trait InputSource
: Send
+ Sync
+ '
static {
371 fn start(&self, samples
: SyncSender
<SoundmodemEvent
>, errors
: SoundmodemErrorSender
);
375 pub struct InputRrcFile
{
377 end_tx
: Mutex
<Option
<Sender
<()>>>,
381 pub fn new(path
: PathBuf
) -> Result
<Self, M17Error
> {
382 let mut file
= File
::open(&path
).map_err(|_
| M17Error
::InvalidRrcPath(path
.clone()))?
;
383 let mut baseband
= vec
![];
384 file
.read_to_end(&mut baseband
)
385 .map_err(|_
| M17Error
::RrcReadFailed(path
))?
;
387 baseband
: baseband
.into
(),
388 end_tx
: Mutex
::new(None
),
393 impl InputSource
for InputRrcFile
{
394 fn start(&self, samples
: SyncSender
<SoundmodemEvent
>, errors
: SoundmodemErrorSender
) {
395 let (end_tx
, end_rx
) = channel();
396 let baseband
= self.baseband
.clone();
397 std
::thread
::spawn(move || {
398 // assuming 48 kHz for now
399 const TICK
: Duration
= Duration
::from_millis(25);
400 const SAMPLES_PER_TICK
: usize = 1200;
402 let mut next_tick
= Instant
::now() + TICK
;
403 let mut buf
= [0i16; SAMPLES_PER_TICK
];
406 for sample
in baseband
408 .map(|pair
| i16::from_le_bytes([pair
[0], pair
[1]]))
412 if idx
== SAMPLES_PER_TICK
{
414 .try_send(SoundmodemEvent
::BasebandInput(buf
.into
()))
417 errors
.send_error(InputRrcError
::Overflow
);
421 std
::thread
::sleep(next_tick
.duration_since(Instant
::now()));
423 if end_rx
.try_recv() != Err(TryRecvError
::Empty
) {
428 *self.end_tx
.lock().unwrap
() = Some(end_tx
);
432 let _
= self.end_tx
.lock().unwrap
().take();
436 #[derive(Debug, Error)]
437 pub enum InputRrcError
{
438 #[error("overflow occurred feeding sample to soundmodem")]
442 pub struct NullInputSource
{
443 end_tx
: Mutex
<Option
<Sender
<()>>>,
446 impl NullInputSource
{
447 pub fn new() -> Self {
449 end_tx
: Mutex
::new(None
),
454 impl InputSource
for NullInputSource
{
455 fn start(&self, samples
: SyncSender
<SoundmodemEvent
>, errors
: SoundmodemErrorSender
) {
456 let (end_tx
, end_rx
) = channel();
457 std
::thread
::spawn(move || {
458 // assuming 48 kHz for now
459 const TICK
: Duration
= Duration
::from_millis(25);
460 const SAMPLES_PER_TICK
: usize = 1200;
461 let mut next_tick
= Instant
::now() + TICK
;
464 std
::thread
::sleep(next_tick
.duration_since(Instant
::now()));
466 if end_rx
.try_recv() != Err(TryRecvError
::Empty
) {
470 .try_send(SoundmodemEvent
::BasebandInput(
471 [0i16; SAMPLES_PER_TICK
].into
(),
475 errors
.send_error(NullInputError
::Overflow
);
479 *self.end_tx
.lock().unwrap
() = Some(end_tx
);
483 let _
= self.end_tx
.lock().unwrap
().take();
487 #[derive(Debug, Error)]
488 pub enum NullInputError
{
489 #[error("overflow occurred feeding sample to soundmodem")]
493 impl Default
for NullInputSource
{
494 fn default() -> Self {
499 pub struct OutputBuffer
{
501 // TODO: something more efficient
502 pub samples
: VecDeque
<i16>,
503 pub latency
: Duration
,
507 pub fn new() -> Self {
510 samples
: VecDeque
::new(),
511 latency
: Duration
::ZERO
,
516 impl Default
for OutputBuffer
{
517 fn default() -> Self {
522 pub trait OutputSink
: Send
+ Sync
+ '
static {
525 event_tx
: SyncSender
<SoundmodemEvent
>,
526 buffer
: Arc
<RwLock
<OutputBuffer
>>,
527 errors
: SoundmodemErrorSender
,
532 pub struct OutputRrcFile
{
534 end_tx
: Mutex
<Option
<Sender
<()>>>,
538 pub fn new(path
: PathBuf
) -> Self {
541 end_tx
: Mutex
::new(None
),
546 impl OutputSink
for OutputRrcFile
{
549 event_tx
: SyncSender
<SoundmodemEvent
>,
550 buffer
: Arc
<RwLock
<OutputBuffer
>>,
551 errors
: SoundmodemErrorSender
,
553 let (end_tx
, end_rx
) = channel();
554 let mut file
= match File
::create(self.path
.clone()) {
557 errors
.send_error(OutputRrcError
::Open(e
));
561 std
::thread
::spawn(move || {
562 // assuming 48 kHz for now
563 const TICK
: Duration
= Duration
::from_millis(25);
564 const SAMPLES_PER_TICK
: usize = 1200;
566 // flattened little-endian (LE) i16s for writing
567 let mut buf
= [0u8; SAMPLES_PER_TICK
* 2];
568 let mut next_tick
= Instant
::now() + TICK
;
571 std
::thread
::sleep(next_tick
.duration_since(Instant
::now()));
573 if end_rx
.try_recv() != Err(TryRecvError
::Empty
) {
576 // For now only write deliberately modulated (non-idling) samples
577 // Multiple transmissions will get smooshed together
578 let mut buf_used
= 0;
580 let mut buffer
= buffer
.write().unwrap
();
581 for out
in buf
.chunks_mut(2) {
582 if let Some(s
) = buffer
.samples
.pop_front() {
583 let be
= s
.to_le_bytes();
584 out
.copy_from_slice(&[be
[0], be
[1]]);
586 } else if !buffer
.idl
ing
{
587 let _
= event_tx
.send(SoundmodemEvent
::OutputUnderrun
);
591 if let Err(e
) = file
.write_all(&buf
[0..buf_used
]) {
592 errors
.send_error(OutputRrcError
::WriteError(e
));
595 let _
= event_tx
.send(SoundmodemEvent
::DidReadFromOutputBuffer
{
597 timestamp
: Instant
::now(),
601 *self.end_tx
.lock().unwrap
() = Some(end_tx
);
605 let _
= self.end_tx
.lock().unwrap
().take();
609 #[derive(Debug, Error)]
610 pub enum OutputRrcError
{
611 #[error("unable to open rrc file for writing: {0}")]
612 Open(#[source] std::io::Error),
614 #[error("error writing to output file: {0}")]
615 WriteError(#[source] std::io::Error),
618 pub struct NullOutputSink
{
619 end_tx
: Mutex
<Option
<Sender
<()>>>,
622 impl NullOutputSink
{
623 pub fn new() -> Self {
625 end_tx
: Mutex
::new(None
),
630 impl Default
for NullOutputSink
{
631 fn default() -> Self {
636 impl OutputSink
for NullOutputSink
{
639 event_tx
: SyncSender
<SoundmodemEvent
>,
640 buffer
: Arc
<RwLock
<OutputBuffer
>>,
641 _errors
: SoundmodemErrorSender
,
643 let (end_tx
, end_rx
) = channel();
644 std
::thread
::spawn(move || {
645 // assuming 48 kHz for now
646 const TICK
: Duration
= Duration
::from_millis(25);
647 const SAMPLES_PER_TICK
: usize = 1200;
648 let mut next_tick
= Instant
::now() + TICK
;
651 std
::thread
::sleep(next_tick
.duration_since(Instant
::now()));
653 if end_rx
.try_recv() != Err(TryRecvError
::Empty
) {
657 let mut buffer
= buffer
.write().unwrap
();
659 for _
in 0..SAMPLES_PER_TICK
{
660 if buffer
.samples
.pop_front().is
_none
() {
662 let _
= event_tx
.send(SoundmodemEvent
::OutputUnderrun
);
669 let _
= event_tx
.send(SoundmodemEvent
::DidReadFromOutputBuffer
{
671 timestamp
: Instant
::now(),
675 *self.end_tx
.lock().unwrap
() = Some(end_tx
);
679 let _
= self.end_tx
.lock().unwrap
().take();
683 pub trait Ptt
: Send
+ '
static {
684 fn ptt_on(&mut self) -> Result
<(), SoundmodemError
>;
685 fn ptt_off(&mut self) -> Result
<(), SoundmodemError
>;
688 /// There is no PTT because this TNC will never make transmissions on a real radio.
692 pub fn new() -> Self {
697 impl Default
for NullPtt
{
698 fn default() -> Self {
703 impl Ptt
for NullPtt
{
704 fn ptt_on(&mut self) -> Result
<(), SoundmodemError
> {
708 fn ptt_off(&mut self) -> Result
<(), SoundmodemError
> {