]>
code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
2 io
::{self, Read
, Write
},
3 net
::{Ipv4Addr
, Ipv6Addr
, SocketAddr
, ToSocketAddrs
, UdpSocket
},
5 atomic
::{AtomicBool
, Ordering
},
6 mpsc
::{self, Receiver
, Sender
},
13 use crate::{link_setup
::M17Address
, tnc
::Tnc
, util
::out_buffer
::OutBuffer
};
15 kiss
::{KissBuffer
, KissCommand
, KissFrame
, PORT_STREAM
},
16 protocol
::{LsfFrame
, StreamFrame
},
18 convert
::{RfToVoice
, VoiceToRf
},
19 packet
::{Connect
, Pong
, ServerMessage
, Voice
},
23 #[derive(Debug, PartialEq, Eq, Clone)]
24 pub struct ReflectorClientConfig
{
28 local_callsign
: M17Address
,
31 type WrappedStatusHandler
= Arc
<Mutex
<dyn StatusHandler
+ Send
+ '
static>>;
33 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
35 /// Streams will be sent and received over IP rather than RF.
37 pub struct ReflectorClientTnc
{
38 config
: ReflectorClientConfig
,
39 status_handler
: WrappedStatusHandler
,
40 kiss_out_tx
: Sender
<Arc
<[u8]>>,
42 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
43 is_closed
: Arc
<AtomicBool
>,
44 kiss_buffer
: Arc
<Mutex
<KissBuffer
>>,
45 rf_to_voice
: Arc
<Mutex
<Option
<RfToVoice
>>>,
48 impl ReflectorClientTnc
{
49 /// Create a new Reflector Client TNC.
51 /// You must provide a configuration object and a handler for status events, such as when the TNC
52 /// connects and disconnects. The status events are purely information and if you're not interested
53 /// in them, provide a `NullStatusHandler`.
54 pub fn new
<S
: StatusHandler
+ Send
+ '
static>(
55 config
: ReflectorClientConfig
,
58 let (tx
, rx
) = mpsc
::channel();
61 status_handler
: Arc
::new(Mutex
::new(status
)),
63 kiss_out
: OutBuffer
::new(rx
),
64 event_tx
: Arc
::new(Mutex
::new(None
)),
65 is_closed
: Arc
::new(AtomicBool
::new(false)),
66 kiss_buffer
: Arc
::new(Mutex
::new(KissBuffer
::new())),
67 rf_to_voice
: Arc
::new(Mutex
::new(None
)),
72 impl Read
for ReflectorClientTnc
{
73 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
74 self.kiss_out
.read(buf
)
78 impl Write
for ReflectorClientTnc
{
79 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
80 let mut kiss
= self.kiss_buffer
.lock().unwrap
();
81 let rem
= kiss
.buf_remaining();
82 let sz
= buf
.len().max(rem
.len());
83 rem
[0..sz
].copy_from_slice(&buf
[0..sz
]);
84 if let Some(frame
) = kiss
.next_frame() {
85 if Ok(KissCommand
::DataFrame
) == frame
.command() && frame
.port() == Ok(PORT_STREAM
) {
86 let mut payload
= [0u8; 30];
87 if let Ok(len
) = frame
.decode_payload(&mut payload
) {
89 let lsf
= LsfFrame(payload
);
90 let mut to_voice
= self.rf_to_voice
.lock().unwrap
();
91 match &mut *to_voice
{
92 Some(to_voice
) => to_voice
.process_lsf(lsf
),
93 None
=> *to_voice
= Some(RfToVoice
::new(lsf
)),
96 let frame_num_part
= u16::from_be_bytes([payload
[6], payload
[7]]);
97 let frame
= StreamFrame
{
98 lich_idx
: payload
[5] >> 5,
99 lich_part
: payload
[0..5].try_into().unwrap
(),
100 frame_number
: frame_num_part
& 0x7fff,
101 end_of_stream
: frame_num_part
& 0x8000 > 0,
102 stream_data
: payload
[8..24].try_into().unwrap
(),
104 let to_voice
= self.rf_to_voice
.lock().unwrap
();
105 if let Some(to_voice
) = &*to_voice
{
106 let voice
= to_voice
.process_stream(&frame
);
107 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
108 let _
= tx
.send(TncEvent
::TransmitVoice(voice
));
118 fn flush(&mut self) -> std
::io
::Result
<()> {
123 impl Tnc
for ReflectorClientTnc
{
124 fn try_clone(&mut self) -> Result
<Self, crate::tnc
::TncError
> {
128 fn start(&mut self) {
131 self.status_handler
.clone(),
132 self.event_tx
.clone(),
133 self.is
_closed
.clone(),
134 self.kiss_out_tx
.clone(),
138 fn close(&mut self) {
139 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
140 self.is
_closed
.store(true, Ordering
::Release
);
141 let _
= tx
.send(TncEvent
::Close
);
146 #[allow(clippy::large_enum_variant)]
149 Received(ServerMessage
),
150 TransmitVoice(Voice
),
154 config
: ReflectorClientConfig
,
155 status
: WrappedStatusHandler
,
156 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
157 is_closed
: Arc
<AtomicBool
>,
158 kiss_out_tx
: Sender
<Arc
<[u8]>>,
160 std
::thread
::spawn(move || {
164 .status_changed(TncStatus
::Disconnected
);
165 while !is_closed
.load(Ordering
::Acquire
) {
166 status
.lock().unwrap
().status_changed(TncStatus
::Connecting
);
167 let sa
= if let Ok(mut sa_iter
) =
168 (config
.hostname
.as_str(), config
.port
).to_socket_addrs()
170 if let Some(sa
) = sa_iter
.next() {
176 .status_changed(TncStatus
::Disconnected
);
177 thread
::sleep(Duration
::from_secs(10));
184 .status_changed(TncStatus
::Disconnected
);
185 thread
::sleep(Duration
::from_secs(10));
188 let (tx
, rx
) = mpsc
::channel();
189 *event_tx
.lock().unwrap
() = Some(tx
.clone());
190 if !is_closed
.load(Ordering
::Acquire
) {
201 status
.lock().unwrap
().status_changed(TncStatus
::Closed
);
207 event_tx
: Sender
<TncEvent
>,
208 event_rx
: Receiver
<TncEvent
>,
209 kiss_out_tx
: Sender
<Arc
<[u8]>>,
210 config
: ReflectorClientConfig
,
211 status
: WrappedStatusHandler
,
213 let socket
= if dest
.is
_ipv
4() {
214 UdpSocket
::bind((Ipv4Addr
::UNSPECIFIED
, 0)).unwrap
()
216 UdpSocket
::bind((Ipv6Addr
::UNSPECIFIED
, 0)).unwrap
()
219 let mut connect
= Connect
::new();
220 connect
.set_address(config
.local_callsign
.address().to_owned());
221 connect
.set_module(config
.module
);
222 let _
= socket
.send_to(connect
.as_bytes(), dest
);
223 let mut converter
= VoiceToRf
::new();
224 let single_conn_ended
= Arc
::new(AtomicBool
::new(false));
227 socket
.try_clone().unwrap
(),
229 single_conn_ended
.clone(),
232 while let Ok(ev
) = event_rx
.recv_timeout(Duration
::from_secs(30)) {
237 TncEvent
::Received(server_msg
) => match server_msg
{
238 ServerMessage
::ConnectAcknowledge(_
) => {
239 status
.lock().unwrap
().status_changed(TncStatus
::Connected
);
241 ServerMessage
::ConnectNack(_
) => {
245 .status_changed(TncStatus
::ConnectRejected
);
248 ServerMessage
::ForceDisconnect(_
) => {
252 .status_changed(TncStatus
::ForceDisconnect
);
255 ServerMessage
::Voice(voice
) => {
256 let (lsf
, stream
) = converter
.next(&voice
);
257 if let Some(lsf
) = lsf
{
258 let kiss
= KissFrame
::new_stream_setup(&lsf
.0).unwrap
();
259 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
261 let kiss
= KissFrame
::new_stream_data(&stream
).unwrap
();
262 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
264 ServerMessage
::Ping(_ping
) => {
265 let mut pong
= Pong
::new();
267 M17Address
::from_callsign("VK7XT")
272 if socket
.send_to(pong
.as_bytes(), dest
).is
_err
() {
278 TncEvent
::TransmitVoice(voice
) => {
279 if socket
.send_to(voice
.as_bytes(), dest
).is
_err
() {
285 single_conn_ended
.store(true, Ordering
::Release
);
289 .status_changed(TncStatus
::Disconnected
);
292 fn spawn_reader(socket
: UdpSocket
, event_tx
: Sender
<TncEvent
>, cancel
: Arc
<AtomicBool
>) {
293 std
::thread
::spawn(move || {
294 let mut buf
= [0u8; 2048];
295 while let Ok((n
, _sa
)) = socket
.recv_from(&mut buf
) {
296 if cancel
.load(Ordering
::Acquire
) {
299 if let Some(msg
) = ServerMessage
::parse(&buf
[..n
]) {
300 if event_tx
.send(TncEvent
::Received(msg
)).is
_err
() {
308 /// Callbacks to get runtime information about how the reflector client TNC is operating
309 pub trait StatusHandler
{
310 fn status_changed(&mut self, status
: TncStatus
);
313 #[derive(Debug, PartialEq, Eq, Clone)]
323 pub struct NullStatusHandler
;
324 impl StatusHandler
for NullStatusHandler
{
325 fn status_changed(&mut self, _status
: TncStatus
) {}