From 5be3b9f857eefe983fbadae517a25da05b8b8982 Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Wed, 14 May 2025 21:59:25 +1000 Subject: [PATCH 1/2] Factor out output buffering for KISS sockets --- m17app/src/lib.rs | 1 + m17app/src/soundmodem.rs | 47 ++++--------------------- m17app/src/util/mod.rs | 1 + m17app/src/util/out_buffer.rs | 64 +++++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 41 deletions(-) create mode 100644 m17app/src/util/mod.rs create mode 100644 m17app/src/util/out_buffer.rs diff --git a/m17app/src/lib.rs b/m17app/src/lib.rs index 1881df3..8a98e79 100644 --- a/m17app/src/lib.rs +++ b/m17app/src/lib.rs @@ -9,6 +9,7 @@ pub mod serial; pub mod soundcard; pub mod soundmodem; pub mod tnc; +pub mod util; #[cfg(test)] mod test_util; diff --git a/m17app/src/soundmodem.rs b/m17app/src/soundmodem.rs index 59b56c9..c0cbfbb 100644 --- a/m17app/src/soundmodem.rs +++ b/m17app/src/soundmodem.rs @@ -1,12 +1,13 @@ use crate::error::{M17Error, SoundmodemError}; use crate::tnc::{Tnc, TncError}; +use crate::util::out_buffer::OutBuffer; use m17core::kiss::MAX_FRAME_LEN; use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator}; use m17core::tnc::SoftTnc; use std::collections::VecDeque; use std::fmt::Display; use std::fs::File; -use std::io::{self, ErrorKind, Read, Write}; +use std::io::{self, Read, Write}; use std::path::PathBuf; use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError}; use std::sync::RwLock; @@ -16,8 +17,7 @@ use thiserror::Error; pub struct Soundmodem { event_tx: SyncSender, - kiss_out_rx: Arc>>>, - partial_kiss_out: Arc>>, + kiss_out: OutBuffer, } impl Soundmodem { @@ -40,8 +40,7 @@ impl Soundmodem { ); Self { event_tx, - kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)), - partial_kiss_out: Arc::new(Mutex::new(None)), + kiss_out: OutBuffer::new(kiss_out_rx), } } } @@ -154,42 +153,9 @@ impl SoundmodemErrorSender { } } -struct PartialKissOut { - output: Arc<[u8]>, - idx: usize, -} - impl Read for Soundmodem { fn read(&mut self, buf: &mut [u8]) -> io::Result { - { - let mut partial_kiss_out = self.partial_kiss_out.lock().unwrap(); - if let Some(partial) = partial_kiss_out.as_mut() { - let remaining = partial.output.len() - partial.idx; - let to_write = remaining.min(buf.len()); - buf[0..to_write] - .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]); - if to_write == remaining { - *partial_kiss_out = None; - } else { - partial.idx += to_write; - } - return Ok(to_write); - } - } - let output = { - let rx = self.kiss_out_rx.lock().unwrap(); - rx.recv() - .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))? - }; - let to_write = output.len().min(buf.len()); - buf[0..to_write].copy_from_slice(&output[0..to_write]); - if to_write != output.len() { - *self.partial_kiss_out.lock().unwrap() = Some(PartialKissOut { - output, - idx: to_write, - }) - } - Ok(to_write) + self.kiss_out.read(buf) } } @@ -208,8 +174,7 @@ impl Tnc for Soundmodem { fn try_clone(&mut self) -> Result { Ok(Self { event_tx: self.event_tx.clone(), - kiss_out_rx: self.kiss_out_rx.clone(), - partial_kiss_out: self.partial_kiss_out.clone(), + kiss_out: self.kiss_out.clone(), }) } diff --git a/m17app/src/util/mod.rs b/m17app/src/util/mod.rs new file mode 100644 index 0000000..01de5c7 --- /dev/null +++ b/m17app/src/util/mod.rs @@ -0,0 +1 @@ +pub mod out_buffer; diff --git a/m17app/src/util/out_buffer.rs b/m17app/src/util/out_buffer.rs new file mode 100644 index 0000000..c24e0a9 --- /dev/null +++ b/m17app/src/util/out_buffer.rs @@ -0,0 +1,64 @@ +//! Buffer between `read()` calls + +use std::{ + io::{self, ErrorKind, Read}, + sync::{mpsc::Receiver, Arc, Mutex}, +}; + +#[derive(Clone)] +struct PartialOut { + output: Arc<[u8]>, + idx: usize, +} + +/// Buffer binary chunks from an MPSC receiver, feeding arbitrary chunks to `read()` calls. +/// +/// Can be cloned, but should only be read from once at a time. +#[derive(Clone)] +pub struct OutBuffer { + rx: Arc>>>, + partial_out: Arc>>, +} + +impl OutBuffer { + pub fn new(rx: Receiver>) -> Self { + Self { + rx: Arc::new(Mutex::new(rx)), + partial_out: Arc::new(Mutex::new(None)), + } + } +} + +impl Read for OutBuffer { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + { + let mut partial_out = self.partial_out.lock().unwrap(); + if let Some(partial) = partial_out.as_mut() { + let remaining = partial.output.len() - partial.idx; + let to_write = remaining.min(buf.len()); + buf[0..to_write] + .copy_from_slice(&partial.output[partial.idx..(partial.idx + to_write)]); + if to_write == remaining { + *partial_out = None; + } else { + partial.idx += to_write; + } + return Ok(to_write); + } + } + let output = { + let rx = self.rx.lock().unwrap(); + rx.recv() + .map_err(|s| io::Error::new(ErrorKind::Other, format!("{:?}", s)))? + }; + let to_write = output.len().min(buf.len()); + buf[0..to_write].copy_from_slice(&output[0..to_write]); + if to_write != output.len() { + *self.partial_out.lock().unwrap() = Some(PartialOut { + output, + idx: to_write, + }) + } + Ok(to_write) + } +} -- 2.39.5 From 8feaa040715d4e7872c6ced423cd582a77b613c2 Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Fri, 16 May 2025 11:10:23 +1000 Subject: [PATCH 2/2] add RfToVoice converter for reflector tx path --- m17core/src/reflector/convert.rs | 62 ++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/m17core/src/reflector/convert.rs b/m17core/src/reflector/convert.rs index 8e5a96d..7ad54c3 100644 --- a/m17core/src/reflector/convert.rs +++ b/m17core/src/reflector/convert.rs @@ -56,3 +56,65 @@ impl VoiceToRf { (lsf, stream) } } + +/// Accepts LSF and stream RF payloads and merges them into `Voice` packets for reflector use. +/// +/// For a series of transmissions this object should be re-used so that Stream ID is correctly +/// changed after each new LSF. +pub struct RfToVoice { + lsf: LsfFrame, + stream_id: u16, +} + +impl RfToVoice { + pub fn new(lsf: LsfFrame) -> Self { + Self { lsf, stream_id: 0 } + } + + pub fn process_lsf(&mut self, lsf: LsfFrame) { + self.lsf = lsf; + self.stream_id = self.stream_id.wrapping_add(1); + } + + pub fn process_stream(&self, stream: &StreamFrame) -> Voice { + let mut v = Voice::new(); + v.set_stream_id(self.stream_id); + v.set_frame_number(stream.frame_number); + v.set_end_of_stream(stream.end_of_stream); + v.set_payload(&stream.stream_data); + v.set_link_setup_frame(&self.lsf); + v + } +} + +#[cfg(test)] +mod tests { + use crate::{ + address::{Address, Callsign}, + protocol::{LsfFrame, StreamFrame}, + }; + + use super::{RfToVoice, VoiceToRf}; + + #[test] + fn convert_roundtrip() { + let lsf = LsfFrame::new_voice( + &Address::Callsign(Callsign(*b"VK7XT ")), + &Address::Broadcast, + ); + let stream = StreamFrame { + lich_idx: 0, + lich_part: lsf.0[0..5].try_into().unwrap(), + frame_number: 0, + end_of_stream: false, + stream_data: [1u8; 16], + }; + let rf_to_voice = RfToVoice::new(lsf.clone()); + let voice = rf_to_voice.process_stream(&stream); + + let mut voice_to_rf = VoiceToRf::new(); + let (lsf2, stream2) = voice_to_rf.next(&voice); + assert_eq!(lsf2, Some(lsf)); + assert_eq!(stream2, stream); + } +} -- 2.39.5