// Source: code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
2 io
::{self, Read
, Write
},
3 net
::{Ipv4Addr
, Ipv6Addr
, SocketAddr
, ToSocketAddrs
, UdpSocket
},
6 atomic
::{AtomicBool
, Ordering
},
7 mpsc
::{self, Receiver
, Sender
},
13 use crate::{link_setup
::M17Address
, tnc
::Tnc
, util
::out_buffer
::OutBuffer
};
15 kiss
::{KissBuffer
, KissCommand
, KissFrame
, PORT_PACKET_BASIC
, PORT_PACKET_FULL
, PORT_STREAM
},
16 protocol
::{LsfFrame
, StreamFrame
},
18 convert
::{RfToVoice
, VoiceToRf
},
19 packet
::{Connect
, Packet
, Pong
, ServerMessage
, Voice
},
// Connection settings for `ReflectorClientTnc`.
// NOTE(review): the embedded line numbers jump from 24 to 28, so field declarations appear to be
// missing from this view. Other code in this file reads `config.hostname`, `config.port` and
// `config.module`; their declared types are not visible here - confirm against the original.
23 #[derive(Debug, PartialEq, Eq, Clone)]
24 pub struct ReflectorClientConfig
{
// Callsign whose `address()` is placed in the Connect and Pong messages sent to the reflector.
28 pub local_callsign
: M17Address
,
31 type WrappedStatusHandler
= Arc
<Mutex
<dyn StatusHandler
+ Send
+ '
static>>;
33 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
35 /// Streams will be sent and received over IP rather than RF.
// NOTE(review): embedded line 36 is absent from this view (possibly an attribute) - confirm.
37 pub struct ReflectorClientTnc
{
// Settings supplied to `new`.
38 config
: ReflectorClientConfig
,
// Shared status callback; `status_changed` is invoked on it as the connection state changes.
39 status_handler
: WrappedStatusHandler
,
// Sending half of the channel carrying encoded KISS frames destined for the host.
40 kiss_out_tx
: Sender
<Arc
<[u8]>>,
// NOTE(review): embedded line 41 is absent. `new` initialises a `kiss_out` field with
// `OutBuffer::new(rx)` and `read` reads from `self.kiss_out`, so its declaration is presumably here.
// Sender for the current connection's event channel; `None` until a connection attempt installs one.
42 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
// Shutdown flag: stored with `Release` in `close`, loaded with `Acquire` by the background loop.
43 is_closed
: Arc
<AtomicBool
>,
// Accumulates bytes passed to `write` and yields complete KISS frames.
44 kiss_buffer
: Arc
<Mutex
<KissBuffer
>>,
// Converter from host stream frames to reflector voice messages; created when the first LSF is seen.
45 rf_to_voice
: Arc
<Mutex
<Option
<RfToVoice
>>>,
48 impl ReflectorClientTnc
{
49 /// Create a new Reflector Client TNC.
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely informational and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new
<S
: StatusHandler
+ Send
+ '
static>(
55 config
: ReflectorClientConfig
,
// NOTE(review): embedded lines 56-57 (remaining parameters and the return type) are absent from this
// view; the body below uses a value named `status`, presumably the `S` handler parameter.
// Channel for encoded KISS frames: `rx` feeds the `OutBuffer` below.
58 let (tx
, rx
) = mpsc
::channel();
// NOTE(review): embedded lines 59-60 and 62 are absent (struct literal opening and, presumably, the
// `config` and `kiss_out_tx` initialisers) - confirm against the original.
// The handler is wrapped so it can be shared and locked.
61 status_handler
: Arc
::new(Mutex
::new(status
)),
63 kiss_out
: OutBuffer
::new(rx
),
// No event channel exists yet, so this starts as `None`.
64 event_tx
: Arc
::new(Mutex
::new(None
)),
65 is_closed
: Arc
::new(AtomicBool
::new(false)),
66 kiss_buffer
: Arc
::new(Mutex
::new(KissBuffer
::new())),
// No converter until the first LSF arrives through `write`.
67 rf_to_voice
: Arc
::new(Mutex
::new(None
)),
72 impl Read
for ReflectorClientTnc
{
73 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
74 self.kiss_out
.read(buf
)
// `Write` implementation: bytes written to the TNC are fed into the KISS buffer and decoded frames
// are acted on.
78 impl Write
for ReflectorClientTnc
{
// Copies as much of `buf` as fits into the KISS buffer, then handles every complete frame it yields.
// NOTE(review): the embedded numbering has gaps inside this method (84, 89, 95-96, 104, 110-113, 116,
// 120, 123-127, 131-132, 136-140), so several statements, conditions and closing braces - including
// the return value - are not visible in this view.
79 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
80 let mut kiss
= self.kiss_buffer
.lock().unwrap
();
81 let rem
= kiss
.buf_remaining();
// Never copy more than the buffer has room for.
82 let sz
= buf
.len().min(rem
.len());
83 rem
[0..sz
].copy_from_slice(&buf
[0..sz
]);
// NOTE(review): embedded line 84 is absent; presumably the buffer is told that `sz` bytes were added.
// Drain every complete frame currently available.
85 while let Some(frame
) = kiss
.next_frame() {
// Data frames on the stream port carry either a link setup frame or a stream frame.
86 if Ok(KissCommand
::DataFrame
) == frame
.command() && frame
.port() == Ok(PORT_STREAM
) {
87 let mut payload
= [0u8; 30];
88 if let Ok(len
) = frame
.decode_payload(&mut payload
) {
// NOTE(review): the condition on embedded line 89 is absent; presumably it selects the LSF case
// based on `len` - confirm.
90 let lsf
= LsfFrame(payload
);
91 let mut to_voice
= self.rf_to_voice
.lock().unwrap
();
// Reuse the existing converter if there is one, otherwise create it from this LSF.
92 match &mut *to_voice
{
93 Some(to_voice
) => to_voice
.process_lsf(lsf
),
94 None
=> *to_voice
= Some(RfToVoice
::new(lsf
)),
// NOTE(review): embedded lines 95-96 are absent; what follows is presumably the stream-frame branch.
// Bytes 6..8 are a big-endian u16: the top bit is the end-of-stream flag, the low 15 bits the
// frame number.
97 let frame_num_part
= u16::from_be_bytes([payload
[6], payload
[7]]);
98 let frame
= StreamFrame
{
99 lich_idx
: payload
[5] >> 5,
100 lich_part
: payload
[0..5].try_into().unwrap
(),
101 frame_number
: frame_num_part
& 0x7fff,
102 end_of_stream
: frame_num_part
& 0x8000 > 0,
103 stream_data
: payload
[8..24].try_into().unwrap
(),
// Stream frames are only forwarded once a converter exists (i.e. after an LSF has been seen).
105 let to_voice
= self.rf_to_voice
.lock().unwrap
();
106 if let Some(to_voice
) = &*to_voice
{
107 let voice
= to_voice
.process_stream(&frame
);
// Hand the voice message to the connection loop if a channel is installed; send errors are ignored.
108 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
109 let _
= tx
.send(TncEvent
::TransmitVoice(voice
));
114 } else if Ok(KissCommand
::DataFrame
) == frame
.command()
115 && frame
.port() == Ok(PORT_PACKET_BASIC
)
117 // basic packets not supported for now, they will require more config
118 } else if Ok(KissCommand
::DataFrame
) == frame
.command()
119 && frame
.port() == Ok(PORT_PACKET_FULL
)
// Full packets: the first 30 decoded bytes are the link setup frame, the rest is the packet payload.
121 let mut payload
= [0u8; 855];
122 let Ok(len
) = frame
.decode_payload(&mut payload
) else {
// NOTE(review): embedded lines 123-127 are absent (the `else` body and possibly a length check).
128 let mut lsf
= LsfFrame([0u8; 30]);
129 lsf
.0.copy_from_slice(&payload
[0..30]);
// A non-zero CRC check result takes this branch; its body (embedded lines 131-132) is not visible.
130 if lsf
.check_crc() != 0 {
133 let mut packet
= Packet
::new();
134 packet
.set_link_setup_frame(&lsf
);
// NOTE(review): the visible slice is `payload[30..]`, which is not bounded by `len` - confirm that
// this is intended. What is then done with `packet` (embedded lines 136-140) is not visible here.
135 packet
.set_payload(&payload
[30..]);
// NOTE(review): the body of `flush` (embedded line 142) is absent from this view.
141 fn flush(&mut self) -> std
::io
::Result
<()> {
// `Tnc` trait implementation for the reflector client.
146 impl Tnc
for ReflectorClientTnc
{
// Returns another handle to this TNC or a `TncError`.
// NOTE(review): the body (embedded line 148) is absent from this view.
147 fn try_clone(&mut self) -> Result
<Self, crate::tnc
::TncError
> {
// Starts the TNC by passing clones of the shared state to another function.
// NOTE(review): embedded lines 152-153 are absent, so the callee's name and any leading argument
// (presumably the config) are not visible here - confirm.
151 fn start(&mut self) {
154 self.status_handler
.clone(),
155 self.event_tx
.clone(),
156 self.is
_closed
.clone(),
157 self.kiss_out_tx
.clone(),
161 fn close(&mut self) {
162 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
163 self.is
_closed
.store(true, Ordering
::Release
);
164 let _
= tx
.send(TncEvent
::Close
);
// Events delivered to the connection loop.
// NOTE(review): embedded lines 170-171 are absent, so the enum header and at least one variant are
// not visible; `TncEvent::Close` is used elsewhere in this file and is presumably declared here.
169 #[allow(clippy::large_enum_variant)]
// A message parsed from a datagram received on the UDP socket.
172 Received(ServerMessage
),
// A voice message produced from host stream frames, to be sent to the reflector.
173 TransmitVoice(Voice
),
// Background loop that keeps trying to resolve and connect to the reflector until closed.
// NOTE(review): the function header (embedded line 176) is absent from this view, as are several
// interior lines (182, 184-186, 192, 194-198, 201-206, 209-210, 214-221, 224-225).
177 config
: ReflectorClientConfig
,
178 status
: WrappedStatusHandler
,
179 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
180 is_closed
: Arc
<AtomicBool
>,
181 kiss_out_tx
: Sender
<Arc
<[u8]>>,
// All work happens on a dedicated thread.
183 std
::thread
::spawn(move || {
// Report the initial Disconnected state before the first attempt.
187 .status_changed(TncStatus
::Disconnected
);
// Keep going until `close` raises the flag.
188 while !is_closed
.load(Ordering
::Acquire
) {
189 status
.lock().unwrap
().status_changed(TncStatus
::Connecting
);
// Resolve the configured hostname and port; the first resolved address is the one considered.
190 let sa
= if let Ok(mut sa_iter
) =
191 (config
.hostname
.as_str(), config
.port
).to_socket_addrs()
193 if let Some(sa
) = sa_iter
.next() {
// Two failure paths each report Disconnected and wait 10 seconds.
// NOTE(review): presumably one is "no address returned" and the other "lookup failed", each followed
// by a retry - the surrounding lines are absent, confirm.
199 .status_changed(TncStatus
::Disconnected
);
200 thread
::sleep(Duration
::from_secs(10));
207 .status_changed(TncStatus
::Disconnected
);
208 thread
::sleep(Duration
::from_secs(10));
// Fresh event channel for this attempt; the sender is published so `write` and `close` can reach it.
211 let (tx
, rx
) = mpsc
::channel();
212 *event_tx
.lock().unwrap
() = Some(tx
.clone());
// Re-check the flag after publishing the channel, in case `close` ran in the meantime.
213 if !is_closed
.load(Ordering
::Acquire
) {
222 // Cool off a bit if connect rejected, etc.
223 thread
::sleep(Duration
::from_secs(10));
// Reported once the loop has exited.
226 status
.lock().unwrap
().status_changed(TncStatus
::Closed
);
// Runs one connection to the reflector: sends Connect, then services events until they stop arriving.
// NOTE(review): the function header (embedded lines up to 231) is absent from this view; a `dest`
// address is used below and is presumably a parameter. Many interior lines are also absent.
232 event_tx
: Sender
<TncEvent
>,
233 event_rx
: Receiver
<TncEvent
>,
234 kiss_out_tx
: Sender
<Arc
<[u8]>>,
235 config
: ReflectorClientConfig
,
236 status
: WrappedStatusHandler
,
// Bind an unspecified local address with port 0 in the same address family as `dest`.
// The `unwrap` panics if binding fails.
238 let socket
= if dest
.is
_ipv
4() {
239 UdpSocket
::bind((Ipv4Addr
::UNSPECIFIED
, 0)).unwrap
()
241 UdpSocket
::bind((Ipv6Addr
::UNSPECIFIED
, 0)).unwrap
()
// Send the Connect message carrying our callsign and the configured module; a send error is ignored.
244 let mut connect
= Connect
::new();
245 connect
.set_address(config
.local_callsign
.address());
246 connect
.set_module(config
.module
);
247 let _
= socket
.send_to(connect
.as_bytes(), dest
);
// Turns incoming reflector voice messages into LSF / stream frames for KISS output.
248 let mut converter
= VoiceToRf
::new();
// Set to `true` once the event loop below has finished.
249 let single_conn_ended
= Arc
::new(AtomicBool
::new(false));
// NOTE(review): the call these arguments belong to is not visible (presumably `spawn_reader`, which
// takes a socket, an event sender and a cancel flag).
252 socket
.try_clone().unwrap
(),
254 single_conn_ended
.clone(),
// Event loop: ends when no event arrives within 30 seconds or the channel is disconnected.
257 while let Ok(ev
) = event_rx
.recv_timeout(Duration
::from_secs(30)) {
262 TncEvent
::Received(server_msg
) => match server_msg
{
263 ServerMessage
::ConnectAcknowledge(_
) => {
264 status
.lock().unwrap
().status_changed(TncStatus
::Connected
);
266 ServerMessage
::ConnectNack(_
) => {
270 .status_changed(TncStatus
::ConnectRejected
);
273 ServerMessage
::ForceDisconnect(_
) => {
277 .status_changed(TncStatus
::ForceDisconnect
);
// Voice from the reflector: emit a stream-setup KISS frame when the converter yields an LSF, then the
// stream data frame. Channel send errors are ignored; frame construction failures panic (`unwrap`).
280 ServerMessage
::Voice(voice
) => {
281 let (lsf
, stream
) = converter
.next(&voice
);
282 if let Some(lsf
) = lsf
{
283 let kiss
= KissFrame
::new_stream_setup(&lsf
.0).unwrap
();
284 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
286 let kiss
= KissFrame
::new_stream_data(&stream
).unwrap
();
287 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
// Packet from the reflector: re-encode as a full-packet KISS frame for the host.
289 ServerMessage
::Packet(packet
) => {
291 KissFrame
::new_full_packet(&packet
.link_setup_frame().0, packet
.payload())
293 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
// Ping: answer with a Pong carrying our callsign.
// NOTE(review): the body of the send-failure branch is not visible here.
296 ServerMessage
::Ping(_ping
) => {
297 let mut pong
= Pong
::new();
298 pong
.set_address(config
.local_callsign
.address());
299 if socket
.send_to(pong
.as_bytes(), dest
).is
_err
() {
// Voice originating from the host: forward it to the reflector.
// NOTE(review): the body of the send-failure branch is not visible here.
305 TncEvent
::TransmitVoice(voice
) => {
306 if socket
.send_to(voice
.as_bytes(), dest
).is
_err
() {
// Mark this connection as finished for whoever holds the other clone of the flag.
312 single_conn_ended
.store(true, Ordering
::Release
);
316 .status_changed(TncStatus
::Disconnected
);
319 fn spawn_reader(socket
: UdpSocket
, event_tx
: Sender
<TncEvent
>, cancel
: Arc
<AtomicBool
>) {
320 std
::thread
::spawn(move || {
321 let mut buf
= [0u8; 2048];
322 while let Ok((n
, _sa
)) = socket
.recv_from(&mut buf
) {
323 if cancel
.load(Ordering
::Acquire
) {
326 if let Some(msg
) = ServerMessage
::parse(&buf
[..n
]) {
327 if event_tx
.send(TncEvent
::Received(msg
)).is
_err
() {
335 /// Callbacks to get runtime information about how the reflector client TNC is operating
336 pub trait StatusHandler
{
337 fn status_changed(&mut self, status
: TncStatus
);
340 #[derive(Debug, PartialEq, Eq, Clone)]
350 pub struct NullStatusHandler
;
351 impl StatusHandler
for NullStatusHandler
{
352 fn status_changed(&mut self, _status
: TncStatus
) {}