]>
code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
2 io
::{self, Read
, Write
},
3 net
::{Ipv4Addr
, Ipv6Addr
, SocketAddr
, ToSocketAddrs
, UdpSocket
},
6 atomic
::{AtomicBool
, Ordering
},
7 mpsc
::{self, Receiver
, Sender
},
13 use crate::{link_setup
::M17Address
, tnc
::Tnc
, util
::out_buffer
::OutBuffer
};
15 kiss
::{KissBuffer
, KissCommand
, KissFrame
, PORT_STREAM
},
16 protocol
::{LsfFrame
, StreamFrame
},
18 convert
::{RfToVoice
, VoiceToRf
},
19 packet
::{Connect
, Pong
, ServerMessage
, Voice
},
23 #[derive(Debug, PartialEq, Eq, Clone)]
24 pub struct ReflectorClientConfig
{
28 pub local_callsign
: M17Address
,
31 type WrappedStatusHandler
= Arc
<Mutex
<dyn StatusHandler
+ Send
+ '
static>>;
33 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
35 /// Streams will be sent and received over IP rather than RF.
37 pub struct ReflectorClientTnc
{
38 config
: ReflectorClientConfig
,
39 status_handler
: WrappedStatusHandler
,
40 kiss_out_tx
: Sender
<Arc
<[u8]>>,
42 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
43 is_closed
: Arc
<AtomicBool
>,
44 kiss_buffer
: Arc
<Mutex
<KissBuffer
>>,
45 rf_to_voice
: Arc
<Mutex
<Option
<RfToVoice
>>>,
48 impl ReflectorClientTnc
{
49 /// Create a new Reflector Client TNC.
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely information and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new
<S
: StatusHandler
+ Send
+ '
static>(
55 config
: ReflectorClientConfig
,
58 let (tx
, rx
) = mpsc
::channel();
61 status_handler
: Arc
::new(Mutex
::new(status
)),
63 kiss_out
: OutBuffer
::new(rx
),
64 event_tx
: Arc
::new(Mutex
::new(None
)),
65 is_closed
: Arc
::new(AtomicBool
::new(false)),
66 kiss_buffer
: Arc
::new(Mutex
::new(KissBuffer
::new())),
67 rf_to_voice
: Arc
::new(Mutex
::new(None
)),
72 impl Read
for ReflectorClientTnc
{
73 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
74 self.kiss_out
.read(buf
)
78 impl Write
for ReflectorClientTnc
{
79 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
80 let mut kiss
= self.kiss_buffer
.lock().unwrap
();
81 let rem
= kiss
.buf_remaining();
82 let sz
= buf
.len().min(rem
.len());
83 rem
[0..sz
].copy_from_slice(&buf
[0..sz
]);
85 if let Some(frame
) = kiss
.next_frame() {
86 if Ok(KissCommand
::DataFrame
) == frame
.command() && frame
.port() == Ok(PORT_STREAM
) {
87 let mut payload
= [0u8; 30];
88 if let Ok(len
) = frame
.decode_payload(&mut payload
) {
90 let lsf
= LsfFrame(payload
);
91 let mut to_voice
= self.rf_to_voice
.lock().unwrap
();
92 match &mut *to_voice
{
93 Some(to_voice
) => to_voice
.process_lsf(lsf
),
94 None
=> *to_voice
= Some(RfToVoice
::new(lsf
)),
97 let frame_num_part
= u16::from_be_bytes([payload
[6], payload
[7]]);
98 let frame
= StreamFrame
{
99 lich_idx
: payload
[5] >> 5,
100 lich_part
: payload
[0..5].try_into().unwrap
(),
101 frame_number
: frame_num_part
& 0x7fff,
102 end_of_stream
: frame_num_part
& 0x8000 > 0,
103 stream_data
: payload
[8..24].try_into().unwrap
(),
105 let to_voice
= self.rf_to_voice
.lock().unwrap
();
106 if let Some(to_voice
) = &*to_voice
{
107 let voice
= to_voice
.process_stream(&frame
);
108 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
109 let _
= tx
.send(TncEvent
::TransmitVoice(voice
));
119 fn flush(&mut self) -> std
::io
::Result
<()> {
124 impl Tnc
for ReflectorClientTnc
{
125 fn try_clone(&mut self) -> Result
<Self, crate::tnc
::TncError
> {
129 fn start(&mut self) {
132 self.status_handler
.clone(),
133 self.event_tx
.clone(),
134 self.is
_closed
.clone(),
135 self.kiss_out_tx
.clone(),
139 fn close(&mut self) {
140 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
141 self.is
_closed
.store(true, Ordering
::Release
);
142 let _
= tx
.send(TncEvent
::Close
);
147 #[allow(clippy::large_enum_variant)]
150 Received(ServerMessage
),
151 TransmitVoice(Voice
),
155 config
: ReflectorClientConfig
,
156 status
: WrappedStatusHandler
,
157 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
158 is_closed
: Arc
<AtomicBool
>,
159 kiss_out_tx
: Sender
<Arc
<[u8]>>,
161 std
::thread
::spawn(move || {
165 .status_changed(TncStatus
::Disconnected
);
166 while !is_closed
.load(Ordering
::Acquire
) {
167 status
.lock().unwrap
().status_changed(TncStatus
::Connecting
);
168 let sa
= if let Ok(mut sa_iter
) =
169 (config
.hostname
.as_str(), config
.port
).to_socket_addrs()
171 if let Some(sa
) = sa_iter
.next() {
177 .status_changed(TncStatus
::Disconnected
);
178 thread
::sleep(Duration
::from_secs(10));
185 .status_changed(TncStatus
::Disconnected
);
186 thread
::sleep(Duration
::from_secs(10));
189 let (tx
, rx
) = mpsc
::channel();
190 *event_tx
.lock().unwrap
() = Some(tx
.clone());
191 if !is_closed
.load(Ordering
::Acquire
) {
200 // Cool off a bit if connect rejected, etc.
201 thread
::sleep(Duration
::from_secs(10));
204 status
.lock().unwrap
().status_changed(TncStatus
::Closed
);
210 event_tx
: Sender
<TncEvent
>,
211 event_rx
: Receiver
<TncEvent
>,
212 kiss_out_tx
: Sender
<Arc
<[u8]>>,
213 config
: ReflectorClientConfig
,
214 status
: WrappedStatusHandler
,
216 let socket
= if dest
.is
_ipv
4() {
217 UdpSocket
::bind((Ipv4Addr
::UNSPECIFIED
, 0)).unwrap
()
219 UdpSocket
::bind((Ipv6Addr
::UNSPECIFIED
, 0)).unwrap
()
222 let mut connect
= Connect
::new();
223 connect
.set_address(config
.local_callsign
.address());
224 connect
.set_module(config
.module
);
225 let _
= socket
.send_to(connect
.as_bytes(), dest
);
226 let mut converter
= VoiceToRf
::new();
227 let single_conn_ended
= Arc
::new(AtomicBool
::new(false));
230 socket
.try_clone().unwrap
(),
232 single_conn_ended
.clone(),
235 while let Ok(ev
) = event_rx
.recv_timeout(Duration
::from_secs(30)) {
240 TncEvent
::Received(server_msg
) => match server_msg
{
241 ServerMessage
::ConnectAcknowledge(_
) => {
242 status
.lock().unwrap
().status_changed(TncStatus
::Connected
);
244 ServerMessage
::ConnectNack(_
) => {
248 .status_changed(TncStatus
::ConnectRejected
);
251 ServerMessage
::ForceDisconnect(_
) => {
255 .status_changed(TncStatus
::ForceDisconnect
);
258 ServerMessage
::Voice(voice
) => {
259 let (lsf
, stream
) = converter
.next(&voice
);
260 if let Some(lsf
) = lsf
{
261 let kiss
= KissFrame
::new_stream_setup(&lsf
.0).unwrap
();
262 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
264 let kiss
= KissFrame
::new_stream_data(&stream
).unwrap
();
265 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
267 ServerMessage
::Ping(_ping
) => {
268 let mut pong
= Pong
::new();
269 pong
.set_address(config
.local_callsign
.address());
270 if socket
.send_to(pong
.as_bytes(), dest
).is
_err
() {
276 TncEvent
::TransmitVoice(voice
) => {
277 if socket
.send_to(voice
.as_bytes(), dest
).is
_err
() {
283 single_conn_ended
.store(true, Ordering
::Release
);
287 .status_changed(TncStatus
::Disconnected
);
290 fn spawn_reader(socket
: UdpSocket
, event_tx
: Sender
<TncEvent
>, cancel
: Arc
<AtomicBool
>) {
291 std
::thread
::spawn(move || {
292 let mut buf
= [0u8; 2048];
293 while let Ok((n
, _sa
)) = socket
.recv_from(&mut buf
) {
294 if cancel
.load(Ordering
::Acquire
) {
297 if let Some(msg
) = ServerMessage
::parse(&buf
[..n
]) {
298 if event_tx
.send(TncEvent
::Received(msg
)).is
_err
() {
306 /// Callbacks to get runtime information about how the reflector client TNC is operating
307 pub trait StatusHandler
{
308 fn status_changed(&mut self, status
: TncStatus
);
311 #[derive(Debug, PartialEq, Eq, Clone)]
321 pub struct NullStatusHandler
;
322 impl StatusHandler
for NullStatusHandler
{
323 fn status_changed(&mut self, _status
: TncStatus
) {}