io::{self, Read, Write},
net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
sync::{
+ Arc, Mutex,
atomic::{AtomicBool, Ordering},
mpsc::{self, Receiver, Sender},
- Arc, Mutex,
},
thread,
time::Duration,
use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
use m17core::{
- kiss::KissFrame,
+ kiss::{KissBuffer, KissCommand, KissFrame, PORT_PACKET_BASIC, PORT_PACKET_FULL, PORT_STREAM},
+ protocol::{LsfFrame, StreamFrame},
reflector::{
- convert::VoiceToRf,
- packet::{Connect, Pong, ServerMessage},
+ convert::{RfToVoice, VoiceToRf},
+ packet::{Connect, Packet, Pong, ServerMessage, Voice},
},
};
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ReflectorClientConfig {
- hostname: String,
- port: u16,
- module: char,
- local_callsign: M17Address,
+ pub hostname: String,
+ pub port: u16,
+ pub module: char,
+ pub local_callsign: M17Address,
}
type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
kiss_out: OutBuffer,
event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
is_closed: Arc<AtomicBool>,
+ kiss_buffer: Arc<Mutex<KissBuffer>>,
+ rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
}
impl ReflectorClientTnc {
kiss_out: OutBuffer::new(rx),
event_tx: Arc::new(Mutex::new(None)),
is_closed: Arc::new(AtomicBool::new(false)),
+ kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
+ rf_to_voice: Arc::new(Mutex::new(None)),
}
}
}
impl Write for ReflectorClientTnc {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- Ok(buf.len())
+ let mut kiss = self.kiss_buffer.lock().unwrap();
+ let rem = kiss.buf_remaining();
+ let sz = buf.len().min(rem.len());
+ rem[0..sz].copy_from_slice(&buf[0..sz]);
+ kiss.did_write(sz);
+ while let Some(frame) = kiss.next_frame() {
+ if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
+ let mut payload = [0u8; 30];
+ if let Ok(len) = frame.decode_payload(&mut payload) {
+ if len == 30 {
+ let lsf = LsfFrame(payload);
+ let mut to_voice = self.rf_to_voice.lock().unwrap();
+ match &mut *to_voice {
+ Some(to_voice) => to_voice.process_lsf(lsf),
+ None => *to_voice = Some(RfToVoice::new(lsf)),
+ }
+ } else if len == 26 {
+ let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]);
+ let frame = StreamFrame {
+ lich_idx: payload[5] >> 5,
+ lich_part: payload[0..5].try_into().unwrap(),
+ frame_number: frame_num_part & 0x7fff,
+ end_of_stream: frame_num_part & 0x8000 > 0,
+ stream_data: payload[8..24].try_into().unwrap(),
+ };
+ let to_voice = self.rf_to_voice.lock().unwrap();
+ if let Some(to_voice) = &*to_voice {
+ let voice = to_voice.process_stream(&frame);
+ if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
+ let _ = tx.send(TncEvent::TransmitVoice(voice));
+ }
+ }
+ }
+ };
+ } else if Ok(KissCommand::DataFrame) == frame.command()
+ && frame.port() == Ok(PORT_PACKET_BASIC)
+ {
+ // basic packets not supported for now, they will require more config
+ } else if Ok(KissCommand::DataFrame) == frame.command()
+ && frame.port() == Ok(PORT_PACKET_FULL)
+ {
+ let mut payload = [0u8; 855];
+ let Ok(len) = frame.decode_payload(&mut payload) else {
+ continue;
+ };
+ if len < 33 {
+ continue;
+ }
+ let mut lsf = LsfFrame([0u8; 30]);
+ lsf.0.copy_from_slice(&payload[0..30]);
+ if lsf.check_crc() != 0 {
+ continue;
+ }
+ let mut packet = Packet::new();
+ packet.set_link_setup_frame(&lsf);
+ packet.set_payload(&payload[30..]);
+ }
+ }
+ Ok(sz)
}
fn flush(&mut self) -> std::io::Result<()> {
enum TncEvent {
Close,
Received(ServerMessage),
+ TransmitVoice(Voice),
}
fn spawn_runner(
config.clone(),
status.clone(),
);
+ // Cool off a bit if connect rejected, etc.
+ thread::sleep(Duration::from_secs(10));
}
- println!("single conn ended");
}
status.lock().unwrap().status_changed(TncStatus::Closed);
});
};
let mut connect = Connect::new();
- connect.set_address(config.local_callsign.address().to_owned());
+ connect.set_address(config.local_callsign.address());
connect.set_module(config.module);
- socket.send_to(connect.as_bytes(), dest).unwrap();
+ let _ = socket.send_to(connect.as_bytes(), dest);
let mut converter = VoiceToRf::new();
let single_conn_ended = Arc::new(AtomicBool::new(false));
// TODO: unwrap
while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
match ev {
TncEvent::Close => {
- println!("writer: close");
break;
}
TncEvent::Received(server_msg) => match server_msg {
let kiss = KissFrame::new_stream_data(&stream).unwrap();
let _ = kiss_out_tx.send(kiss.as_bytes().into());
}
+ ServerMessage::Packet(packet) => {
+ if let Ok(kiss) =
+ KissFrame::new_full_packet(&packet.link_setup_frame().0, packet.payload())
+ {
+ let _ = kiss_out_tx.send(kiss.as_bytes().into());
+ }
+ }
ServerMessage::Ping(_ping) => {
let mut pong = Pong::new();
- pong.set_address(
- M17Address::from_callsign("VK7XT")
- .unwrap()
- .address()
- .clone(),
- );
- socket.send_to(pong.as_bytes(), dest).unwrap();
+ pong.set_address(config.local_callsign.address());
+ if socket.send_to(pong.as_bytes(), dest).is_err() {
+ break;
+ }
}
_ => {}
},
+ TncEvent::TransmitVoice(voice) => {
+ if socket.send_to(voice.as_bytes(), dest).is_err() {
+ break;
+ };
+ }
}
}
single_conn_ended.store(true, Ordering::Release);
.lock()
.unwrap()
.status_changed(TncStatus::Disconnected);
- println!("write thread terminating");
}
fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
}
}
}
- println!("read thread terminating");
});
}