]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Fix timing bugs and add documentation
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::link_setup::LinkSetup;
3 use crate::tnc::Tnc;
4 use crate::{LsfFrame, PacketType, StreamFrame};
5 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
6 use m17core::protocol::EncryptionType;
7
8 use log::debug;
9 use std::collections::HashMap;
10 use std::sync::mpsc;
11 use std::sync::{Arc, RwLock};
12
13 pub struct M17App {
14 adapters: Arc<RwLock<Adapters>>,
15 event_tx: mpsc::SyncSender<TncControlEvent>,
16 }
17
18 impl M17App {
19 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
20 let write_tnc = tnc.try_clone().unwrap();
21 let (event_tx, event_rx) = mpsc::sync_channel(128);
22 let listeners = Arc::new(RwLock::new(Adapters::new()));
23 spawn_reader(tnc, listeners.clone());
24 spawn_writer(write_tnc, event_rx);
25 Self {
26 adapters: listeners,
27 event_tx,
28 }
29 }
30
31 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
32 let adapter = Arc::new(adapter);
33 let mut adapters = self.adapters.write().unwrap();
34 let id = adapters.next;
35 adapters.next += 1;
36 adapters.packet.insert(id, adapter.clone());
37 drop(adapters);
38 adapter.adapter_registered(id, self.tx());
39 id
40 }
41
42 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
43 let adapter = Arc::new(adapter);
44 let mut adapters = self.adapters.write().unwrap();
45 let id = adapters.next;
46 adapters.next += 1;
47 adapters.stream.insert(id, adapter.clone());
48 drop(adapters);
49 adapter.adapter_registered(id, self.tx());
50 id
51 }
52
53 pub fn remove_packet_adapter(&self, id: usize) {
54 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
55 a.adapter_removed();
56 }
57 }
58
59 pub fn remove_stream_adapter(&self, id: usize) {
60 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
61 a.adapter_removed();
62 }
63 }
64
65 /// Create a handle that can be used to transmit data on the TNC
66 pub fn tx(&self) -> TxHandle {
67 TxHandle {
68 event_tx: self.event_tx.clone(),
69 }
70 }
71
72 pub fn start(&self) {
73 let _ = self.event_tx.send(TncControlEvent::Start);
74 }
75
76 pub fn close(&self) {
77 // TODO: blocking function to indicate TNC has finished closing
78 // then we could call this in a signal handler to ensure PTT is dropped before quit
79 let _ = self.event_tx.send(TncControlEvent::Close);
80 }
81 }
82
83 pub struct TxHandle {
84 event_tx: mpsc::SyncSender<TncControlEvent>,
85 }
86
87 impl TxHandle {
88 pub fn transmit_packet(&self, link_setup: &LinkSetup, packet_type: PacketType, payload: &[u8]) {
89 let (pack_type, pack_type_len) = packet_type.as_proto();
90 if pack_type_len + payload.len() > 823 {
91 // TODO: error for invalid transmission type
92 return;
93 }
94 let mut full_payload = vec![];
95 full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
96 full_payload.extend_from_slice(payload);
97 let crc = m17core::crc::m17_crc(&full_payload);
98 full_payload.extend_from_slice(&crc.to_be_bytes());
99 let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
100 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
101 }
102
103 pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
104 let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
105 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
106 }
107
108 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
109
110 pub fn transmit_stream_next(&self, stream: &StreamFrame) {
111 let kiss_frame = KissFrame::new_stream_data(stream).unwrap();
112 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
113 }
114 }
115
116 /// Synchronised structure for listeners subscribing to packets and streams.
117 ///
118 /// Each listener will be notified in turn of each event.
119 struct Adapters {
120 /// Identifier to be assigned to the next listener, starting from 0
121 next: usize,
122 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
123 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
124 }
125
126 impl Adapters {
127 fn new() -> Self {
128 Self {
129 next: 0,
130 packet: HashMap::new(),
131 stream: HashMap::new(),
132 }
133 }
134 }
135
136 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
137 #[allow(clippy::large_enum_variant)]
138 enum TncControlEvent {
139 Kiss(KissFrame),
140 Start,
141 Close,
142 }
143
144 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
145 std::thread::spawn(move || {
146 let mut kiss_buffer = KissBuffer::new();
147 let mut stream_running = false;
148 loop {
149 let buf = kiss_buffer.buf_remaining();
150 let n = match tnc.read(buf) {
151 Ok(n) => n,
152 Err(_) => break,
153 };
154 kiss_buffer.did_write(n);
155 while let Some(frame) = kiss_buffer.next_frame() {
156 if frame.command() != Ok(KissCommand::DataFrame) {
157 continue;
158 }
159 match frame.port() {
160 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
161 // no action
162 // we will handle the more full-featured version from from port 1
163 }
164 Ok(m17core::kiss::PORT_PACKET_FULL) => {
165 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
166 let Ok(n) = frame.decode_payload(&mut payload) else {
167 debug!("failed to decode payload from KISS frame");
168 continue;
169 };
170 if n < 33 {
171 debug!("unusually short full packet frame");
172 continue;
173 }
174 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
175 if lsf.check_crc() != 0 {
176 debug!("LSF in full packet frame did not pass CRC");
177 continue;
178 }
179 if lsf.encryption_type() != EncryptionType::None {
180 debug!("we only understand None encryption for now - skipping packet");
181 continue;
182 }
183 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
184 else {
185 debug!("failed to decode packet type");
186 continue;
187 };
188 if (n - 30 - type_len) < 2 {
189 debug!("packet payload too small to provide CRC");
190 continue;
191 }
192 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
193 if packet_crc != 0 {
194 debug!("packet CRC does not pass");
195 continue;
196 }
197 let packet_payload: Arc<[u8]> =
198 Arc::from(&payload[(30 + type_len)..(n - 2)]);
199
200 let subs: Vec<_> =
201 adapters.read().unwrap().packet.values().cloned().collect();
202 for s in subs {
203 s.packet_received(
204 LinkSetup::new_raw(lsf.clone()),
205 packet_type,
206 packet_payload.clone(),
207 );
208 }
209 }
210 Ok(m17core::kiss::PORT_STREAM) => {
211 let mut payload = [0u8; 32];
212 let Ok(n) = frame.decode_payload(&mut payload) else {
213 debug!("failed to decode stream payload from KISS frame");
214 continue;
215 };
216 if n == 30 {
217 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
218 if lsf.check_crc() != 0 {
219 debug!("initial LSF in stream did not pass CRC");
220 continue;
221 }
222 stream_running = true;
223 let subs: Vec<_> =
224 adapters.read().unwrap().stream.values().cloned().collect();
225 for s in subs {
226 s.stream_began(LinkSetup::new_raw(lsf.clone()));
227 }
228 } else if n == 26 {
229 if !stream_running {
230 debug!("ignoring stream data as we didn't get a valid LSF first");
231 continue;
232 }
233 // TODO: parse LICH and handle the different changing subvalues META could have
234 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
235 debug!("stream data CRC mismatch");
236 continue;
237 }
238 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
239 let is_final = (frame_number & 0x8000) > 0;
240 frame_number &= 0x7fff;
241 let data: [u8; 16] = payload[8..24].try_into().unwrap();
242 let data = Arc::new(data);
243 if is_final {
244 stream_running = false;
245 }
246 let subs: Vec<_> =
247 adapters.read().unwrap().stream.values().cloned().collect();
248 for s in subs {
249 s.stream_data(frame_number, is_final, data.clone());
250 }
251 }
252 }
253 _ => (),
254 }
255 }
256 }
257 });
258 }
259
260 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
261 std::thread::spawn(move || {
262 while let Ok(ev) = event_rx.recv() {
263 match ev {
264 TncControlEvent::Kiss(k) => {
265 if let Err(e) = tnc.write_all(k.as_bytes()) {
266 debug!("kiss send err: {:?}", e);
267 return;
268 }
269 }
270 TncControlEvent::Start => {
271 if let Err(e) = tnc.start() {
272 debug!("tnc start err: {:?}", e);
273 return;
274 }
275 }
276 TncControlEvent::Close => {
277 if let Err(e) = tnc.close() {
278 debug!("tnc close err: {:?}", e);
279 return;
280 }
281 }
282 }
283 }
284 });
285 }