]>
code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
0abfab640b51937b9076b45cea922ea43c4e954a
1 use crate::adapter
::{PacketAdapter
, StreamAdapter
};
2 use crate::error
::M17Error
;
3 use crate::link_setup
::LinkSetup
;
5 use crate::{LsfFrame
, PacketType
, StreamFrame
};
6 use m17core
::kiss
::{KissBuffer
, KissCommand
, KissFrame
};
7 use m17core
::protocol
::EncryptionType
;
10 use std
::collections
::HashMap
;
12 use std
::sync
::{Arc
, RwLock
};
15 adapters
: Arc
<RwLock
<Adapters
>>,
16 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
20 pub fn new
<T
: Tnc
+ Send
+ '
static>(mut tnc
: T
) -> Self {
21 let write_tnc
= tnc
.try_clone().unwrap
();
22 let (event_tx
, event_rx
) = mpsc
::sync_channel(128);
23 let listeners
= Arc
::new(RwLock
::new(Adapters
::new()));
24 spawn_reader(tnc
, listeners
.clone());
25 spawn_writer(write_tnc
, event_rx
);
32 pub fn add_packet_adapter
<P
: PacketAdapter
+ '
static>(&self, adapter
: P
) -> usize {
33 let adapter
= Arc
::new(adapter
);
34 let mut adapters
= self.adapters
.write().unwrap
();
35 let id
= adapters
.next
;
37 adapters
.packet
.insert
(id
, adapter
.clone());
39 adapter
.adapter_registered(id
, self.tx());
43 pub fn add_stream_adapter
<S
: StreamAdapter
+ '
static>(&self, adapter
: S
) -> usize {
44 let adapter
= Arc
::new(adapter
);
45 let mut adapters
= self.adapters
.write().unwrap
();
46 let id
= adapters
.next
;
48 adapters
.stream
.insert
(id
, adapter
.clone());
50 adapter
.adapter_registered(id
, self.tx());
54 pub fn remove_packet_adapter(&self, id
: usize) {
55 if let Some(a
) = self.adapters
.write().unwrap
().packet
.remove(&id
) {
60 pub fn remove_stream_adapter(&self, id
: usize) {
61 if let Some(a
) = self.adapters
.write().unwrap
().stream
.remove(&id
) {
66 /// Create a handle that can be used to transmit data on the TNC
67 pub fn tx(&self) -> TxHandle
{
69 event_tx
: self.event_tx
.clone(),
74 let _
= self.event_tx
.send(TncControlEvent
::Start
);
78 // TODO: blocking function to indicate TNC has finished closing
79 // then we could call this in a signal handler to ensure PTT is dropped before quit
80 let _
= self.event_tx
.send(TncControlEvent
::Close
);
85 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
89 pub fn transmit_packet(
91 link_setup
: &LinkSetup
,
92 packet_type
: PacketType
,
94 ) -> Result
<(), M17Error
> {
95 let (pack_type
, pack_type_len
) = packet_type
.as_proto();
96 if pack_type_len
+ payload
.len() > 823 {
97 return Err(M17Error
::PacketTooLarge
{
98 provided
: payload
.len(),
99 capacity
: 823 - pack_type_len
,
102 let mut full_payload
= vec
![];
103 full_payload
.extend_from_slice(&pack_type
[0..pack_type_len
]);
104 full_payload
.extend_from_slice(payload
);
105 let crc
= m17core
::crc
::m17_crc(&full_payload
);
106 full_payload
.extend_from_slice(&crc
.to_be_bytes());
107 let kiss_frame
= KissFrame
::new_full_packet(&link_setup
.raw
.0, &full_payload
).unwrap
();
108 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
112 pub fn transmit_stream_start(&self, link_setup
: &LinkSetup
) {
113 let kiss_frame
= KissFrame
::new_stream_setup(&link_setup
.raw
.0).unwrap
();
114 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
117 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
119 pub fn transmit_stream_next(&self, stream
: &StreamFrame
) {
120 let kiss_frame
= KissFrame
::new_stream_data(stream
).unwrap
();
121 let _
= self.event_tx
.send(TncControlEvent
::Kiss(kiss_frame
));
125 /// Synchronised structure for listeners subscribing to packets and streams.
127 /// Each listener will be notified in turn of each event.
129 /// Identifier to be assigned to the next listener, starting from 0
131 packet
: HashMap
<usize, Arc
<dyn PacketAdapter
>>,
132 stream
: HashMap
<usize, Arc
<dyn StreamAdapter
>>,
139 packet
: HashMap
::new(),
140 stream
: HashMap
::new(),
145 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
146 #[allow(clippy::large_enum_variant)]
147 enum TncControlEvent
{
153 fn spawn_reader
<T
: Tnc
>(mut tnc
: T
, adapters
: Arc
<RwLock
<Adapters
>>) {
154 std
::thread
::spawn(move || {
155 let mut kiss_buffer
= KissBuffer
::new();
156 let mut stream_running
= false;
158 let buf
= kiss_buffer
.buf_remaining();
159 let n
= match tnc
.read(buf
) {
163 kiss_buffer
.did_write(n
);
164 while let Some(frame
) = kiss_buffer
.next_frame() {
165 if frame
.command() != Ok(KissCommand
::DataFrame
) {
169 Ok(m17core
::kiss
::PORT_PACKET_BASIC
) => {
171 // we will handle the more full-featured version from from port 1
173 Ok(m17core
::kiss
::PORT_PACKET_FULL
) => {
174 let mut payload
= [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
175 let Ok(n
) = frame
.decode_payload(&mut payload
) else {
176 debug
!("failed to decode payload from KISS frame");
180 debug
!("unusually short full packet frame");
183 let lsf
= LsfFrame(payload
[0..30].try_into().unwrap
());
184 if lsf
.check_crc() != 0 {
185 debug
!("LSF in full packet frame did not pass CRC");
188 if lsf
.encryption_type() != EncryptionType
::None
{
189 debug
!("we only understand None encryption for now - skipping packet");
192 let Some((packet_type
, type_len
)) = PacketType
::from_proto(&payload
[30..n
])
194 debug
!("failed to decode packet type");
197 if (n
- 30 - type_len
) < 2 {
198 debug
!("packet payload too small to provide CRC");
201 let packet_crc
= m17core
::crc
::m17_crc(&payload
[30..n
]);
203 debug
!("packet CRC does not pass");
206 let packet_payload
: Arc
<[u8]> =
207 Arc
::from(&payload
[(30 + type_len
)..(n
- 2)]);
210 adapters
.read().unwrap
().packet
.values().cloned().collect();
213 LinkSetup
::new_raw(lsf
.clone()),
215 packet_payload
.clone(),
219 Ok(m17core
::kiss
::PORT_STREAM
) => {
220 let mut payload
= [0u8; 32];
221 let Ok(n
) = frame
.decode_payload(&mut payload
) else {
222 debug
!("failed to decode stream payload from KISS frame");
226 let lsf
= LsfFrame(payload
[0..30].try_into().unwrap
());
227 if lsf
.check_crc() != 0 {
228 debug
!("initial LSF in stream did not pass CRC");
231 stream_running
= true;
233 adapters
.read().unwrap
().stream
.values().cloned().collect();
235 s
.stream_began(LinkSetup
::new_raw(lsf
.clone()));
239 debug
!("ignoring stream data as we didn't get a valid LSF first");
242 // TODO: parse LICH and handle the different changing subvalues META could have
243 if m17core
::crc
::m17_crc(&payload
[6..n
]) != 0 {
244 debug
!("stream data CRC mismatch");
247 let mut frame_number
= u16::from_be_bytes([payload
[6], payload
[7]]);
248 let is_final
= (frame_number
& 0x8000) > 0;
249 frame_number
&= 0x7fff;
250 let data
: [u8; 16] = payload
[8..24].try_into().unwrap
();
251 let data
= Arc
::new(data
);
253 stream_running
= false;
256 adapters
.read().unwrap
().stream
.values().cloned().collect();
258 s
.stream_data(frame_number
, is_final
, data
.clone());
269 fn spawn_writer
<T
: Tnc
>(mut tnc
: T
, event_rx
: mpsc
::Receiver
<TncControlEvent
>) {
270 std
::thread
::spawn(move || {
271 while let Ok(ev
) = event_rx
.recv() {
273 TncControlEvent
::Kiss(k
) => {
274 if let Err(e
) = tnc
.write_all(k
.as_bytes()) {
275 debug
!("kiss send err: {:?}", e
);
279 TncControlEvent
::Start
=> {
280 if let Err(e
) = tnc
.start() {
281 debug
!("tnc start err: {:?}", e
);
285 TncControlEvent
::Close
=> {
286 if let Err(e
) = tnc
.close() {
287 debug
!("tnc close err: {:?}", e
);
298 use crate::{link_setup
::M17Address
, test_util
::NullTnc
};
303 fn packet_payload_len() {
304 let app
= M17App
::new(NullTnc
);
305 let res
= app
.tx().transmit_packet(
306 &LinkSetup
::new_packet(&M17Address
::new_broadcast(), &M17Address
::new_broadcast()),
310 assert_eq
!(res
, Ok(()));
311 let res
= app
.tx().transmit_packet(
312 &LinkSetup
::new_packet(&M17Address
::new_broadcast(), &M17Address
::new_broadcast()),
318 Err(M17Error
::PacketTooLarge
{