From: Thomas Karpiniec Date: Mon, 2 Jun 2025 10:14:33 +0000 (+1000) Subject: Implement TX for reflector TNC X-Git-Url: https://code.octet-stream.net/m17rt/commitdiff_plain/7a14bb4738cf011d9f99dc86abaee7042407b27d Implement TX for reflector TNC --- diff --git a/m17app/src/reflector.rs b/m17app/src/reflector.rs index 9735298..419093e 100644 --- a/m17app/src/reflector.rs +++ b/m17app/src/reflector.rs @@ -12,10 +12,11 @@ use std::{ use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer}; use m17core::{ - kiss::KissFrame, + kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM}, + protocol::{LsfFrame, StreamFrame}, reflector::{ - convert::VoiceToRf, - packet::{Connect, Pong, ServerMessage}, + convert::{RfToVoice, VoiceToRf}, + packet::{Connect, Pong, ServerMessage, Voice}, }, }; @@ -40,6 +41,8 @@ pub struct ReflectorClientTnc { kiss_out: OutBuffer, event_tx: Arc>>>, is_closed: Arc, + kiss_buffer: Arc>, + rf_to_voice: Arc>>, } impl ReflectorClientTnc { @@ -60,6 +63,8 @@ impl ReflectorClientTnc { kiss_out: OutBuffer::new(rx), event_tx: Arc::new(Mutex::new(None)), is_closed: Arc::new(AtomicBool::new(false)), + kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())), + rf_to_voice: Arc::new(Mutex::new(None)), } } } @@ -72,7 +77,42 @@ impl Read for ReflectorClientTnc { impl Write for ReflectorClientTnc { fn write(&mut self, buf: &[u8]) -> std::io::Result { - Ok(buf.len()) + let mut kiss = self.kiss_buffer.lock().unwrap(); + let rem = kiss.buf_remaining(); + let sz = buf.len().max(rem.len()); + rem[0..sz].copy_from_slice(&buf[0..sz]); + if let Some(frame) = kiss.next_frame() { + if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) { + let mut payload = [0u8; 30]; + if let Ok(len) = frame.decode_payload(&mut payload) { + if len == 30 { + let lsf = LsfFrame(payload); + let mut to_voice = self.rf_to_voice.lock().unwrap(); + match &mut *to_voice { + Some(to_voice) => to_voice.process_lsf(lsf), + None => *to_voice = Some(RfToVoice::new(lsf)), + } + } else if len == 26 { + let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]); + let frame = StreamFrame { + lich_idx: payload[5] >> 5, + lich_part: payload[0..5].try_into().unwrap(), + frame_number: frame_num_part & 0x7fff, + end_of_stream: frame_num_part & 0x8000 > 0, + stream_data: payload[8..24].try_into().unwrap(), + }; + let to_voice = self.rf_to_voice.lock().unwrap(); + if let Some(to_voice) = &*to_voice { + let voice = to_voice.process_stream(&frame); + if let Some(tx) = self.event_tx.lock().unwrap().as_ref() { + let _ = tx.send(TncEvent::TransmitVoice(voice)); + } + } + } + }; + } + } + Ok(sz) } fn flush(&mut self) -> std::io::Result<()> { @@ -107,6 +147,7 @@ impl Tnc for ReflectorClientTnc { enum TncEvent { Close, Received(ServerMessage), + TransmitVoice(Voice), } fn spawn_runner( @@ -156,7 +197,6 @@ fn spawn_runner( status.clone(), ); } - println!("single conn ended"); } status.lock().unwrap().status_changed(TncStatus::Closed); }); @@ -179,7 +219,7 @@ fn run_single_conn( let mut connect = Connect::new(); connect.set_address(config.local_callsign.address().to_owned()); connect.set_module(config.module); - socket.send_to(connect.as_bytes(), dest).unwrap(); + let _ = socket.send_to(connect.as_bytes(), dest); let mut converter = VoiceToRf::new(); let single_conn_ended = Arc::new(AtomicBool::new(false)); // TODO: unwrap @@ -192,7 +232,6 @@ fn run_single_conn( while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) { match ev { TncEvent::Close => { - println!("writer: close"); break; } TncEvent::Received(server_msg) => match server_msg { @@ -230,10 +269,17 @@ fn run_single_conn( .address() .clone(), ); - socket.send_to(pong.as_bytes(), dest).unwrap(); + if socket.send_to(pong.as_bytes(), dest).is_err() { + break; + } } _ => {} }, + TncEvent::TransmitVoice(voice) => { + if socket.send_to(voice.as_bytes(), dest).is_err() { + break; + }; + } } } single_conn_ended.store(true, Ordering::Release); @@ -241,7 +287,6 @@ fn run_single_conn( .lock() .unwrap() .status_changed(TncStatus::Disconnected); - println!("write thread terminating"); } fn spawn_reader(socket: UdpSocket, event_tx: Sender, cancel: Arc) { @@ -257,7 +302,6 @@ fn spawn_reader(socket: UdpSocket, event_tx: Sender, cancel: Arc