]> code.octet-stream.net Git - m17rt/blob - m17app/src/soundmodem.rs
Address some clippy lints
[m17rt] / m17app / src / soundmodem.rs
1 use crate::tnc::{Tnc, TncError};
2 use log::debug;
3 use m17core::kiss::MAX_FRAME_LEN;
4 use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator};
5 use m17core::tnc::SoftTnc;
6 use std::collections::VecDeque;
7 use std::fs::File;
8 use std::io::{self, ErrorKind, Read, Write};
9 use std::path::PathBuf;
10 use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError};
11 use std::sync::RwLock;
12 use std::sync::{Arc, Mutex};
13 use std::time::{Duration, Instant};
14
15 pub struct Soundmodem {
16 event_tx: SyncSender<SoundmodemEvent>,
17 kiss_out_rx: Arc<Mutex<Receiver<Arc<[u8]>>>>,
18 partial_kiss_out: Arc<Mutex<Option<PartialKissOut>>>,
19 }
20
21 impl Soundmodem {
22 pub fn new<I: InputSource, O: OutputSink, P: Ptt>(input: I, output: O, ptt: P) -> Self {
23 // must create TNC here
24 let (event_tx, event_rx) = sync_channel(128);
25 let (kiss_out_tx, kiss_out_rx) = sync_channel(128);
26 spawn_soundmodem_worker(
27 event_tx.clone(),
28 event_rx,
29 kiss_out_tx,
30 Box::new(input),
31 Box::new(output),
32 Box::new(ptt),
33 );
34 Self {
35 event_tx,
36 kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)),
37 partial_kiss_out: Arc::new(Mutex::new(None)),
38 }
39 }
40 }
41
42 struct PartialKissOut {
43 output: Arc<[u8]>,
44 idx: usize,
45 }
46
47 impl Read for Soundmodem {
48 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
49 {
50 let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap();
51 if let Some(partial) = partial_kiss_out.as_mut() {
52 let remaining = partial.output.len() - partial.idx;
53 let to_write = remaining.min(buf.len());
54 buf[0..to_write]
55 .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]);
56 if to_write == remaining {
57 *partial_kiss_out = None;
58 } else {
59 partial.idx += to_write;
60 }
61 return Ok(to_write);
62 }
63 }
64 let output = {
65 let rx = self.kiss_out_rx.lock().unwrap();
66 rx.recv()
67 .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))?
68 };
69 let to_write = output.len().min(buf.len());
70 buf[0..to_write].copy_from_slice(&output[0..to_write]);
71 if to_write != output.len() {
72 *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut {
73 output,
74 idx: to_write,
75 })
76 }
77 Ok(to_write)
78 }
79 }
80
81 impl Write for Soundmodem {
82 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
83 let _ = self.event_tx.try_send(SoundmodemEvent::Kiss(buf.into()));
84 Ok(buf.len())
85 }
86
87 fn flush(&mut self) -> std::io::Result<()> {
88 Ok(())
89 }
90 }
91
92 impl Tnc for Soundmodem {
93 fn try_clone(&mut self) -> Result<Self, TncError> {
94 Ok(Self {
95 event_tx: self.event_tx.clone(),
96 kiss_out_rx: self.kiss_out_rx.clone(),
97 partial_kiss_out: self.partial_kiss_out.clone(),
98 })
99 }
100
101 fn start(&mut self) -> Result<(), TncError> {
102 let _ = self.event_tx.send(SoundmodemEvent::Start);
103 Ok(())
104 }
105
106 fn close(&mut self) -> Result<(), TncError> {
107 let _ = self.event_tx.send(SoundmodemEvent::Close);
108 Ok(())
109 }
110 }
111
112 pub enum SoundmodemEvent {
113 Kiss(Arc<[u8]>),
114 BasebandInput(Arc<[i16]>),
115 Start,
116 Close,
117 DidReadFromOutputBuffer { len: usize, timestamp: Instant },
118 OutputUnderrun,
119 }
120
121 fn spawn_soundmodem_worker(
122 event_tx: SyncSender<SoundmodemEvent>,
123 event_rx: Receiver<SoundmodemEvent>,
124 kiss_out_tx: SyncSender<Arc<[u8]>>,
125 input: Box<dyn InputSource>,
126 output: Box<dyn OutputSink>,
127 mut ptt_driver: Box<dyn Ptt>,
128 ) {
129 std::thread::spawn(move || {
130 // TODO: should be able to provide a custom Demodulator for a soundmodem
131 let mut demodulator = SoftDemodulator::new();
132 let mut modulator = SoftModulator::new();
133 let mut tnc = SoftTnc::new();
134 let mut buf = [0u8; MAX_FRAME_LEN];
135 let out_buffer = Arc::new(RwLock::new(OutputBuffer::new()));
136 let mut out_samples = [0i16; 1024];
137 let start = Instant::now();
138 let mut ptt = false;
139 while let Ok(ev) = event_rx.recv() {
140 // Update clock on TNC before we do anything
141 let sample_time = (start.elapsed().as_nanos() / 48000) as u64;
142 tnc.set_now(sample_time);
143
144 // Handle event
145 match ev {
146 SoundmodemEvent::Kiss(k) => {
147 let _n = tnc.write_kiss(&k);
148 // TODO: what does it mean if we fail to write it all?
149 // Probably we have to read frames for tx first - revisit this during tx
150 }
151 SoundmodemEvent::BasebandInput(b) => {
152 for sample in &*b {
153 if let Some(frame) = demodulator.demod(*sample) {
154 tnc.handle_frame(frame);
155 loop {
156 let n = tnc.read_kiss(&mut buf);
157 if n > 0 {
158 let _ = kiss_out_tx.try_send(buf[0..n].into());
159 } else {
160 break;
161 }
162 }
163 }
164 }
165 tnc.set_data_carrier_detect(demodulator.data_carrier_detect());
166 }
167 SoundmodemEvent::Start => {
168 input.start(event_tx.clone());
169 output.start(event_tx.clone(), out_buffer.clone());
170 }
171 SoundmodemEvent::Close => {
172 ptt_driver.ptt_off();
173 break;
174 }
175 SoundmodemEvent::DidReadFromOutputBuffer { len, timestamp } => {
176 let (occupied, internal_latency) = {
177 let out_buffer = out_buffer.read().unwrap();
178 (out_buffer.samples.len(), out_buffer.latency)
179 };
180 let internal_latency = (internal_latency.as_secs_f32() * 48000.0) as usize;
181 let dynamic_latency =
182 len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize);
183 modulator.update_output_buffer(
184 occupied,
185 48000,
186 internal_latency + dynamic_latency,
187 );
188 }
189 SoundmodemEvent::OutputUnderrun => {
190 // TODO: cancel transmission, send empty data frame to host
191 }
192 }
193
194 // Update PTT state
195 let new_ptt = tnc.ptt();
196 if new_ptt != ptt {
197 if new_ptt {
198 ptt_driver.ptt_on();
199 } else {
200 ptt_driver.ptt_off();
201 }
202 }
203 ptt = new_ptt;
204
205 // Let the modulator do what it wants
206 while let Some(action) = modulator.run() {
207 match action {
208 ModulatorAction::SetIdle(idling) => {
209 out_buffer.write().unwrap().idling = idling;
210 }
211 ModulatorAction::GetNextFrame => {
212 modulator.provide_next_frame(tnc.read_tx_frame());
213 }
214 ModulatorAction::ReadOutput => loop {
215 let n = modulator.read_output_samples(&mut out_samples);
216 if n == 0 {
217 break;
218 }
219 let mut out_buffer = out_buffer.write().unwrap();
220 for s in &out_samples[0..n] {
221 out_buffer.samples.push_back(*s);
222 }
223 },
224 ModulatorAction::TransmissionWillEnd(in_samples) => {
225 tnc.set_tx_end_time(in_samples);
226 }
227 }
228 }
229 }
230 });
231 }
232
233 pub trait InputSource: Send + Sync + 'static {
234 fn start(&self, samples: SyncSender<SoundmodemEvent>);
235 fn close(&self);
236 }
237
238 pub struct InputRrcFile {
239 path: PathBuf,
240 end_tx: Mutex<Option<Sender<()>>>,
241 }
242
243 impl InputRrcFile {
244 pub fn new(path: PathBuf) -> Self {
245 Self {
246 path,
247 end_tx: Mutex::new(None),
248 }
249 }
250 }
251
252 impl InputSource for InputRrcFile {
253 fn start(&self, samples: SyncSender<SoundmodemEvent>) {
254 let (end_tx, end_rx) = channel();
255 let path = self.path.clone();
256 std::thread::spawn(move || {
257 // TODO: error handling
258 let mut file = File::open(path).unwrap();
259 let mut baseband = vec![];
260 file.read_to_end(&mut baseband).unwrap();
261
262 // assuming 48 kHz for now
263 const TICK: Duration = Duration::from_millis(25);
264 const SAMPLES_PER_TICK: usize = 1200;
265
266 let mut next_tick = Instant::now() + TICK;
267 let mut buf = [0i16; SAMPLES_PER_TICK];
268 let mut idx = 0;
269
270 for sample in baseband
271 .chunks(2)
272 .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
273 {
274 buf[idx] = sample;
275 idx += 1;
276 if idx == SAMPLES_PER_TICK {
277 if let Err(e) = samples.try_send(SoundmodemEvent::BasebandInput(buf.into())) {
278 debug!("overflow feeding soundmodem: {e:?}");
279 }
280 next_tick = next_tick + TICK;
281 idx = 0;
282 std::thread::sleep(next_tick.duration_since(Instant::now()));
283 }
284 if end_rx.try_recv() != Err(TryRecvError::Empty) {
285 break;
286 }
287 }
288 });
289 *self.end_tx.lock().unwrap() = Some(end_tx);
290 }
291
292 fn close(&self) {
293 let _ = self.end_tx.lock().unwrap().take();
294 }
295 }
296
297 pub struct NullInputSource {
298 end_tx: Mutex<Option<Sender<()>>>,
299 }
300
301 impl NullInputSource {
302 pub fn new() -> Self {
303 Self {
304 end_tx: Mutex::new(None),
305 }
306 }
307 }
308
309 impl InputSource for NullInputSource {
310 fn start(&self, samples: SyncSender<SoundmodemEvent>) {
311 let (end_tx, end_rx) = channel();
312 std::thread::spawn(move || {
313 // assuming 48 kHz for now
314 const TICK: Duration = Duration::from_millis(25);
315 const SAMPLES_PER_TICK: usize = 1200;
316 let mut next_tick = Instant::now() + TICK;
317
318 loop {
319 std::thread::sleep(next_tick.duration_since(Instant::now()));
320 next_tick = next_tick + TICK;
321 if end_rx.try_recv() != Err(TryRecvError::Empty) {
322 break;
323 }
324 if let Err(e) = samples.try_send(SoundmodemEvent::BasebandInput(
325 [0i16; SAMPLES_PER_TICK].into(),
326 )) {
327 debug!("overflow feeding soundmodem: {e:?}");
328 }
329 }
330 });
331 *self.end_tx.lock().unwrap() = Some(end_tx);
332 }
333
334 fn close(&self) {
335 let _ = self.end_tx.lock().unwrap().take();
336 }
337 }
338
339 pub struct OutputBuffer {
340 pub idling: bool,
341 // TODO: something more efficient
342 pub samples: VecDeque<i16>,
343 pub latency: Duration,
344 }
345
346 impl OutputBuffer {
347 pub fn new() -> Self {
348 Self {
349 idling: true,
350 samples: VecDeque::new(),
351 latency: Duration::ZERO,
352 }
353 }
354 }
355
356 pub trait OutputSink: Send + Sync + 'static {
357 fn start(&self, event_tx: SyncSender<SoundmodemEvent>, buffer: Arc<RwLock<OutputBuffer>>);
358 fn close(&self);
359 }
360
361 pub struct OutputRrcFile {
362 path: PathBuf,
363 end_tx: Mutex<Option<Sender<()>>>,
364 }
365
366 impl OutputRrcFile {
367 pub fn new(path: PathBuf) -> Self {
368 Self {
369 path,
370 end_tx: Mutex::new(None),
371 }
372 }
373 }
374
375 impl OutputSink for OutputRrcFile {
376 fn start(&self, event_tx: SyncSender<SoundmodemEvent>, buffer: Arc<RwLock<OutputBuffer>>) {
377 let (end_tx, end_rx) = channel();
378 let path = self.path.clone();
379 std::thread::spawn(move || {
380 // TODO: error handling
381 let mut file = File::create(path).unwrap();
382
383 // assuming 48 kHz for now
384 const TICK: Duration = Duration::from_millis(25);
385 const SAMPLES_PER_TICK: usize = 1200;
386
387 // flattened BE i16s for writing
388 let mut buf = [0u8; SAMPLES_PER_TICK * 2];
389 let mut next_tick = Instant::now() + TICK;
390
391 loop {
392 std::thread::sleep(next_tick.duration_since(Instant::now()));
393 next_tick = next_tick + TICK;
394 if end_rx.try_recv() != Err(TryRecvError::Empty) {
395 break;
396 }
397 // For now only write deliberately modulated (non-idling) samples
398 // Multiple transmissions will get smooshed together
399 let mut buf_used = 0;
400
401 let mut buffer = buffer.write().unwrap();
402 for out in buf.chunks_mut(2) {
403 if let Some(s) = buffer.samples.pop_front() {
404 let be = s.to_le_bytes();
405 out.copy_from_slice(&[be[0], be[1]]);
406 buf_used += 2;
407 } else if !buffer.idling {
408 debug!("output rrc file had underrun");
409 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
410 break;
411 }
412 }
413 if let Err(e) = file.write_all(&buf[0..buf_used]) {
414 debug!("failed to write to rrc file: {e:?}");
415 break;
416 }
417 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
418 len: buf_used / 2,
419 timestamp: Instant::now(),
420 });
421 }
422 });
423 *self.end_tx.lock().unwrap() = Some(end_tx);
424 }
425
426 fn close(&self) {
427 let _ = self.end_tx.lock().unwrap().take();
428 }
429 }
430
431 pub struct NullOutputSink {
432 end_tx: Mutex<Option<Sender<()>>>,
433 }
434
435 impl NullOutputSink {
436 pub fn new() -> Self {
437 Self {
438 end_tx: Mutex::new(None),
439 }
440 }
441 }
442
443 impl OutputSink for NullOutputSink {
444 fn start(&self, event_tx: SyncSender<SoundmodemEvent>, buffer: Arc<RwLock<OutputBuffer>>) {
445 let (end_tx, end_rx) = channel();
446 std::thread::spawn(move || {
447 // assuming 48 kHz for now
448 const TICK: Duration = Duration::from_millis(25);
449 const SAMPLES_PER_TICK: usize = 1200;
450 let mut next_tick = Instant::now() + TICK;
451
452 loop {
453 std::thread::sleep(next_tick.duration_since(Instant::now()));
454 next_tick = next_tick + TICK;
455 if end_rx.try_recv() != Err(TryRecvError::Empty) {
456 break;
457 }
458
459 let mut buffer = buffer.write().unwrap();
460 let mut taken = 0;
461 for _ in 0..SAMPLES_PER_TICK {
462 if !buffer.samples.pop_front().is_some() {
463 if !buffer.idling {
464 debug!("null output had underrun");
465 let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
466 break;
467 }
468 } else {
469 taken += 1;
470 }
471 }
472 let _ = event_tx.send(SoundmodemEvent::DidReadFromOutputBuffer {
473 len: taken,
474 timestamp: Instant::now(),
475 });
476 }
477 });
478 *self.end_tx.lock().unwrap() = Some(end_tx);
479 }
480
481 fn close(&self) {
482 let _ = self.end_tx.lock().unwrap().take();
483 }
484 }
485
486 pub trait Ptt: Send + 'static {
487 fn ptt_on(&mut self);
488 fn ptt_off(&mut self);
489 }
490
491 /// There is no PTT because this TNC will never make transmissions on a real radio.
492 pub struct NullPtt;
493
494 impl NullPtt {
495 pub fn new() -> Self {
496 Self
497 }
498 }
499
500 impl Ptt for NullPtt {
501 fn ptt_on(&mut self) {}
502 fn ptt_off(&mut self) {}
503 }