]>
code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
dc2138da0605879ef6b34878af5e477b35075143
1 use crate::adapter
::{PacketAdapter
, StreamAdapter
};
2 use crate::error
::{M17Error
, M17Errors
};
3 use crate::link_setup
::LinkSetup
;
5 use crate::{LsfFrame
, PacketType
, StreamFrame
};
6 use m17core
::kiss
::{KissBuffer
, KissCommand
, KissFrame
};
7 use m17core
::protocol
::EncryptionType
;
10 use std
::collections
::HashMap
;
12 use std
::sync
::{Arc
, RwLock
};
14 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
22 adapters
: Arc
<RwLock
<Adapters
>>,
23 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
24 lifecycle
: RwLock
<Lifecycle
>,
28 pub fn new
<T
: Tnc
+ Send
+ '
static>(mut tnc
: T
) -> Self {
29 let write_tnc
= tnc
.try_clone().unwrap
();
30 let (event_tx
, event_rx
) = mpsc
::sync_channel(128);
31 let listeners
= Arc
::new(RwLock
::new(Adapters
::new()));
32 spawn_reader(tnc
, listeners
.clone());
33 spawn_writer(write_tnc
, event_rx
);
37 lifecycle
: RwLock
::new(Lifecycle
::Setup
),
41 pub fn add_packet_adapter
<P
: PacketAdapter
+ '
static>(
44 ) -> Result
<usize, M17Error
> {
45 let adapter
= Arc
::new(adapter
);
46 let mut adapters
= self.adapters
.write().unwrap
();
47 let id
= adapters
.next
;
49 adapters
.packet
.insert
(id
, adapter
.clone());
51 if self.lifecycle() == Lifecycle
::Started
{
54 .map_err(|e
| M17Error
::Adapter(id
, e
))?
;
59 pub fn add_stream_adapter
<S
: StreamAdapter
+ '
static>(
62 ) -> Result
<usize, M17Error
> {
63 let adapter
= Arc
::new(adapter
);
64 let mut adapters
= self.adapters
.write().unwrap
();
65 let id
= adapters
.next
;
67 adapters
.stream
.insert
(id
, adapter
.clone());
69 if self.lifecycle() == Lifecycle
::Started
{
72 .map_err(|e
| M17Error
::Adapter(id
, e
))?
;
77 pub fn remove_packet_adapter(&self, id
: usize) -> Result
<(), M17Error
> {
78 if let Some(a
) = self.adapters
.write().unwrap
().packet
.remove(&id
) {
79 if self.lifecycle() == Lifecycle
::Started
{
80 a
.close().map_err(|e
| M17Error
::Adapter(id
, e
))?
;
86 pub fn remove_stream_adapter(&self, id
: usize) -> Result
<(), M17Error
> {
87 if let Some(a
) = self.adapters
.write().unwrap
().stream
.remove(&id
) {
88 if self.lifecycle() == Lifecycle
::Started
{
89 a
.close().map_err(|e
| M17Error
::Adapter(id
, e
))?
;
95 /// Create a handle that can be used to transmit data on the TNC
96 pub fn tx(&self) -> TxHandle
{
98 event_tx
: self.event_tx
.clone(),
102 pub fn start(&self) -> Result
<(), M17Errors
> {
103 if self.lifecycle() != Lifecycle
::Setup
{
104 return Err(M17Errors(vec
![M17Error
::InvalidStart
]));
106 self.set_lifecycle(Lifecycle
::Started
);
107 let mut errs
= vec
![];
109 let adapters
= self.adapters
.read().unwrap
();
110 for (i
, p
) in &adapters
.packet
{
111 if let Err(e
) = p
.start(self.tx()) {
112 errs
.push(M17Error
::Adapter(*i
, e
));
115 for (i
, s
) in &adapters
.stream
{
116 if let Err(e
) = s
.start(self.tx()) {
117 errs
.push(M17Error
::Adapter(*i
, e
));
121 let _
= self.event_tx
.send(TncControlEvent
::Start
);
129 pub fn close(&self) -> Result
<(), M17Errors
> {
130 if self.lifecycle() != Lifecycle
::Started
{
131 return Err(M17Errors(vec
![M17Error
::InvalidClose
]));
133 self.set_lifecycle(Lifecycle
::Closed
);
134 let mut errs
= vec
![];
136 let adapters
= self.adapters
.read().unwrap
();
137 for (i
, p
) in &adapters
.packet
{
138 if let Err(e
) = p
.close() {
139 errs
.push(M17Error
::Adapter(*i
, e
));
142 for (i
, s
) in &adapters
.stream
{
143 if let Err(e
) = s
.close() {
144 errs
.push(M17Error
::Adapter(*i
, e
));
148 // TODO: blocking function to indicate TNC has finished closing
149 // then we could call this in a signal handler to ensure PTT is dropped before quit
150 let _
= self.event_tx
.send(TncControlEvent
::Close
);
158 fn lifecycle(&self) -> Lifecycle
{
159 *self.lifecycle
.read().unwrap
()
162 fn set_lifecycle(&self, lifecycle
: Lifecycle
) {
163 *self.lifecycle
.write().unwrap
() = lifecycle
;
167 pub struct TxHandle
{
168 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
172 pub fn transmit_packet(
174 link_setup
: &LinkSetup
,
175 packet_type
: PacketType
,
177 ) -> Result
<(), M17Error
> {
178 let (pack_type
, pack_type_len
) = packet_type
.as_proto();
179 if pack_type_len
+ payload
.len() > 823 {
180 return Err(M17Error
::PacketTooLarge
{
181 provided
: payload
.len(),
182 capacity
: 823 - pack_type_len
,
185 let mut full_payload
= vec
![];
186 full_payload
.extend_from_slice(&pack_type
[0..pack_type_len
]);
187 full_payload
.extend_from_slice(payload
);
188 let crc
= m17core
::crc
::m17_crc(&full_payload
);
189 full_payload
.extend_from_slice(&crc
.to_be_bytes());
190 let kiss_frame
= KissFrame
::new_full_packet(&link_setup
.raw
.0, &full_payload
).unwrap
();
191 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
195 pub fn transmit_stream_start(&self, link_setup
: &LinkSetup
) {
196 let kiss_frame
= KissFrame
::new_stream_setup(&link_setup
.raw
.0).unwrap
();
197 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
200 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
202 pub fn transmit_stream_next(&self, stream
: &StreamFrame
) {
203 let kiss_frame
= KissFrame
::new_stream_data(stream
).unwrap
();
204 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
208 /// Synchronised structure for listeners subscribing to packets and streams.
210 /// Each listener will be notified in turn of each event.
212 /// Identifier to be assigned to the next listener, starting from 0
214 packet
: HashMap
<usize, Arc
<dyn PacketAdapter
>>,
215 stream
: HashMap
<usize, Arc
<dyn StreamAdapter
>>,
222 packet
: HashMap
::new(),
223 stream
: HashMap
::new(),
228 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
229 #[allow(clippy::large_enum_variant)]
230 enum TncControlEvent
{
236 fn spawn_reader
<T
: Tnc
>(mut tnc
: T
, adapters
: Arc
<RwLock
<Adapters
>>) {
237 std
::thread
::spawn(move || {
238 let mut kiss_buffer
= KissBuffer
::new();
239 let mut stream_running
= false;
241 let buf
= kiss_buffer
.buf_remaining();
242 let n
= match tnc
.read(buf
) {
246 kiss_buffer
.did_write(n
);
247 while let Some(frame
) = kiss_buffer
.next_frame() {
248 if frame
.command() != Ok(KissCommand
::DataFrame
) {
252 Ok(m17core
::kiss
::PORT_PACKET_BASIC
) => {
254 // we will handle the more full-featured version from from port 1
256 Ok(m17core
::kiss
::PORT_PACKET_FULL
) => {
257 let mut payload
= [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
258 let Ok(n
) = frame
.decode_payload(&mut payload
) else {
259 debug
!("failed to decode payload from KISS frame");
263 debug
!("unusually short full packet frame");
266 let lsf
= LsfFrame(payload
[0..30].try_into().unwrap
());
267 if lsf
.check_crc() != 0 {
268 debug
!("LSF in full packet frame did not pass CRC");
271 if lsf
.encryption_type() != EncryptionType
::None
{
272 debug
!("we only understand None encryption for now - skipping packet");
275 let Some((packet_type
, type_len
)) = PacketType
::from_proto(&payload
[30..n
])
277 debug
!("failed to decode packet type");
280 if (n
- 30 - type_len
) < 2 {
281 debug
!("packet payload too small to provide CRC");
284 let packet_crc
= m17core
::crc
::m17_crc(&payload
[30..n
]);
286 debug
!("packet CRC does not pass");
289 let packet_payload
: Arc
<[u8]> =
290 Arc
::from(&payload
[(30 + type_len
)..(n
- 2)]);
293 adapters
.read().unwrap
().packet
.values().cloned().collect();
296 LinkSetup
::new_raw(lsf
.clone()),
298 packet_payload
.clone(),
302 Ok(m17core
::kiss
::PORT_STREAM
) => {
303 let mut payload
= [0u8; 32];
304 let Ok(n
) = frame
.decode_payload(&mut payload
) else {
305 debug
!("failed to decode stream payload from KISS frame");
309 let lsf
= LsfFrame(payload
[0..30].try_into().unwrap
());
310 if lsf
.check_crc() != 0 {
311 debug
!("initial LSF in stream did not pass CRC");
314 stream_running
= true;
316 adapters
.read().unwrap
().stream
.values().cloned().collect();
318 s
.stream_began(LinkSetup
::new_raw(lsf
.clone()));
322 debug
!("ignoring stream data as we didn't get a valid LSF first");
325 // TODO: parse LICH and handle the different changing subvalues META could have
326 if m17core
::crc
::m17_crc(&payload
[6..n
]) != 0 {
327 debug
!("stream data CRC mismatch");
330 let mut frame_number
= u16::from_be_bytes([payload
[6], payload
[7]]);
331 let is_final
= (frame_number
& 0x8000) > 0;
332 frame_number
&= 0x7fff;
333 let data
: [u8; 16] = payload
[8..24].try_into().unwrap
();
334 let data
= Arc
::new(data
);
336 stream_running
= false;
339 adapters
.read().unwrap
().stream
.values().cloned().collect();
341 s
.stream_data(frame_number
, is_final
, data
.clone());
352 fn spawn_writer
<T
: Tnc
>(mut tnc
: T
, event_rx
: mpsc
::Receiver
<TncControlEvent
>) {
353 std
::thread
::spawn(move || {
354 while let Ok(ev
) = event_rx
.recv() {
356 TncControlEvent
::Kiss(k
) => {
357 if let Err(e
) = tnc
.write_all(k
.as_bytes()) {
358 debug
!("kiss send err: {:?}", e
);
362 TncControlEvent
::Start
=> {
363 if let Err(e
) = tnc
.start() {
364 debug
!("tnc start err: {:?}", e
);
368 TncControlEvent
::Close
=> {
369 if let Err(e
) = tnc
.close() {
370 debug
!("tnc close err: {:?}", e
);
381 use crate::error
::AdapterError
;
382 use crate::{link_setup
::M17Address
, test_util
::NullTnc
};
387 fn packet_payload_len() {
388 let app
= M17App
::new(NullTnc
);
389 let res
= app
.tx().transmit_packet(
390 &LinkSetup
::new_packet(&M17Address
::new_broadcast(), &M17Address
::new_broadcast()),
394 assert
!(matches
!(res
, Ok(())));
395 let res
= app
.tx().transmit_packet(
396 &LinkSetup
::new_packet(&M17Address
::new_broadcast(), &M17Address
::new_broadcast()),
402 Err(M17Error
::PacketTooLarge
{
410 fn adapter_lifecycle() {
411 #[derive(Debug, PartialEq)]
416 macro_rules
! event_impl
{
417 ($target
:ty
, $
trait:ty
) => {
418 impl $
trait for $target
{
419 fn start(&self, _handle
: TxHandle
) -> Result
<(), AdapterError
> {
420 self.0.send(Event
::Started
)?
;
424 fn close(&self) -> Result
<(), AdapterError
> {
425 self.0.send(Event
::Closed
)?
;
431 struct FakePacket(mpsc
::SyncSender
<Event
>);
432 struct FakeStream(mpsc
::SyncSender
<Event
>);
433 event_impl
!(FakePacket
, PacketAdapter
);
434 event_impl
!(FakeStream
, StreamAdapter
);
436 let app
= M17App
::new(NullTnc
);
437 let (tx_p
, rx_p
) = mpsc
::sync_channel(128);
438 let (tx_s
, rx_s
) = mpsc
::sync_channel(128);
439 let packet
= FakePacket(tx_p
);
440 let stream
= FakeStream(tx_s
);
442 let id_p
= app
.add_packet_adapter(packet
).unwrap
();
443 let id_s
= app
.add_stream_adapter(stream
).unwrap
();
444 app
.start().unwrap
();
445 app
.close().unwrap
();
446 app
.remove_packet_adapter(id_p
).unwrap
();
447 app
.remove_stream_adapter(id_s
).unwrap
();
449 assert_eq
!(rx_p
.try_recv(), Ok(Event
::Started
));
450 assert_eq
!(rx_p
.try_recv(), Ok(Event
::Closed
));
451 assert_eq
!(rx_p
.try_recv(), Err(mpsc
::TryRecvError
::Disconnected
));
453 assert_eq
!(rx_s
.try_recv(), Ok(Event
::Started
));
454 assert_eq
!(rx_s
.try_recv(), Ok(Event
::Closed
));
455 assert_eq
!(rx_s
.try_recv(), Err(mpsc
::TryRecvError
::Disconnected
));