]> code.octet-stream.net Git - m17rt/blob - m17codec2/src/tx.rs
c0eb5968fcdd31217ba3dbac6fec5ee0019ed1fe
[m17rt] / m17codec2 / src / tx.rs
1 use codec2::{Codec2, Codec2Mode};
2 use cpal::traits::DeviceTrait;
3 use cpal::traits::HostTrait;
4 use cpal::traits::StreamTrait;
5 use cpal::SampleFormat;
6 use cpal::SampleRate;
7 use log::debug;
8 use m17app::adapter::StreamAdapter;
9 use m17app::app::TxHandle;
10 use m17app::error::AdapterError;
11 use m17app::link_setup::LinkSetup;
12 use m17app::link_setup::M17Address;
13 use m17app::StreamFrame;
14 use rubato::Resampler;
15 use rubato::SincFixedOut;
16 use rubato::SincInterpolationParameters;
17 use std::path::PathBuf;
18 use std::sync::mpsc;
19 use std::sync::mpsc::channel;
20 use std::sync::Arc;
21 use std::sync::Mutex;
22 use std::time::Duration;
23 use std::time::Instant;
24
25 use crate::M17Codec2Error;
26
27 /// Transmits a wave file as an M17 stream
28 pub struct WavePlayer;
29
30 impl WavePlayer {
31 /// Plays a wave file (blocking).
32 ///
33 /// * `path`: wave file to transmit, must be 8 kHz mono and 16-bit LE
34 /// * `tx`: a `TxHandle` obtained from an `M17App`
35 /// * `source`: address of transmission source
36 /// * `destination`: address of transmission destination
37 /// * `channel_access_number`: from 0 to 15, usually 0
38 pub fn play(
39 path: PathBuf,
40 tx: TxHandle,
41 source: &M17Address,
42 destination: &M17Address,
43 channel_access_number: u8,
44 ) {
45 let mut reader = hound::WavReader::open(path).unwrap();
46 let mut samples = reader.samples::<i16>();
47
48 let mut codec = Codec2::new(Codec2Mode::MODE_3200);
49 let mut in_buf = [0i16; 160];
50 let mut out_buf = [0u8; 16];
51 let mut lsf_chunk: usize = 0;
52 const TICK: Duration = Duration::from_millis(40);
53 let mut next_tick = Instant::now() + TICK;
54 let mut frame_number = 0;
55
56 let mut setup = LinkSetup::new_voice(source, destination);
57 setup.set_channel_access_number(channel_access_number);
58 tx.transmit_stream_start(&setup);
59
60 loop {
61 let mut last_one = false;
62 for out in out_buf.chunks_mut(8) {
63 for i in in_buf.iter_mut() {
64 let sample = match samples.next() {
65 Some(Ok(sample)) => sample,
66 _ => {
67 last_one = true;
68 0
69 }
70 };
71 *i = sample;
72 }
73 codec.encode(out, &in_buf);
74 }
75 tx.transmit_stream_next(&StreamFrame {
76 lich_idx: lsf_chunk as u8,
77 lich_part: setup.lich_part(lsf_chunk as u8),
78 frame_number,
79 end_of_stream: last_one,
80 stream_data: out_buf,
81 });
82 frame_number += 1;
83 lsf_chunk = (lsf_chunk + 1) % 6;
84
85 if last_one {
86 break;
87 }
88
89 std::thread::sleep(next_tick.duration_since(Instant::now()));
90 next_tick += TICK;
91 }
92 }
93 }
94
95 /// Control transmissions into a Codec2TxAdapter
96 #[derive(Clone)]
97 pub struct Ptt {
98 tx: mpsc::Sender<Event>,
99 }
100
101 impl Ptt {
102 pub fn set_ptt(&self, ptt: bool) {
103 let _ = self.tx.send(if ptt { Event::PttOn } else { Event::PttOff });
104 }
105 }
106
107 /// Use a microphone and local PTT to transmit Codec2 voice data into an M17 channel.
108 pub struct Codec2TxAdapter {
109 input_card: Option<String>,
110 event_tx: mpsc::Sender<Event>,
111 event_rx: Mutex<Option<mpsc::Receiver<Event>>>,
112 source: M17Address,
113 destination: M17Address,
114 }
115
116 impl Codec2TxAdapter {
117 pub fn new(source: M17Address, destination: M17Address) -> Self {
118 let (event_tx, event_rx) = mpsc::channel();
119 Self {
120 input_card: None,
121 event_tx,
122 event_rx: Mutex::new(Some(event_rx)),
123 source,
124 destination,
125 }
126 }
127
128 pub fn set_input_card<S: Into<String>>(&mut self, card_name: S) {
129 self.input_card = Some(card_name.into());
130 }
131
132 pub fn ptt(&self) -> Ptt {
133 Ptt {
134 tx: self.event_tx.clone(),
135 }
136 }
137 }
138
/// Messages consumed by the transmit worker thread.
enum Event {
    /// PTT was engaged (sent by `Ptt::set_ptt(true)`).
    PttOn,
    /// PTT was released (sent by `Ptt::set_ptt(false)`).
    PttOff,
    /// A batch of single-channel samples delivered by the audio input callback.
    MicSamples(Arc<[i16]>),
    /// The adapter is being closed and the worker should exit.
    Close,
}
145
146 impl StreamAdapter for Codec2TxAdapter {
147 fn start(&self, handle: TxHandle) -> Result<(), AdapterError> {
148 let Some(event_rx) = self.event_rx.lock().unwrap().take() else {
149 return Err(M17Codec2Error::RepeatStart.into());
150 };
151 let event_tx = self.event_tx.clone();
152 let (setup_tx, setup_rx) = channel();
153 let input_card = self.input_card.clone();
154 let from = self.source.clone();
155 let to = self.destination.clone();
156 std::thread::spawn(move || {
157 stream_thread(event_tx, event_rx, setup_tx, input_card, handle, from, to)
158 });
159 let sample_rate = setup_rx.recv()??;
160 debug!("selected codec2 microphone sample rate {sample_rate}");
161
162 Ok(())
163 }
164
165 fn close(&self) -> Result<(), AdapterError> {
166 let _ = self.event_tx.send(Event::Close);
167 Ok(())
168 }
169
170 fn stream_began(&self, _link_setup: LinkSetup) {
171 // not interested in incoming transmissions
172 }
173
174 fn stream_data(&self, _frame_number: u16, _is_final: bool, _data: Arc<[u8; 16]>) {
175 // not interested in incoming transmissions
176
177 // the only reason this is an adapter at all is for future "transmission aborted" feedback
178 // when that's implemented by m17app
179 }
180 }
181
182 fn stream_thread(
183 event_tx: mpsc::Sender<Event>,
184 event_rx: mpsc::Receiver<Event>,
185 setup_tx: mpsc::Sender<Result<u32, AdapterError>>,
186 input_card: Option<String>,
187 handle: TxHandle,
188 source: M17Address,
189 destination: M17Address,
190 ) {
191 let host = cpal::default_host();
192 let device = if let Some(input_card) = input_card {
193 // TODO: more error handling for unwraps
194 match host
195 .input_devices()
196 .unwrap()
197 .find(|d| d.name().unwrap() == input_card)
198 {
199 Some(d) => d,
200 None => {
201 let _ = setup_tx.send(Err(M17Codec2Error::CardUnavailable(input_card).into()));
202 return;
203 }
204 }
205 } else {
206 match host.default_input_device() {
207 Some(d) => d,
208 None => {
209 let _ = setup_tx.send(Err(M17Codec2Error::DefaultCardUnavailable.into()));
210 return;
211 }
212 }
213 };
214 let card_name = device.name().unwrap();
215 let mut configs = match device.supported_input_configs() {
216 Ok(c) => c,
217 Err(e) => {
218 let _ = setup_tx.send(Err(
219 M17Codec2Error::InputConfigsUnavailable(card_name, e).into()
220 ));
221 return;
222 }
223 };
224 // TODO: rank these by most favourable, same for rx
225 let config = match configs.find(|c| {
226 (c.channels() == 1 || c.channels() == 2) && c.sample_format() == SampleFormat::I16
227 }) {
228 Some(c) => c,
229 None => {
230 let _ = setup_tx.send(Err(
231 M17Codec2Error::SupportedInputUnavailable(card_name).into()
232 ));
233 return;
234 }
235 };
236
237 let target_sample_rate =
238 if config.min_sample_rate().0 <= 8000 && config.max_sample_rate().0 >= 8000 {
239 8000
240 } else {
241 config.min_sample_rate().0
242 };
243 let channels = config.channels();
244
245 let mut acc: Box<dyn Accumulator> = if target_sample_rate != 8000 {
246 Box::new(ResamplingAccumulator::new(target_sample_rate as f64))
247 } else {
248 Box::new(DirectAccumulator::new())
249 };
250
251 let config = config.with_sample_rate(SampleRate(target_sample_rate));
252 let stream = match device.build_input_stream(
253 &config.into(),
254 move |data: &[i16], _info: &cpal::InputCallbackInfo| {
255 let mut samples = vec![];
256 for d in data.chunks(channels as usize) {
257 // if we were given multi-channel input we'll pick the first channel
258 // TODO: configurable?
259 samples.push(d[0]);
260 }
261 let _ = event_tx.send(Event::MicSamples(samples.into()));
262 },
263 |e| {
264 // abort here?
265 debug!("error occurred in codec2 recording: {e:?}");
266 },
267 None,
268 ) {
269 Ok(s) => s,
270 Err(e) => {
271 let _ = setup_tx.send(Err(
272 M17Codec2Error::InputStreamBuildError(card_name, e).into()
273 ));
274 return;
275 }
276 };
277
278 let _ = setup_tx.send(Ok(target_sample_rate));
279 let mut state = State::Idle;
280 let mut codec2 = Codec2::new(Codec2Mode::MODE_3200);
281 let link_setup = LinkSetup::new_voice(&source, &destination);
282 let mut lich_idx = 0;
283 let mut frame_number = 0;
284
285 // Now the main loop
286 while let Ok(ev) = event_rx.recv() {
287 match ev {
288 Event::PttOn => {
289 match state {
290 State::Idle => {
291 match stream.play() {
292 Ok(()) => (),
293 Err(_e) => {
294 // TODO: report M17Codec2Error::InputStreamPlayError(card_name, e).into()
295 break;
296 }
297 }
298 acc.reset();
299 codec2 = Codec2::new(Codec2Mode::MODE_3200);
300 state = State::StartTransmitting;
301 }
302 State::StartTransmitting => {}
303 State::Transmitting => {}
304 State::Ending => state = State::EndingWithPttRestart,
305 State::EndingWithPttRestart => {}
306 }
307 }
308 Event::PttOff => match state {
309 State::Idle => {}
310 State::StartTransmitting => state = State::Idle,
311 State::Transmitting => state = State::Ending,
312 State::Ending => {}
313 State::EndingWithPttRestart => state = State::Ending,
314 },
315 Event::MicSamples(samples) => {
316 match state {
317 State::Idle => {}
318 State::StartTransmitting
319 | State::Transmitting
320 | State::Ending
321 | State::EndingWithPttRestart => {
322 acc.handle_samples(&samples);
323 while let Some(frame) = acc.try_next_frame() {
324 let mut stream_data = [0u8; 16];
325 codec2.encode(&mut stream_data[0..8], &frame[0..160]);
326 codec2.encode(&mut stream_data[8..16], &frame[160..320]);
327
328 if state == State::StartTransmitting {
329 handle.transmit_stream_start(&link_setup);
330 lich_idx = 0;
331 frame_number = 0;
332 state = State::Transmitting;
333 }
334
335 let end_of_stream = state != State::Transmitting;
336 handle.transmit_stream_next(&StreamFrame {
337 lich_idx,
338 lich_part: link_setup.lich_part(lich_idx),
339 frame_number,
340 end_of_stream,
341 stream_data,
342 });
343 frame_number += 1;
344 lich_idx = (lich_idx + 1) % 6;
345
346 if end_of_stream {
347 break;
348 }
349 }
350
351 if state == State::Ending {
352 // when finished sending final stream frame
353 let _ = stream.pause();
354 state = State::Idle;
355 }
356
357 if state == State::EndingWithPttRestart {
358 acc.reset();
359 codec2 = Codec2::new(Codec2Mode::MODE_3200);
360 state = State::StartTransmitting;
361 }
362 }
363 }
364 }
365 Event::Close => {
366 // assume PTT etc. will clean up itself responsibly on close
367 break;
368 }
369 }
370 }
371 }
372
/// Phases of the PTT-driven transmit state machine.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// No transmission in progress; waiting for PTT to be engaged
    Idle,
    /// PTT is engaged, but the stream has not started yet because the first full frame of
    /// audio is still being collected
    StartTransmitting,
    /// A stream is in progress and voice frames are being sent
    Transmitting,
    /// PTT has been released; the next frame of audio collected will be sent as the final frame
    Ending,
    /// PTT was re-engaged while ending; the final frame is still sent, after which we return
    /// to StartTransmitting
    EndingWithPttRestart,
}
386
387 fn resampler_params() -> SincInterpolationParameters {
388 SincInterpolationParameters {
389 sinc_len: 256,
390 f_cutoff: 0.95,
391 oversampling_factor: 128,
392 interpolation: rubato::SincInterpolationType::Cubic,
393 window: rubato::WindowFunction::BlackmanHarris2,
394 }
395 }
396
/// Collects incoming microphone samples and hands them back in fixed-size frames.
trait Accumulator {
    /// Append newly captured samples.
    fn handle_samples(&mut self, samples: &[i16]);
    /// Return 320 samples, enough for two Codec2 frames
    fn try_next_frame(&mut self) -> Option<Vec<i16>>;
    /// Discard any buffered state.
    fn reset(&mut self);
}

