]> code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
Implement TX for reflector TNC
[m17rt] / m17app / src / reflector.rs
1 use std::{
2 io::{self, Read, Write},
3 net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 mpsc::{self, Receiver, Sender},
7 Arc, Mutex,
8 },
9 thread,
10 time::Duration,
11 };
12
13 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
14 use m17core::{
15 kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM},
16 protocol::{LsfFrame, StreamFrame},
17 reflector::{
18 convert::{RfToVoice, VoiceToRf},
19 packet::{Connect, Pong, ServerMessage, Voice},
20 },
21 };
22
23 #[derive(Debug, PartialEq, Eq, Clone)]
24 pub struct ReflectorClientConfig {
25 hostname: String,
26 port: u16,
27 module: char,
28 local_callsign: M17Address,
29 }
30
31 type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
32
33 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
34 ///
35 /// Streams will be sent and received over IP rather than RF.
36 #[derive(Clone)]
37 pub struct ReflectorClientTnc {
38 config: ReflectorClientConfig,
39 status_handler: WrappedStatusHandler,
40 kiss_out_tx: Sender<Arc<[u8]>>,
41 kiss_out: OutBuffer,
42 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
43 is_closed: Arc<AtomicBool>,
44 kiss_buffer: Arc<Mutex<KissBuffer>>,
45 rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
46 }
47
48 impl ReflectorClientTnc {
49 /// Create a new Reflector Client TNC.
50 ///
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely information and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new<S: StatusHandler + Send + 'static>(
55 config: ReflectorClientConfig,
56 status: S,
57 ) -> Self {
58 let (tx, rx) = mpsc::channel();
59 Self {
60 config,
61 status_handler: Arc::new(Mutex::new(status)),
62 kiss_out_tx: tx,
63 kiss_out: OutBuffer::new(rx),
64 event_tx: Arc::new(Mutex::new(None)),
65 is_closed: Arc::new(AtomicBool::new(false)),
66 kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
67 rf_to_voice: Arc::new(Mutex::new(None)),
68 }
69 }
70 }
71
72 impl Read for ReflectorClientTnc {
73 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
74 self.kiss_out.read(buf)
75 }
76 }
77
78 impl Write for ReflectorClientTnc {
79 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
80 let mut kiss = self.kiss_buffer.lock().unwrap();
81 let rem = kiss.buf_remaining();
82 let sz = buf.len().max(rem.len());
83 rem[0..sz].copy_from_slice(&buf[0..sz]);
84 if let Some(frame) = kiss.next_frame() {
85 if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
86 let mut payload = [0u8; 30];
87 if let Ok(len) = frame.decode_payload(&mut payload) {
88 if len == 30 {
89 let lsf = LsfFrame(payload);
90 let mut to_voice = self.rf_to_voice.lock().unwrap();
91 match &mut *to_voice {
92 Some(to_voice) => to_voice.process_lsf(lsf),
93 None => *to_voice = Some(RfToVoice::new(lsf)),
94 }
95 } else if len == 26 {
96 let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]);
97 let frame = StreamFrame {
98 lich_idx: payload[5] >> 5,
99 lich_part: payload[0..5].try_into().unwrap(),
100 frame_number: frame_num_part & 0x7fff,
101 end_of_stream: frame_num_part & 0x8000 > 0,
102 stream_data: payload[8..24].try_into().unwrap(),
103 };
104 let to_voice = self.rf_to_voice.lock().unwrap();
105 if let Some(to_voice) = &*to_voice {
106 let voice = to_voice.process_stream(&frame);
107 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
108 let _ = tx.send(TncEvent::TransmitVoice(voice));
109 }
110 }
111 }
112 };
113 }
114 }
115 Ok(sz)
116 }
117
118 fn flush(&mut self) -> std::io::Result<()> {
119 Ok(())
120 }
121 }
122
123 impl Tnc for ReflectorClientTnc {
124 fn try_clone(&mut self) -> Result<Self, crate::tnc::TncError> {
125 Ok(self.clone())
126 }
127
128 fn start(&mut self) {
129 spawn_runner(
130 self.config.clone(),
131 self.status_handler.clone(),
132 self.event_tx.clone(),
133 self.is_closed.clone(),
134 self.kiss_out_tx.clone(),
135 );
136 }
137
138 fn close(&mut self) {
139 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
140 self.is_closed.store(true, Ordering::Release);
141 let _ = tx.send(TncEvent::Close);
142 }
143 }
144 }
145
146 #[allow(clippy::large_enum_variant)]
147 enum TncEvent {
148 Close,
149 Received(ServerMessage),
150 TransmitVoice(Voice),
151 }
152
153 fn spawn_runner(
154 config: ReflectorClientConfig,
155 status: WrappedStatusHandler,
156 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
157 is_closed: Arc<AtomicBool>,
158 kiss_out_tx: Sender<Arc<[u8]>>,
159 ) {
160 std::thread::spawn(move || {
161 status
162 .lock()
163 .unwrap()
164 .status_changed(TncStatus::Disconnected);
165 while !is_closed.load(Ordering::Acquire) {
166 status.lock().unwrap().status_changed(TncStatus::Connecting);
167 let sa = if let Ok(mut sa_iter) =
168 (config.hostname.as_str(), config.port).to_socket_addrs()
169 {
170 if let Some(sa) = sa_iter.next() {
171 sa
172 } else {
173 status
174 .lock()
175 .unwrap()
176 .status_changed(TncStatus::Disconnected);
177 thread::sleep(Duration::from_secs(10));
178 continue;
179 }
180 } else {
181 status
182 .lock()
183 .unwrap()
184 .status_changed(TncStatus::Disconnected);
185 thread::sleep(Duration::from_secs(10));
186 continue;
187 };
188 let (tx, rx) = mpsc::channel();
189 *event_tx.lock().unwrap() = Some(tx.clone());
190 if !is_closed.load(Ordering::Acquire) {
191 run_single_conn(
192 sa,
193 tx,
194 rx,
195 kiss_out_tx.clone(),
196 config.clone(),
197 status.clone(),
198 );
199 }
200 }
201 status.lock().unwrap().status_changed(TncStatus::Closed);
202 });
203 }
204
205 fn run_single_conn(
206 dest: SocketAddr,
207 event_tx: Sender<TncEvent>,
208 event_rx: Receiver<TncEvent>,
209 kiss_out_tx: Sender<Arc<[u8]>>,
210 config: ReflectorClientConfig,
211 status: WrappedStatusHandler,
212 ) {
213 let socket = if dest.is_ipv4() {
214 UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap()
215 } else {
216 UdpSocket::bind((Ipv6Addr::UNSPECIFIED, 0)).unwrap()
217 };
218
219 let mut connect = Connect::new();
220 connect.set_address(config.local_callsign.address().to_owned());
221 connect.set_module(config.module);
222 let _ = socket.send_to(connect.as_bytes(), dest);
223 let mut converter = VoiceToRf::new();
224 let single_conn_ended = Arc::new(AtomicBool::new(false));
225 // TODO: unwrap
226 spawn_reader(
227 socket.try_clone().unwrap(),
228 event_tx,
229 single_conn_ended.clone(),
230 );
231
232 while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
233 match ev {
234 TncEvent::Close => {
235 break;
236 }
237 TncEvent::Received(server_msg) => match server_msg {
238 ServerMessage::ConnectAcknowledge(_) => {
239 status.lock().unwrap().status_changed(TncStatus::Connected);
240 }
241 ServerMessage::ConnectNack(_) => {
242 status
243 .lock()
244 .unwrap()
245 .status_changed(TncStatus::ConnectRejected);
246 break;
247 }
248 ServerMessage::ForceDisconnect(_) => {
249 status
250 .lock()
251 .unwrap()
252 .status_changed(TncStatus::ForceDisconnect);
253 break;
254 }
255 ServerMessage::Voice(voice) => {
256 let (lsf, stream) = converter.next(&voice);
257 if let Some(lsf) = lsf {
258 let kiss = KissFrame::new_stream_setup(&lsf.0).unwrap();
259 let _ = kiss_out_tx.send(kiss.as_bytes().into());
260 }
261 let kiss = KissFrame::new_stream_data(&stream).unwrap();
262 let _ = kiss_out_tx.send(kiss.as_bytes().into());
263 }
264 ServerMessage::Ping(_ping) => {
265 let mut pong = Pong::new();
266 pong.set_address(
267 M17Address::from_callsign("VK7XT")
268 .unwrap()
269 .address()
270 .clone(),
271 );
272 if socket.send_to(pong.as_bytes(), dest).is_err() {
273 break;
274 }
275 }
276 _ => {}
277 },
278 TncEvent::TransmitVoice(voice) => {
279 if socket.send_to(voice.as_bytes(), dest).is_err() {
280 break;
281 };
282 }
283 }
284 }
285 single_conn_ended.store(true, Ordering::Release);
286 status
287 .lock()
288 .unwrap()
289 .status_changed(TncStatus::Disconnected);
290 }
291
292 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
293 std::thread::spawn(move || {
294 let mut buf = [0u8; 2048];
295 while let Ok((n, _sa)) = socket.recv_from(&mut buf) {
296 if cancel.load(Ordering::Acquire) {
297 break;
298 }
299 if let Some(msg) = ServerMessage::parse(&buf[..n]) {
300 if event_tx.send(TncEvent::Received(msg)).is_err() {
301 break;
302 }
303 }
304 }
305 });
306 }
307
308 /// Callbacks to get runtime information about how the reflector client TNC is operating
309 pub trait StatusHandler {
310 fn status_changed(&mut self, status: TncStatus);
311 }
312
/// Connection states reported to a `StatusHandler`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TncStatus {
    /// No connection is active: reported when the runner starts, when the reflector's address
    /// cannot be resolved, and whenever a connection ends.
    Disconnected,
    /// A new connection attempt is beginning.
    Connecting,
    /// The reflector acknowledged the connect request.
    Connected,
    /// The reflector refused the connect request.
    ConnectRejected,
    /// The reflector told this client to disconnect.
    ForceDisconnect,
    /// The TNC was closed and the runner has stopped; no reconnect will be attempted.
    Closed,
}
322
323 pub struct NullStatusHandler;
324 impl StatusHandler for NullStatusHandler {
325 fn status_changed(&mut self, _status: TncStatus) {}
326 }