X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/659d8bcef911fdcb63bf76c9f4fab7dc74cbcf08..HEAD:/m17app/src/reflector.rs diff --git a/m17app/src/reflector.rs b/m17app/src/reflector.rs index 9735298..b5825bb 100644 --- a/m17app/src/reflector.rs +++ b/m17app/src/reflector.rs @@ -2,9 +2,9 @@ use std::{ io::{self, Read, Write}, net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket}, sync::{ + Arc, Mutex, atomic::{AtomicBool, Ordering}, mpsc::{self, Receiver, Sender}, - Arc, Mutex, }, thread, time::Duration, @@ -12,19 +12,20 @@ use std::{ use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer}; use m17core::{ - kiss::KissFrame, + kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM}, + protocol::{LsfFrame, StreamFrame}, reflector::{ - convert::VoiceToRf, - packet::{Connect, Pong, ServerMessage}, + convert::{RfToVoice, VoiceToRf}, + packet::{Connect, Pong, ServerMessage, Voice}, }, }; #[derive(Debug, PartialEq, Eq, Clone)] pub struct ReflectorClientConfig { - hostname: String, - port: u16, - module: char, - local_callsign: M17Address, + pub hostname: String, + pub port: u16, + pub module: char, + pub local_callsign: M17Address, } type WrappedStatusHandler = Arc>; @@ -40,6 +41,8 @@ pub struct ReflectorClientTnc { kiss_out: OutBuffer, event_tx: Arc>>>, is_closed: Arc, + kiss_buffer: Arc>, + rf_to_voice: Arc>>, } impl ReflectorClientTnc { @@ -60,6 +63,8 @@ impl ReflectorClientTnc { kiss_out: OutBuffer::new(rx), event_tx: Arc::new(Mutex::new(None)), is_closed: Arc::new(AtomicBool::new(false)), + kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())), + rf_to_voice: Arc::new(Mutex::new(None)), } } } @@ -72,7 +77,43 @@ impl Read for ReflectorClientTnc { impl Write for ReflectorClientTnc { fn write(&mut self, buf: &[u8]) -> std::io::Result { - Ok(buf.len()) + let mut kiss = self.kiss_buffer.lock().unwrap(); + let rem = kiss.buf_remaining(); + let sz = buf.len().min(rem.len()); + rem[0..sz].copy_from_slice(&buf[0..sz]); + kiss.did_write(sz); + if let Some(frame) = kiss.next_frame() { + if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) { + let mut payload = [0u8; 30]; + if let Ok(len) = frame.decode_payload(&mut payload) { + if len == 30 { + let lsf = LsfFrame(payload); + let mut to_voice = self.rf_to_voice.lock().unwrap(); + match &mut *to_voice { + Some(to_voice) => to_voice.process_lsf(lsf), + None => *to_voice = Some(RfToVoice::new(lsf)), + } + } else if len == 26 { + let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]); + let frame = StreamFrame { + lich_idx: payload[5] >> 5, + lich_part: payload[0..5].try_into().unwrap(), + frame_number: frame_num_part & 0x7fff, + end_of_stream: frame_num_part & 0x8000 > 0, + stream_data: payload[8..24].try_into().unwrap(), + }; + let to_voice = self.rf_to_voice.lock().unwrap(); + if let Some(to_voice) = &*to_voice { + let voice = to_voice.process_stream(&frame); + if let Some(tx) = self.event_tx.lock().unwrap().as_ref() { + let _ = tx.send(TncEvent::TransmitVoice(voice)); + } + } + } + }; + } + } + Ok(sz) } fn flush(&mut self) -> std::io::Result<()> { @@ -107,6 +148,7 @@ impl Tnc for ReflectorClientTnc { enum TncEvent { Close, Received(ServerMessage), + TransmitVoice(Voice), } fn spawn_runner( @@ -155,8 +197,9 @@ fn spawn_runner( config.clone(), status.clone(), ); + // Cool off a bit if connect rejected, etc. + thread::sleep(Duration::from_secs(10)); } - println!("single conn ended"); } status.lock().unwrap().status_changed(TncStatus::Closed); }); @@ -177,9 +220,9 @@ fn run_single_conn( }; let mut connect = Connect::new(); - connect.set_address(config.local_callsign.address().to_owned()); + connect.set_address(config.local_callsign.address()); connect.set_module(config.module); - socket.send_to(connect.as_bytes(), dest).unwrap(); + let _ = socket.send_to(connect.as_bytes(), dest); let mut converter = VoiceToRf::new(); let single_conn_ended = Arc::new(AtomicBool::new(false)); // TODO: unwrap @@ -192,7 +235,6 @@ fn run_single_conn( while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) { match ev { TncEvent::Close => { - println!("writer: close"); break; } TncEvent::Received(server_msg) => match server_msg { @@ -224,16 +266,18 @@ fn run_single_conn( } ServerMessage::Ping(_ping) => { let mut pong = Pong::new(); - pong.set_address( - M17Address::from_callsign("VK7XT") - .unwrap() - .address() - .clone(), - ); - socket.send_to(pong.as_bytes(), dest).unwrap(); + pong.set_address(config.local_callsign.address()); + if socket.send_to(pong.as_bytes(), dest).is_err() { + break; + } } _ => {} }, + TncEvent::TransmitVoice(voice) => { + if socket.send_to(voice.as_bytes(), dest).is_err() { + break; + }; + } } } single_conn_ended.store(true, Ordering::Release); @@ -241,7 +285,6 @@ fn run_single_conn( .lock() .unwrap() .status_changed(TncStatus::Disconnected); - println!("write thread terminating"); } fn spawn_reader(socket: UdpSocket, event_tx: Sender, cancel: Arc) { @@ -257,7 +300,6 @@ fn spawn_reader(socket: UdpSocket, event_tx: Sender, cancel: Arc