code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
Make netclient work against mrefd
[m17rt] / m17app / src / reflector.rs
1 use std::{
2 io::{self, Read, Write},
3 net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket},
4 sync::{
5 Arc, Mutex,
6 atomic::{AtomicBool, Ordering},
7 mpsc::{self, Receiver, Sender},
8 },
9 thread,
10 time::Duration,
11 };
12
13 use crate::{link_setup::M17Address, tnc::Tnc, util::out_buffer::OutBuffer};
14 use m17core::{
15 kiss::{KissBuffer, KissCommand, KissFrame, PORT_STREAM},
16 protocol::{LsfFrame, StreamFrame},
17 reflector::{
18 convert::{RfToVoice, VoiceToRf},
19 packet::{Connect, Pong, ServerMessage, Voice},
20 },
21 };
22
23 #[derive(Debug, PartialEq, Eq, Clone)]
24 pub struct ReflectorClientConfig {
25 pub hostname: String,
26 pub port: u16,
27 pub module: char,
28 pub local_callsign: M17Address,
29 }
30
31 type WrappedStatusHandler = Arc<Mutex<dyn StatusHandler + Send + 'static>>;
32
/// Network-based TNC that attempts to maintain a UDP connection to a reflector.
///
/// Streams will be sent and received over IP rather than RF.
///
/// Cloning yields another handle onto the same shared state: every field except `config`
/// is either a channel endpoint or reference counted.
#[derive(Clone)]
pub struct ReflectorClientTnc {
    /// Connection parameters; cloned into the runner thread by `start`.
    config: ReflectorClientConfig,
    /// Receives `TncStatus` notifications from the runner thread.
    status_handler: WrappedStatusHandler,
    /// Producer side for KISS frames headed to the host; handed to the runner thread.
    kiss_out_tx: Sender<Arc<[u8]>>,
    /// Consumer side of `kiss_out_tx`; this is what `Read::read` drains.
    kiss_out: OutBuffer,
    /// Sender into the event loop of the current connection attempt.
    /// `None` until the runner thread has set up its first attempt.
    event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
    /// Set by `close`; polled by the runner thread to know when to stop reconnecting.
    is_closed: Arc<AtomicBool>,
    /// Accumulates bytes passed to `Write::write` until a complete KISS frame is available.
    kiss_buffer: Arc<Mutex<KissBuffer>>,
    /// Converter from RF-style frames to reflector voice packets; created when the first
    /// link setup frame is written.
    rf_to_voice: Arc<Mutex<Option<RfToVoice>>>,
}
47
48 impl ReflectorClientTnc {
49 /// Create a new Reflector Client TNC.
50 ///
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely information and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new<S: StatusHandler + Send + 'static>(
55 config: ReflectorClientConfig,
56 status: S,
57 ) -> Self {
58 let (tx, rx) = mpsc::channel();
59 Self {
60 config,
61 status_handler: Arc::new(Mutex::new(status)),
62 kiss_out_tx: tx,
63 kiss_out: OutBuffer::new(rx),
64 event_tx: Arc::new(Mutex::new(None)),
65 is_closed: Arc::new(AtomicBool::new(false)),
66 kiss_buffer: Arc::new(Mutex::new(KissBuffer::new())),
67 rf_to_voice: Arc::new(Mutex::new(None)),
68 }
69 }
70 }
71
72 impl Read for ReflectorClientTnc {
73 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
74 self.kiss_out.read(buf)
75 }
76 }
77
78 impl Write for ReflectorClientTnc {
79 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
80 let mut kiss = self.kiss_buffer.lock().unwrap();
81 let rem = kiss.buf_remaining();
82 let sz = buf.len().min(rem.len());
83 rem[0..sz].copy_from_slice(&buf[0..sz]);
84 kiss.did_write(sz);
85 if let Some(frame) = kiss.next_frame() {
86 if Ok(KissCommand::DataFrame) == frame.command() && frame.port() == Ok(PORT_STREAM) {
87 let mut payload = [0u8; 30];
88 if let Ok(len) = frame.decode_payload(&mut payload) {
89 if len == 30 {
90 let lsf = LsfFrame(payload);
91 let mut to_voice = self.rf_to_voice.lock().unwrap();
92 match &mut *to_voice {
93 Some(to_voice) => to_voice.process_lsf(lsf),
94 None => *to_voice = Some(RfToVoice::new(lsf)),
95 }
96 } else if len == 26 {
97 let frame_num_part = u16::from_be_bytes([payload[6], payload[7]]);
98 let frame = StreamFrame {
99 lich_idx: payload[5] >> 5,
100 lich_part: payload[0..5].try_into().unwrap(),
101 frame_number: frame_num_part & 0x7fff,
102 end_of_stream: frame_num_part & 0x8000 > 0,
103 stream_data: payload[8..24].try_into().unwrap(),
104 };
105 let to_voice = self.rf_to_voice.lock().unwrap();
106 if let Some(to_voice) = &*to_voice {
107 let voice = to_voice.process_stream(&frame);
108 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
109 let _ = tx.send(TncEvent::TransmitVoice(voice));
110 }
111 }
112 }
113 };
114 }
115 }
116 Ok(sz)
117 }
118
119 fn flush(&mut self) -> std::io::Result<()> {
120 Ok(())
121 }
122 }
123
124 impl Tnc for ReflectorClientTnc {
125 fn try_clone(&mut self) -> Result<Self, crate::tnc::TncError> {
126 Ok(self.clone())
127 }
128
129 fn start(&mut self) {
130 spawn_runner(
131 self.config.clone(),
132 self.status_handler.clone(),
133 self.event_tx.clone(),
134 self.is_closed.clone(),
135 self.kiss_out_tx.clone(),
136 );
137 }
138
139 fn close(&mut self) {
140 if let Some(tx) = self.event_tx.lock().unwrap().as_ref() {
141 self.is_closed.store(true, Ordering::Release);
142 let _ = tx.send(TncEvent::Close);
143 }
144 }
145 }
146
/// Events consumed by the per-connection event loop in `run_single_conn`.
// NOTE(review): the lint is silenced presumably because the message-carrying variants are far
// larger than `Close`; boxing them was evidently not wanted. Confirm before changing.
#[allow(clippy::large_enum_variant)]
enum TncEvent {
    /// The TNC was closed; sent by `close`.
    Close,
    /// A message from the reflector was parsed by the socket reader thread.
    Received(ServerMessage),
    /// A voice packet produced from host-written KISS frames, to be sent to the reflector.
    TransmitVoice(Voice),
}
153
154 fn spawn_runner(
155 config: ReflectorClientConfig,
156 status: WrappedStatusHandler,
157 event_tx: Arc<Mutex<Option<Sender<TncEvent>>>>,
158 is_closed: Arc<AtomicBool>,
159 kiss_out_tx: Sender<Arc<[u8]>>,
160 ) {
161 std::thread::spawn(move || {
162 status
163 .lock()
164 .unwrap()
165 .status_changed(TncStatus::Disconnected);
166 while !is_closed.load(Ordering::Acquire) {
167 status.lock().unwrap().status_changed(TncStatus::Connecting);
168 let sa = if let Ok(mut sa_iter) =
169 (config.hostname.as_str(), config.port).to_socket_addrs()
170 {
171 if let Some(sa) = sa_iter.next() {
172 sa
173 } else {
174 status
175 .lock()
176 .unwrap()
177 .status_changed(TncStatus::Disconnected);
178 thread::sleep(Duration::from_secs(10));
179 continue;
180 }
181 } else {
182 status
183 .lock()
184 .unwrap()
185 .status_changed(TncStatus::Disconnected);
186 thread::sleep(Duration::from_secs(10));
187 continue;
188 };
189 let (tx, rx) = mpsc::channel();
190 *event_tx.lock().unwrap() = Some(tx.clone());
191 if !is_closed.load(Ordering::Acquire) {
192 run_single_conn(
193 sa,
194 tx,
195 rx,
196 kiss_out_tx.clone(),
197 config.clone(),
198 status.clone(),
199 );
200 // Cool off a bit if connect rejected, etc.
201 thread::sleep(Duration::from_secs(10));
202 }
203 }
204 status.lock().unwrap().status_changed(TncStatus::Closed);
205 });
206 }
207
208 fn run_single_conn(
209 dest: SocketAddr,
210 event_tx: Sender<TncEvent>,
211 event_rx: Receiver<TncEvent>,
212 kiss_out_tx: Sender<Arc<[u8]>>,
213 config: ReflectorClientConfig,
214 status: WrappedStatusHandler,
215 ) {
216 let socket = if dest.is_ipv4() {
217 UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap()
218 } else {
219 UdpSocket::bind((Ipv6Addr::UNSPECIFIED, 0)).unwrap()
220 };
221
222 let mut connect = Connect::new();
223 connect.set_address(config.local_callsign.address());
224 connect.set_module(config.module);
225 let _ = socket.send_to(connect.as_bytes(), dest);
226 let mut converter = VoiceToRf::new();
227 let single_conn_ended = Arc::new(AtomicBool::new(false));
228 // TODO: unwrap
229 spawn_reader(
230 socket.try_clone().unwrap(),
231 event_tx,
232 single_conn_ended.clone(),
233 );
234
235 while let Ok(ev) = event_rx.recv_timeout(Duration::from_secs(30)) {
236 match ev {
237 TncEvent::Close => {
238 break;
239 }
240 TncEvent::Received(server_msg) => match server_msg {
241 ServerMessage::ConnectAcknowledge(_) => {
242 status.lock().unwrap().status_changed(TncStatus::Connected);
243 }
244 ServerMessage::ConnectNack(_) => {
245 status
246 .lock()
247 .unwrap()
248 .status_changed(TncStatus::ConnectRejected);
249 break;
250 }
251 ServerMessage::ForceDisconnect(_) => {
252 status
253 .lock()
254 .unwrap()
255 .status_changed(TncStatus::ForceDisconnect);
256 break;
257 }
258 ServerMessage::Voice(voice) => {
259 let (lsf, stream) = converter.next(&voice);
260 if let Some(lsf) = lsf {
261 let kiss = KissFrame::new_stream_setup(&lsf.0).unwrap();
262 let _ = kiss_out_tx.send(kiss.as_bytes().into());
263 }
264 let kiss = KissFrame::new_stream_data(&stream).unwrap();
265 let _ = kiss_out_tx.send(kiss.as_bytes().into());
266 }
267 ServerMessage::Ping(_ping) => {
268 let mut pong = Pong::new();
269 pong.set_address(config.local_callsign.address());
270 if socket.send_to(pong.as_bytes(), dest).is_err() {
271 break;
272 }
273 }
274 _ => {}
275 },
276 TncEvent::TransmitVoice(voice) => {
277 if socket.send_to(voice.as_bytes(), dest).is_err() {
278 break;
279 };
280 }
281 }
282 }
283 single_conn_ended.store(true, Ordering::Release);
284 status
285 .lock()
286 .unwrap()
287 .status_changed(TncStatus::Disconnected);
288 }
289
290 fn spawn_reader(socket: UdpSocket, event_tx: Sender<TncEvent>, cancel: Arc<AtomicBool>) {
291 std::thread::spawn(move || {
292 let mut buf = [0u8; 2048];
293 while let Ok((n, _sa)) = socket.recv_from(&mut buf) {
294 if cancel.load(Ordering::Acquire) {
295 break;
296 }
297 if let Some(msg) = ServerMessage::parse(&buf[..n]) {
298 if event_tx.send(TncEvent::Received(msg)).is_err() {
299 break;
300 }
301 }
302 }
303 });
304 }
305
/// Callbacks to get runtime information about how the reflector client TNC is operating
pub trait StatusHandler {
    /// Called each time the connection state changes.
    ///
    /// The call is made from the TNC's background thread while the mutex wrapping the handler
    /// is held.
    fn status_changed(&mut self, status: TncStatus);
}
310
/// Connection state reported to a `StatusHandler`.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum TncStatus {
    /// There is currently no connection: reported at start-up, when the reflector's address
    /// cannot be resolved, and whenever a connection attempt ends.
    Disconnected,
    /// A new connection attempt is starting.
    Connecting,
    /// The reflector acknowledged our connect request.
    Connected,
    /// The reflector refused our connect request.
    ConnectRejected,
    /// The reflector told us to disconnect.
    ForceDisconnect,
    /// The TNC was closed and will not reconnect.
    Closed,
}
320
321 pub struct NullStatusHandler;
322 impl StatusHandler for NullStatusHandler {
323 fn status_changed(&mut self, _status: TncStatus) {}
324 }