/// Accumulator that passes samples through unchanged.
struct DirectAccumulator {
    /// Samples received but not yet returned as part of a frame.
    buffer: Vec<i16>,
}

impl DirectAccumulator {
    /// Samples per returned frame: two Codec2 frames of 160 samples each.
    const FRAME_LEN: usize = 320;

    /// Create an accumulator with nothing buffered.
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }
}

impl Accumulator for DirectAccumulator {
    fn handle_samples(&mut self, samples: &[i16]) {
        self.buffer.extend_from_slice(samples);
    }

    fn try_next_frame(&mut self) -> Option<Vec<i16>> {
        if self.buffer.len() < Self::FRAME_LEN {
            return None;
        }
        // Remove the oldest frame's worth of samples, keeping the remainder buffered.
        let frame: Vec<i16> = self.buffer.drain(..Self::FRAME_LEN).collect();
        Some(frame)
    }

    fn reset(&mut self) {
        self.buffer.clear();
    }
}
432
433 struct ResamplingAccumulator {
434 input_rate: f64,
435 buffer: Vec<i16>,
436 resampler: SincFixedOut<f32>,
437 }
438
439 impl ResamplingAccumulator {
440 fn new(input_rate: f64) -> Self {
441 Self {
442 input_rate,
443 buffer: Vec::new(),
444 resampler: make_resampler(input_rate),
445 }
446 }
447 }
448
449 impl Accumulator for ResamplingAccumulator {
450 fn handle_samples(&mut self, samples: &[i16]) {
451 self.buffer.extend_from_slice(samples);
452 }
453
454 fn try_next_frame(&mut self) -> Option<Vec<i16>> {
455 let required = self.resampler.input_frames_next();
456 if self.buffer.len() >= required {
457 let mut part = self.buffer.split_off(required);
458 std::mem::swap(&mut self.buffer, &mut part);
459 let samples_f: Vec<f32> = part.iter().map(|s| *s as f32 / 16384.0f32).collect();
460 let out = self.resampler.process(&[samples_f], None).unwrap();
461 Some(out[0].iter().map(|s| (*s * 16383.0f32) as i16).collect())
462 } else {
463 None
464 }
465 }
466
467 fn reset(&mut self) {
468 self.buffer.clear();
469 self.resampler = make_resampler(self.input_rate);
470 }
471 }
472
473 fn make_resampler(input_rate: f64) -> SincFixedOut<f32> {
474 // want 320 samples at a time to create 2x Codec2 frames per M17 Voice frame
475 SincFixedOut::new(8000f64 / input_rate, 1.0, resampler_params(), 320, 1).unwrap()
476 }