]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Add and test error if packet payload is too large
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::error::M17Error;
3 use crate::link_setup::LinkSetup;
4 use crate::tnc::Tnc;
5 use crate::{LsfFrame, PacketType, StreamFrame};
6 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
7 use m17core::protocol::EncryptionType;
8
9 use log::debug;
10 use std::collections::HashMap;
11 use std::sync::mpsc;
12 use std::sync::{Arc, RwLock};
13
14 pub struct M17App {
15 adapters: Arc<RwLock<Adapters>>,
16 event_tx: mpsc::SyncSender<TncControlEvent>,
17 }
18
19 impl M17App {
20 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
21 let write_tnc = tnc.try_clone().unwrap();
22 let (event_tx, event_rx) = mpsc::sync_channel(128);
23 let listeners = Arc::new(RwLock::new(Adapters::new()));
24 spawn_reader(tnc, listeners.clone());
25 spawn_writer(write_tnc, event_rx);
26 Self {
27 adapters: listeners,
28 event_tx,
29 }
30 }
31
32 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
33 let adapter = Arc::new(adapter);
34 let mut adapters = self.adapters.write().unwrap();
35 let id = adapters.next;
36 adapters.next += 1;
37 adapters.packet.insert(id, adapter.clone());
38 drop(adapters);
39 adapter.adapter_registered(id, self.tx());
40 id
41 }
42
43 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
44 let adapter = Arc::new(adapter);
45 let mut adapters = self.adapters.write().unwrap();
46 let id = adapters.next;
47 adapters.next += 1;
48 adapters.stream.insert(id, adapter.clone());
49 drop(adapters);
50 adapter.adapter_registered(id, self.tx());
51 id
52 }
53
54 pub fn remove_packet_adapter(&self, id: usize) {
55 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
56 a.adapter_removed();
57 }
58 }
59
60 pub fn remove_stream_adapter(&self, id: usize) {
61 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
62 a.adapter_removed();
63 }
64 }
65
66 /// Create a handle that can be used to transmit data on the TNC
67 pub fn tx(&self) -> TxHandle {
68 TxHandle {
69 event_tx: self.event_tx.clone(),
70 }
71 }
72
73 pub fn start(&self) {
74 let _ = self.event_tx.send(TncControlEvent::Start);
75 }
76
77 pub fn close(&self) {
78 // TODO: blocking function to indicate TNC has finished closing
79 // then we could call this in a signal handler to ensure PTT is dropped before quit
80 let _ = self.event_tx.send(TncControlEvent::Close);
81 }
82 }
83
84 pub struct TxHandle {
85 event_tx: mpsc::SyncSender<TncControlEvent>,
86 }
87
88 impl TxHandle {
89 pub fn transmit_packet(
90 &self,
91 link_setup: &LinkSetup,
92 packet_type: PacketType,
93 payload: &[u8],
94 ) -> Result<(), M17Error> {
95 let (pack_type, pack_type_len) = packet_type.as_proto();
96 if pack_type_len + payload.len() > 823 {
97 return Err(M17Error::PacketTooLarge {
98 provided: payload.len(),
99 capacity: 823 - pack_type_len,
100 });
101 }
102 let mut full_payload = vec![];
103 full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
104 full_payload.extend_from_slice(payload);
105 let crc = m17core::crc::m17_crc(&full_payload);
106 full_payload.extend_from_slice(&crc.to_be_bytes());
107 let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
108 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
109 Ok(())
110 }
111
112 pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
113 let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
114 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
115 }
116
117 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
118
119 pub fn transmit_stream_next(&self, stream: &StreamFrame) {
120 let kiss_frame = KissFrame::new_stream_data(stream).unwrap();
121 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
122 }
123 }
124
125 /// Synchronised structure for listeners subscribing to packets and streams.
126 ///
127 /// Each listener will be notified in turn of each event.
128 struct Adapters {
129 /// Identifier to be assigned to the next listener, starting from 0
130 next: usize,
131 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
132 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
133 }
134
135 impl Adapters {
136 fn new() -> Self {
137 Self {
138 next: 0,
139 packet: HashMap::new(),
140 stream: HashMap::new(),
141 }
142 }
143 }
144
145 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
146 #[allow(clippy::large_enum_variant)]
147 enum TncControlEvent {
148 Kiss(KissFrame),
149 Start,
150 Close,
151 }
152
153 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
154 std::thread::spawn(move || {
155 let mut kiss_buffer = KissBuffer::new();
156 let mut stream_running = false;
157 loop {
158 let buf = kiss_buffer.buf_remaining();
159 let n = match tnc.read(buf) {
160 Ok(n) => n,
161 Err(_) => break,
162 };
163 kiss_buffer.did_write(n);
164 while let Some(frame) = kiss_buffer.next_frame() {
165 if frame.command() != Ok(KissCommand::DataFrame) {
166 continue;
167 }
168 match frame.port() {
169 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
170 // no action
171 // we will handle the more full-featured version from from port 1
172 }
173 Ok(m17core::kiss::PORT_PACKET_FULL) => {
174 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
175 let Ok(n) = frame.decode_payload(&mut payload) else {
176 debug!("failed to decode payload from KISS frame");
177 continue;
178 };
179 if n < 33 {
180 debug!("unusually short full packet frame");
181 continue;
182 }
183 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
184 if lsf.check_crc() != 0 {
185 debug!("LSF in full packet frame did not pass CRC");
186 continue;
187 }
188 if lsf.encryption_type() != EncryptionType::None {
189 debug!("we only understand None encryption for now - skipping packet");
190 continue;
191 }
192 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
193 else {
194 debug!("failed to decode packet type");
195 continue;
196 };
197 if (n - 30 - type_len) < 2 {
198 debug!("packet payload too small to provide CRC");
199 continue;
200 }
201 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
202 if packet_crc != 0 {
203 debug!("packet CRC does not pass");
204 continue;
205 }
206 let packet_payload: Arc<[u8]> =
207 Arc::from(&payload[(30 + type_len)..(n - 2)]);
208
209 let subs: Vec<_> =
210 adapters.read().unwrap().packet.values().cloned().collect();
211 for s in subs {
212 s.packet_received(
213 LinkSetup::new_raw(lsf.clone()),
214 packet_type,
215 packet_payload.clone(),
216 );
217 }
218 }
219 Ok(m17core::kiss::PORT_STREAM) => {
220 let mut payload = [0u8; 32];
221 let Ok(n) = frame.decode_payload(&mut payload) else {
222 debug!("failed to decode stream payload from KISS frame");
223 continue;
224 };
225 if n == 30 {
226 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
227 if lsf.check_crc() != 0 {
228 debug!("initial LSF in stream did not pass CRC");
229 continue;
230 }
231 stream_running = true;
232 let subs: Vec<_> =
233 adapters.read().unwrap().stream.values().cloned().collect();
234 for s in subs {
235 s.stream_began(LinkSetup::new_raw(lsf.clone()));
236 }
237 } else if n == 26 {
238 if !stream_running {
239 debug!("ignoring stream data as we didn't get a valid LSF first");
240 continue;
241 }
242 // TODO: parse LICH and handle the different changing subvalues META could have
243 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
244 debug!("stream data CRC mismatch");
245 continue;
246 }
247 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
248 let is_final = (frame_number & 0x8000) > 0;
249 frame_number &= 0x7fff;
250 let data: [u8; 16] = payload[8..24].try_into().unwrap();
251 let data = Arc::new(data);
252 if is_final {
253 stream_running = false;
254 }
255 let subs: Vec<_> =
256 adapters.read().unwrap().stream.values().cloned().collect();
257 for s in subs {
258 s.stream_data(frame_number, is_final, data.clone());
259 }
260 }
261 }
262 _ => (),
263 }
264 }
265 }
266 });
267 }
268
269 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
270 std::thread::spawn(move || {
271 while let Ok(ev) = event_rx.recv() {
272 match ev {
273 TncControlEvent::Kiss(k) => {
274 if let Err(e) = tnc.write_all(k.as_bytes()) {
275 debug!("kiss send err: {:?}", e);
276 return;
277 }
278 }
279 TncControlEvent::Start => {
280 if let Err(e) = tnc.start() {
281 debug!("tnc start err: {:?}", e);
282 return;
283 }
284 }
285 TncControlEvent::Close => {
286 if let Err(e) = tnc.close() {
287 debug!("tnc close err: {:?}", e);
288 return;
289 }
290 }
291 }
292 }
293 });
294 }
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::link_setup::M17Address;
    use crate::test_util::NullTnc;

    /// A 100 byte raw packet is accepted, while a 900 byte one is rejected
    /// with the reported capacity of 822 bytes.
    #[test]
    fn packet_payload_len() {
        let app = M17App::new(NullTnc);
        let broadcast_link = || {
            LinkSetup::new_packet(&M17Address::new_broadcast(), &M17Address::new_broadcast())
        };

        let accepted = app
            .tx()
            .transmit_packet(&broadcast_link(), PacketType::Raw, &[0u8; 100]);
        assert_eq!(accepted, Ok(()));

        let rejected = app
            .tx()
            .transmit_packet(&broadcast_link(), PacketType::Raw, &[0u8; 900]);
        assert_eq!(
            rejected,
            Err(M17Error::PacketTooLarge {
                provided: 900,
                capacity: 822
            })
        );
    }
}