]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
a7bb3cd291b18efbf154ec5d54807fe32faf408a
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::link_setup::LinkSetup;
3 use crate::tnc::Tnc;
4 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
5 use m17core::protocol::{EncryptionType, LsfFrame, PacketType, StreamFrame};
6
7 use log::debug;
8 use std::collections::HashMap;
9 use std::sync::mpsc;
10 use std::sync::{Arc, RwLock};
11
12 pub struct M17App {
13 adapters: Arc<RwLock<Adapters>>,
14 event_tx: mpsc::SyncSender<TncControlEvent>,
15 }
16
17 impl M17App {
18 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
19 let write_tnc = tnc.try_clone().unwrap();
20 let (event_tx, event_rx) = mpsc::sync_channel(128);
21 let listeners = Arc::new(RwLock::new(Adapters::new()));
22 spawn_reader(tnc, listeners.clone());
23 spawn_writer(write_tnc, event_rx);
24 Self {
25 adapters: listeners,
26 event_tx,
27 }
28 }
29
30 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
31 let adapter = Arc::new(adapter);
32 let mut adapters = self.adapters.write().unwrap();
33 let id = adapters.next;
34 adapters.next += 1;
35 adapters.packet.insert(id, adapter.clone());
36 drop(adapters);
37 adapter.adapter_registered(id, self.tx());
38 id
39 }
40
41 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
42 let adapter = Arc::new(adapter);
43 let mut adapters = self.adapters.write().unwrap();
44 let id = adapters.next;
45 adapters.next += 1;
46 adapters.stream.insert(id, adapter.clone());
47 drop(adapters);
48 adapter.adapter_registered(id, self.tx());
49 id
50 }
51
52 pub fn remove_packet_adapter(&self, id: usize) {
53 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
54 a.adapter_removed();
55 }
56 }
57
58 pub fn remove_stream_adapter(&self, id: usize) {
59 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
60 a.adapter_removed();
61 }
62 }
63
64 /// Create a handle that can be used to transmit data on the TNC
65 pub fn tx(&self) -> TxHandle {
66 TxHandle {
67 event_tx: self.event_tx.clone(),
68 }
69 }
70
71 pub fn start(&self) {
72 let _ = self.event_tx.send(TncControlEvent::Start);
73 }
74
75 pub fn close(&self) {
76 // TODO: blocking function to indicate TNC has finished closing
77 // then we could call this in a signal handler to ensure PTT is dropped before quit
78 let _ = self.event_tx.send(TncControlEvent::Close);
79 }
80 }
81
82 pub struct TxHandle {
83 event_tx: mpsc::SyncSender<TncControlEvent>,
84 }
85
86 impl TxHandle {
87 pub fn transmit_packet(
88 &self,
89 link_setup: &LinkSetup,
90 packet_type: &PacketType,
91 payload: &[u8],
92 ) {
93 let (pack_type, pack_type_len) = packet_type.as_proto();
94 if pack_type_len + payload.len() > 823 {
95 // TODO: error for invalid transmission type
96 return;
97 }
98 let mut full_payload = vec![];
99 full_payload.extend_from_slice(&pack_type[0..pack_type_len]);
100 full_payload.extend_from_slice(&payload);
101 let crc = m17core::crc::m17_crc(&full_payload);
102 full_payload.extend_from_slice(&crc.to_be_bytes());
103 let kiss_frame = KissFrame::new_full_packet(&link_setup.raw.0, &full_payload).unwrap();
104 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
105 }
106
107 pub fn transmit_stream_start(&self, link_setup: &LinkSetup) {
108 let kiss_frame = KissFrame::new_stream_setup(&link_setup.raw.0).unwrap();
109 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
110 }
111
112 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
113
114 pub fn transmit_stream_next(&self, stream: &StreamFrame) {
115 let kiss_frame = KissFrame::new_stream_data(&stream).unwrap();
116 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
117 }
118 }
119
120 /// Synchronised structure for listeners subscribing to packets and streams.
121 ///
122 /// Each listener will be notified in turn of each event.
123 struct Adapters {
124 /// Identifier to be assigned to the next listener, starting from 0
125 next: usize,
126 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
127 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
128 }
129
130 impl Adapters {
131 fn new() -> Self {
132 Self {
133 next: 0,
134 packet: HashMap::new(),
135 stream: HashMap::new(),
136 }
137 }
138 }
139
140 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
141 enum TncControlEvent {
142 Kiss(KissFrame),
143 Start,
144 Close,
145 }
146
147 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
148 std::thread::spawn(move || {
149 let mut kiss_buffer = KissBuffer::new();
150 let mut stream_running = false;
151 loop {
152 let mut buf = kiss_buffer.buf_remaining();
153 let n = match tnc.read(&mut buf) {
154 Ok(n) => n,
155 Err(_) => break,
156 };
157 kiss_buffer.did_write(n);
158 while let Some(frame) = kiss_buffer.next_frame() {
159 if frame.command() != Ok(KissCommand::DataFrame) {
160 continue;
161 }
162 match frame.port() {
163 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
164 // no action
165 // we will handle the more full-featured version from from port 1
166 }
167 Ok(m17core::kiss::PORT_PACKET_FULL) => {
168 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
169 let Ok(n) = frame.decode_payload(&mut payload) else {
170 debug!("failed to decode payload from KISS frame");
171 continue;
172 };
173 if n < 33 {
174 debug!("unusually short full packet frame");
175 continue;
176 }
177 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
178 if lsf.check_crc() != 0 {
179 debug!("LSF in full packet frame did not pass CRC");
180 continue;
181 }
182 if lsf.encryption_type() != EncryptionType::None {
183 debug!("we only understand None encryption for now - skipping packet");
184 continue;
185 }
186 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
187 else {
188 debug!("failed to decode packet type");
189 continue;
190 };
191 if (n - 30 - type_len) < 2 {
192 debug!("packet payload too small to provide CRC");
193 continue;
194 }
195 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
196 if packet_crc != 0 {
197 debug!("packet CRC does not pass");
198 continue;
199 }
200 let packet_payload: Arc<[u8]> =
201 Arc::from(&payload[(30 + type_len)..(n - 2)]);
202
203 let subs: Vec<_> =
204 adapters.read().unwrap().packet.values().cloned().collect();
205 for s in subs {
206 s.packet_received(
207 lsf.clone(),
208 packet_type.clone(),
209 packet_payload.clone(),
210 );
211 }
212 }
213 Ok(m17core::kiss::PORT_STREAM) => {
214 let mut payload = [0u8; 32];
215 let Ok(n) = frame.decode_payload(&mut payload) else {
216 debug!("failed to decode stream payload from KISS frame");
217 continue;
218 };
219 if n == 30 {
220 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
221 if lsf.check_crc() != 0 {
222 debug!("initial LSF in stream did not pass CRC");
223 continue;
224 }
225 stream_running = true;
226 let subs: Vec<_> =
227 adapters.read().unwrap().stream.values().cloned().collect();
228 for s in subs {
229 s.stream_began(lsf.clone());
230 }
231 } else if n == 26 {
232 if !stream_running {
233 debug!("ignoring stream data as we didn't get a valid LSF first");
234 continue;
235 }
236 // TODO: parse LICH and handle the different changing subvalues META could have
237 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
238 debug!("stream data CRC mismatch");
239 continue;
240 }
241 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
242 let is_final = (frame_number & 0x8000) > 0;
243 frame_number &= 0x7fff;
244 let data: [u8; 16] = payload[8..24].try_into().unwrap();
245 let data = Arc::new(data);
246 if is_final {
247 stream_running = false;
248 }
249 let subs: Vec<_> =
250 adapters.read().unwrap().stream.values().cloned().collect();
251 for s in subs {
252 s.stream_data(frame_number, is_final, data.clone());
253 }
254 }
255 }
256 _ => (),
257 }
258 }
259 }
260 });
261 }
262
263 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
264 std::thread::spawn(move || {
265 while let Ok(ev) = event_rx.recv() {
266 match ev {
267 TncControlEvent::Kiss(k) => {
268 if let Err(e) = tnc.write_all(&k.as_bytes()) {
269 debug!("kiss send err: {:?}", e);
270 return;
271 }
272 }
273 TncControlEvent::Start => {
274 if let Err(e) = tnc.start() {
275 debug!("tnc start err: {:?}", e);
276 return;
277 }
278 }
279 TncControlEvent::Close => {
280 if let Err(e) = tnc.close() {
281 debug!("tnc close err: {:?}", e);
282 return;
283 }
284 }
285 }
286 }
287 });
288 }