code.octet-stream.net Git - m17rt/commitdiff
Implement TX for reflector TNC
author: Thomas Karpiniec <tom.karpiniec@outlook.com>
Mon, 2 Jun 2025 10:14:33 +0000 (20:14 +1000)
committer: Thomas Karpiniec <tom.karpiniec@outlook.com>
Mon, 2 Jun 2025 10:14:33 +0000 (20:14 +1000)
m17app/src/reflector.rs
m17core/src/reflector/convert.rs

index 9735298131bd510132fb21b41bd7311d2e8f0a22..419093eb7b0dc2b41f77d4385315e4761fc58e3c 100644 (file)
@@ -12,10 +12,11 @@ use std::{
 
 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
 use m17core::{
-    kiss::KissFrame,
+    kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM},
+    protocol::{LsfFrame, StreamFrame},
     reflector::{
-        convert::VoiceToRf,
-        packet::{Connect, Pong, ServerMessage},
+        convert::{RfToVoice, VoiceToRf},
+        packet::{Connect, Pong, ServerMessage, Voice},
     },
 };
 
@@ -40,6 +41,8 @@ pub struct ReflectorClientTnc {
     kiss_out: OutBuffer,
     event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
     is_closed: Arc<AtomicBool>,
+    kiss_buffer: Arc<Mutex<KissBuffer>>,
+    rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
 }
 
 impl ReflectorClientTnc {
@@ -60,6 +63,8 @@ impl ReflectorClientTnc {
             kiss_out: OutBuffer::new(rx),
             event_tx: Arc::new(Mutex::new(None)),
             is_closed: Arc::new(AtomicBool::new(false)),
+            kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
+            rf_to_voice: Arc::new(Mutex::new(None)),
         }
     }
 }
@@ -72,7 +77,42 @@ impl Read for ReflectorClientTnc {
 
 impl Write for ReflectorClientTnc {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        Ok(buf.len())
+        let mut kiss = self.kiss_buffer.lock().unwrap();
+        let rem = kiss.buf_remaining();
+        let sz = buf.len().max(rem.len());
+        rem[0..sz].copy_from_slice(&buf[0..sz]);
+        if let Some(frame) = kiss.next_frame() {
+            if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
+                let mut payload = [0u8; 30];
+                if let Ok(len) = frame.decode_payload(&mut payload) {
+                    if len == 30 {
+                        let lsf = LsfFrame(payload);
+                        let mut to_voice = self.rf_to_voice.lock().unwrap();
+                        match &mut *to_voice {
+                            Some(to_voice) => to_voice.process_lsf(lsf),
+                            None => *to_voice = Some(RfToVoice::new(lsf)),
+                        }
+                    } else if len == 26 {
+                        let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]);
+                        let frame = StreamFrame {
+                            lich_idx: payload[5] >> 5,
+                            lich_part: payload[0..5].try_into().unwrap(),
+                            frame_number: frame_num_part & 0x7fff,
+                            end_of_stream: frame_num_part & 0x8000 > 0,
+                            stream_data: payload[8..24].try_into().unwrap(),
+                        };
+                        let to_voice = self.rf_to_voice.lock().unwrap();
+                        if let Some(to_voice) = &*to_voice {
+                            let voice = to_voice.process_stream(&frame);
+                            if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
+                                let _ = tx.send(TncEvent::TransmitVoice(voice));
+                            }
+                        }
+                    }
+                };
+            }
+        }
+        Ok(sz)
     }
 
     fn flush(&mut self) -> std::io::Result<()> {
@@ -107,6 +147,7 @@ impl Tnc for ReflectorClientTnc {
 enum TncEvent { // Events received over the channel that `run_single_conn` drains in its event loop.
     Close, // Makes the `run_single_conn` event loop break out.
     Received(ServerMessage), // A `ServerMessage` to handle; `run_single_conn` matches on its variant.
+    TransmitVoice(Voice), // A `Voice` packet built in `Write::write`; sent with `socket.send_to`.
 }
 
 fn spawn_runner(
@@ -156,7 +197,6 @@ fn spawn_runner(
                     status.clone(),
                 );
             }
-            println!("single conn ended");
         }
         status.lock().unwrap().status_changed(TncStatus::Closed);
     });
@@ -179,7 +219,7 @@ fn run_single_conn(
     let mut connect = Connect::new();
     connect.set_address(config.local_callsign.address().to_owned());
     connect.set_module(config.module);
-    socket.send_to(connect.as_bytes(), dest).unwrap();
+    let _ = socket.send_to(connect.as_bytes(), dest);
     let mut converter = VoiceToRf::new();
     let single_conn_ended = Arc::new(AtomicBool::new(false));
     // TODO: unwrap
@@ -192,7 +232,6 @@ fn run_single_conn(
     while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
         match ev {
             TncEvent::Close => {
-                println!("writer: close");
                 break;
             }
             TncEvent::Received(server_msg) => match server_msg {
@@ -230,10 +269,17 @@ fn run_single_conn(
                             .address()
                             .clone(),
                     );
-                    socket.send_to(pong.as_bytes(), dest).unwrap();
+                    if socket.send_to(pong.as_bytes(), dest).is_err() {
+                        break;
+                    }
                 }
                 _ => {}
             },
+            TncEvent::TransmitVoice(voice) => {
+                if socket.send_to(voice.as_bytes(), dest).is_err() {
+                    break;
+                };
+            }
         }
     }
     single_conn_ended.store(true, Ordering::Release);
@@ -241,7 +287,6 @@ fn run_single_conn(
         .lock()
         .unwrap()
         .status_changed(TncStatus::Disconnected);
-    println!("write thread terminating");
 }
 
 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
@@ -257,7 +302,6 @@ fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<Atomi
                 }
             }
         }
-        println!("read thread terminating");
     });
 }
 
index ecbb5a5c6573bc673add323e8e4abcd8ee3f9e14..fd52df87377908b2b3101f230c607c4c56346dbe 100644 (file)
@@ -62,6 +62,7 @@ impl VoiceToRf {
 ///
 /// For a series of transmissions this object should be re-used so that Stream ID is correctly
 /// changed after each new LSF.
+#[derive(Debug, Clone)]
 pub struct RfToVoice {
     lsf: LsfFrame,
     stream_id: u16,