code.octet-stream.net Git - m17rt/blobdiff - m17app/src/reflector.rs
Make netclient work against mrefd
[m17rt] / m17app / src / reflector.rs
index 9735298131bd510132fb21b41bd7311d2e8f0a22..b5825bb584922ad7b552e2f2bcfe431fbe959133 100644 (file)
@@ -2,9 +2,9 @@ use std::{
     io::{self, Read, Write},
     net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
     sync::{
+        Arc, Mutex,
         atomic::{AtomicBool, Ordering},
         mpsc::{self, Receiver, Sender},
-        Arc, Mutex,
     },
     thread,
     time::Duration,
@@ -12,19 +12,20 @@ use std::{
 
 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
 use m17core::{
-    kiss::KissFrame,
+    kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM},
+    protocol::{LsfFrame, StreamFrame},
     reflector::{
-        convert::VoiceToRf,
-        packet::{Connect, Pong, ServerMessage},
+        convert::{RfToVoice, VoiceToRf},
+        packet::{Connect, Pong, ServerMessage, Voice},
     },
 };
 
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct ReflectorClientConfig {
-    hostname: String,
-    port: u16,
-    module: char,
-    local_callsign: M17Address,
+    pub hostname: String, // NOTE(review): presumably the reflector host to resolve; its use is not visible in this diff
+    pub port: u16, // NOTE(review): presumably the reflector port; its use is not visible in this diff
+    pub module: char, // handed to Connect::set_module when the connect packet is built
+    pub local_callsign: M17Address, // its address() is placed in the outgoing Connect and Pong packets
 }
 
 type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
@@ -40,6 +41,8 @@ pub struct ReflectorClientTnc {
     kiss_out: OutBuffer,
     event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
     is_closed: Arc<AtomicBool>,
+    kiss_buffer: Arc<Mutex<KissBuffer>>,
+    rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
 }
 
 impl ReflectorClientTnc {
@@ -60,6 +63,8 @@ impl ReflectorClientTnc {
             kiss_out: OutBuffer::new(rx),
             event_tx: Arc::new(Mutex::new(None)),
             is_closed: Arc::new(AtomicBool::new(false)),
+            kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
+            rf_to_voice: Arc::new(Mutex::new(None)),
         }
     }
 }
@@ -72,7 +77,43 @@ impl Read for ReflectorClientTnc {
 
 impl Write for ReflectorClientTnc {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        Ok(buf.len())
+        let mut kiss = self.kiss_buffer.lock().unwrap(); // buffers incoming KISS bytes; unwrap panics if the mutex is poisoned
+        let rem = kiss.buf_remaining();
+        let sz = buf.len().min(rem.len()); // accept only as many bytes as fit in the remaining buffer space
+        rem[0..sz].copy_from_slice(&buf[0..sz]);
+        kiss.did_write(sz);
+        if let Some(frame) = kiss.next_frame() { // `if let`, not a loop: at most one buffered frame is handled per call
+            if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
+                let mut payload = [0u8; 30]; // other commands/ports are ignored; 30 bytes covers both lengths handled below
+                if let Ok(len) = frame.decode_payload(&mut payload) {
+                    if len == 30 { // a 30-byte payload is treated as a link setup frame
+                        let lsf = LsfFrame(payload);
+                        let mut to_voice = self.rf_to_voice.lock().unwrap();
+                        match &mut *to_voice {
+                            Some(to_voice) => to_voice.process_lsf(lsf), // converter already exists: feed it the new LSF
+                            None => *to_voice = Some(RfToVoice::new(lsf)), // first LSF seen: create the converter from it
+                        }
+                    } else if len == 26 { // a 26-byte payload is treated as a stream frame
+                        let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]); // bytes 6-7, big-endian
+                        let frame = StreamFrame { // shadows the KISS `frame` for the rest of this branch
+                            lich_idx: payload[5] >> 5, // top three bits of byte 5
+                            lich_part: payload[0..5].try_into().unwrap(),
+                            frame_number: frame_num_part & 0x7fff, // low 15 bits
+                            end_of_stream: frame_num_part & 0x8000 > 0, // top bit
+                            stream_data: payload[8..24].try_into().unwrap(), // 16 bytes; payload[24..26] is not read here
+                        };
+                        let to_voice = self.rf_to_voice.lock().unwrap();
+                        if let Some(to_voice) = &*to_voice { // stream frames are dropped until an LSF has created the converter
+                            let voice = to_voice.process_stream(&frame);
+                            if let Some(tx) = self.event_tx.lock().unwrap().as_ref() { // skipped while no sender is installed
+                                let _ = tx.send(TncEvent::TransmitVoice(voice)); // send failure is deliberately ignored
+                            }
+                        }
+                    }
+                };
+            }
+        }
+        Ok(sz) // bytes accepted, possibly fewer than buf.len(); NOTE(review): 0 when the buffer has no space left - confirm callers cope
     }
 
     fn flush(&mut self) -> std::io::Result<()> {
@@ -107,6 +148,7 @@ impl Tnc for ReflectorClientTnc {
 enum TncEvent {
     Close,
     Received(ServerMessage),
+    TransmitVoice(Voice), // queued by Write::write; run_single_conn passes voice.as_bytes() to socket.send_to(.., dest)
 }
 
 fn spawn_runner(
@@ -155,8 +197,9 @@ fn spawn_runner(
                     config.clone(),
                     status.clone(),
                 );
+                // Cool off a bit if connect rejected, etc.
+                thread::sleep(Duration::from_secs(10));
             }
-            println!("single conn ended");
         }
         status.lock().unwrap().status_changed(TncStatus::Closed);
     });
@@ -177,9 +220,9 @@ fn run_single_conn(
     };
 
     let mut connect = Connect::new();
-    connect.set_address(config.local_callsign.address().to_owned());
+    connect.set_address(config.local_callsign.address());
     connect.set_module(config.module);
-    socket.send_to(connect.as_bytes(), dest).unwrap();
+    let _ = socket.send_to(connect.as_bytes(), dest);
     let mut converter = VoiceToRf::new();
     let single_conn_ended = Arc::new(AtomicBool::new(false));
     // TODO: unwrap
@@ -192,7 +235,6 @@ fn run_single_conn(
     while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
         match ev {
             TncEvent::Close => {
-                println!("writer: close");
                 break;
             }
             TncEvent::Received(server_msg) => match server_msg {
@@ -224,16 +266,18 @@ fn run_single_conn(
                 }
                 ServerMessage::Ping(_ping) => {
                     let mut pong = Pong::new();
-                    pong.set_address(
-                        M17Address::from_callsign("VK7XT")
-                            .unwrap()
-                            .address()
-                            .clone(),
-                    );
-                    socket.send_to(pong.as_bytes(), dest).unwrap();
+                    pong.set_address(config.local_callsign.address());
+                    if socket.send_to(pong.as_bytes(), dest).is_err() {
+                        break;
+                    }
                 }
                 _ => {}
             },
+            TncEvent::TransmitVoice(voice) => {
+                if socket.send_to(voice.as_bytes(), dest).is_err() {
+                    break;
+                };
+            }
         }
     }
     single_conn_ended.store(true, Ordering::Release);
@@ -241,7 +285,6 @@ fn run_single_conn(
         .lock()
         .unwrap()
         .status_changed(TncStatus::Disconnected);
-    println!("write thread terminating");
 }
 
 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
@@ -257,7 +300,6 @@ fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<Atomi
                 }
             }
         }
-        println!("read thread terminating");
     });
 }