]>
code.octet-stream.net Git - m17rt/blob - m17app/src/reflector.rs
9735298131bd510132fb21b41bd7311d2e8f0a22
2 io
::{self, Read
, Write
},
3 net
::{Ipv4Addr
, Ipv6Addr
, SocketAddr
, ToSocketAddrs
, UdpSocket
},
5 atomic
::{AtomicBool
, Ordering
},
6 mpsc
::{self, Receiver
, Sender
},
13 use crate::{link_setup
::M17Address
, tnc
::Tnc
, util
::out_buffer
::OutBuffer
};
18 packet
::{Connect
, Pong
, ServerMessage
},
22 #[derive(Debug, PartialEq, Eq, Clone)]
23 pub struct ReflectorClientConfig
{
27 local_callsign
: M17Address
,
30 type WrappedStatusHandler
= Arc
<Mutex
<dyn StatusHandler
+ Send
+ '
static>>;
32 /// Network-based TNC that attempts to maintain a UDP connection to a reflector.
34 /// Streams will be sent and received over IP rather than RF.
36 pub struct ReflectorClientTnc
{
37 config
: ReflectorClientConfig
,
38 status_handler
: WrappedStatusHandler
,
39 kiss_out_tx
: Sender
<Arc
<[u8]>>,
41 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
42 is_closed
: Arc
<AtomicBool
>,
45 impl ReflectorClientTnc
{
46 /// Create a new Reflector Client TNC.
48 /// You must provide a configuration object and a handler for status events, such as when the TNC
49 /// connects and disconnects. The status events are purely informational and if you're not interested
50 /// in them, provide a `NullStatusHandler`.
51 pub fn new
<S
: StatusHandler
+ Send
+ '
static>(
52 config
: ReflectorClientConfig
,
55 let (tx
, rx
) = mpsc
::channel();
58 status_handler
: Arc
::new(Mutex
::new(status
)),
60 kiss_out
: OutBuffer
::new(rx
),
61 event_tx
: Arc
::new(Mutex
::new(None
)),
62 is_closed
: Arc
::new(AtomicBool
::new(false)),
67 impl Read
for ReflectorClientTnc
{
68 fn read(&mut self, buf
: &mut [u8]) -> io
::Result
<usize> {
69 self.kiss_out
.read(buf
)
73 impl Write
for ReflectorClientTnc
{
74 fn write(&mut self, buf
: &[u8]) -> std
::io
::Result
<usize> {
78 fn flush(&mut self) -> std
::io
::Result
<()> {
83 impl Tnc
for ReflectorClientTnc
{
84 fn try_clone(&mut self) -> Result
<Self, crate::tnc
::TncError
> {
91 self.status_handler
.clone(),
92 self.event_tx
.clone(),
93 self.is
_closed
.clone(),
94 self.kiss_out_tx
.clone(),
99 if let Some(tx
) = self.event_tx
.lock().unwrap
().as_ref() {
100 self.is
_closed
.store(true, Ordering
::Release
);
101 let _
= tx
.send(TncEvent
::Close
);
106 #[allow(clippy::large_enum_variant)]
109 Received(ServerMessage
),
113 config
: ReflectorClientConfig
,
114 status
: WrappedStatusHandler
,
115 event_tx
: Arc
<Mutex
<Option
<Sender
<TncEvent
>>>>,
116 is_closed
: Arc
<AtomicBool
>,
117 kiss_out_tx
: Sender
<Arc
<[u8]>>,
119 std
::thread
::spawn(move || {
123 .status_changed(TncStatus
::Disconnected
);
124 while !is_closed
.load(Ordering
::Acquire
) {
125 status
.lock().unwrap
().status_changed(TncStatus
::Connecting
);
126 let sa
= if let Ok(mut sa_iter
) =
127 (config
.hostname
.as_str(), config
.port
).to_socket_addrs()
129 if let Some(sa
) = sa_iter
.next() {
135 .status_changed(TncStatus
::Disconnected
);
136 thread
::sleep(Duration
::from_secs(10));
143 .status_changed(TncStatus
::Disconnected
);
144 thread
::sleep(Duration
::from_secs(10));
147 let (tx
, rx
) = mpsc
::channel();
148 *event_tx
.lock().unwrap
() = Some(tx
.clone());
149 if !is_closed
.load(Ordering
::Acquire
) {
159 println
!("single conn ended");
161 status
.lock().unwrap
().status_changed(TncStatus
::Closed
);
167 event_tx
: Sender
<TncEvent
>,
168 event_rx
: Receiver
<TncEvent
>,
169 kiss_out_tx
: Sender
<Arc
<[u8]>>,
170 config
: ReflectorClientConfig
,
171 status
: WrappedStatusHandler
,
173 let socket
= if dest
.is
_ipv
4() {
174 UdpSocket
::bind((Ipv4Addr
::UNSPECIFIED
, 0)).unwrap
()
176 UdpSocket
::bind((Ipv6Addr
::UNSPECIFIED
, 0)).unwrap
()
179 let mut connect
= Connect
::new();
180 connect
.set_address(config
.local_callsign
.address().to_owned());
181 connect
.set_module(config
.module
);
182 socket
.send_to(connect
.as_bytes(), dest
).unwrap
();
183 let mut converter
= VoiceToRf
::new();
184 let single_conn_ended
= Arc
::new(AtomicBool
::new(false));
187 socket
.try_clone().unwrap
(),
189 single_conn_ended
.clone(),
192 while let Ok(ev
) = event_rx
.recv_timeout(Duration
::from_secs(30)) {
195 println
!("writer: close");
198 TncEvent
::Received(server_msg
) => match server_msg
{
199 ServerMessage
::ConnectAcknowledge(_
) => {
200 status
.lock().unwrap
().status_changed(TncStatus
::Connected
);
202 ServerMessage
::ConnectNack(_
) => {
206 .status_changed(TncStatus
::ConnectRejected
);
209 ServerMessage
::ForceDisconnect(_
) => {
213 .status_changed(TncStatus
::ForceDisconnect
);
216 ServerMessage
::Voice(voice
) => {
217 let (lsf
, stream
) = converter
.next(&voice
);
218 if let Some(lsf
) = lsf
{
219 let kiss
= KissFrame
::new_stream_setup(&lsf
.0).unwrap
();
220 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
222 let kiss
= KissFrame
::new_stream_data(&stream
).unwrap
();
223 let _
= kiss_out_tx
.send(kiss
.as_bytes().into
());
225 ServerMessage
::Ping(_ping
) => {
226 let mut pong
= Pong
::new();
228 M17Address
::from_callsign("VK7XT")
233 socket
.send_to(pong
.as_bytes(), dest
).unwrap
();
239 single_conn_ended
.store(true, Ordering
::Release
);
243 .status_changed(TncStatus
::Disconnected
);
244 println
!("write thread terminating");
247 fn spawn_reader(socket
: UdpSocket
, event_tx
: Sender
<TncEvent
>, cancel
: Arc
<AtomicBool
>) {
248 std
::thread
::spawn(move || {
249 let mut buf
= [0u8; 2048];
250 while let Ok((n
, _sa
)) = socket
.recv_from(&mut buf
) {
251 if cancel
.load(Ordering
::Acquire
) {
254 if let Some(msg
) = ServerMessage
::parse(&buf
[..n
]) {
255 if event_tx
.send(TncEvent
::Received(msg
)).is
_err
() {
260 println
!("read thread terminating");
264 /// Callbacks to get runtime information about how the reflector client TNC is operating
265 pub trait StatusHandler
{
266 fn status_changed(&mut self, status
: TncStatus
);
269 #[derive(Debug, PartialEq, Eq, Clone)]
279 pub struct NullStatusHandler
;
280 impl StatusHandler
for NullStatusHandler
{
281 fn status_changed(&mut self, _status
: TncStatus
) {}