]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Track preamble and EOT and estimate DCD
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::tnc::Tnc;
3 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
4 use m17core::protocol::{EncryptionType, LsfFrame, PacketType, StreamFrame};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::sync::mpsc;
9 use std::sync::{Arc, RwLock};
10
11 pub struct M17App {
12 adapters: Arc<RwLock<Adapters>>,
13 event_tx: mpsc::SyncSender<TncControlEvent>,
14 }
15
16 impl M17App {
17 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
18 let write_tnc = tnc.try_clone().unwrap();
19 let (event_tx, event_rx) = mpsc::sync_channel(128);
20 let listeners = Arc::new(RwLock::new(Adapters::new()));
21 spawn_reader(tnc, listeners.clone());
22 spawn_writer(write_tnc, event_rx);
23 Self {
24 adapters: listeners,
25 event_tx,
26 }
27 }
28
29 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
30 let adapter = Arc::new(adapter);
31 let mut adapters = self.adapters.write().unwrap();
32 let id = adapters.next;
33 adapters.next += 1;
34 adapters.packet.insert(id, adapter.clone());
35 drop(adapters);
36 adapter.adapter_registered(id, self.tx());
37 id
38 }
39
40 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
41 let adapter = Arc::new(adapter);
42 let mut adapters = self.adapters.write().unwrap();
43 let id = adapters.next;
44 adapters.next += 1;
45 adapters.stream.insert(id, adapter.clone());
46 drop(adapters);
47 adapter.adapter_registered(id, self.tx());
48 id
49 }
50
51 pub fn remove_packet_adapter(&self, id: usize) {
52 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
53 a.adapter_removed();
54 }
55 }
56
57 pub fn remove_stream_adapter(&self, id: usize) {
58 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
59 a.adapter_removed();
60 }
61 }
62
63 /// Create a handle that can be used to transmit data on the TNC
64 pub fn tx(&self) -> TxHandle {
65 TxHandle {
66 event_tx: self.event_tx.clone(),
67 }
68 }
69
70 pub fn start(&self) {
71 let _ = self.event_tx.send(TncControlEvent::Start);
72 }
73
74 pub fn close(&self) {
75 // TODO: blocking function to indicate TNC has finished closing
76 // then we could call this in a signal handler to ensure PTT is dropped before quit
77 let _ = self.event_tx.send(TncControlEvent::Close);
78 }
79 }
80
81 pub struct TxHandle {
82 event_tx: mpsc::SyncSender<TncControlEvent>,
83 }
84
85 impl TxHandle {
86 pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
87 // hang on where do we get the LSF details from? We need a destination obviously
88 // our source address needs to be configured here too
89 // also there is possible CAN, encryption, meta payload
90
91 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
92 }
93
94 // add more methods here for stream outgoing
95
96 pub fn transmit_stream_start(&self, lsf: LsfFrame) {
97 // TODO: is asking for an LsfFrame a good idea or unfriendly API?
98 // What I should do here is create a LinkSetup struct which wraps an LsfFrame and can be loaded with a raw one
99 let kiss_frame = KissFrame::new_stream_setup(&lsf.0).unwrap();
100 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
101 }
102
103 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
104
105 pub fn transmit_stream_next(&self, stream: StreamFrame) {
106 let kiss_frame = KissFrame::new_stream_data(&stream).unwrap();
107 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
108 }
109 }
110
111 /// Synchronised structure for listeners subscribing to packets and streams.
112 ///
113 /// Each listener will be notified in turn of each event.
114 struct Adapters {
115 /// Identifier to be assigned to the next listener, starting from 0
116 next: usize,
117 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
118 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
119 }
120
121 impl Adapters {
122 fn new() -> Self {
123 Self {
124 next: 0,
125 packet: HashMap::new(),
126 stream: HashMap::new(),
127 }
128 }
129 }
130
131 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
132 enum TncControlEvent {
133 Kiss(KissFrame),
134 Start,
135 Close,
136 }
137
138 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
139 std::thread::spawn(move || {
140 let mut kiss_buffer = KissBuffer::new();
141 let mut stream_running = false;
142 loop {
143 let mut buf = kiss_buffer.buf_remaining();
144 let n = match tnc.read(&mut buf) {
145 Ok(n) => n,
146 Err(_) => break,
147 };
148 kiss_buffer.did_write(n);
149 while let Some(frame) = kiss_buffer.next_frame() {
150 if frame.command() != Ok(KissCommand::DataFrame) {
151 continue;
152 }
153 match frame.port() {
154 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
155 // no action
156 // we will handle the more full-featured version from from port 1
157 }
158 Ok(m17core::kiss::PORT_PACKET_FULL) => {
159 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
160 let Ok(n) = frame.decode_payload(&mut payload) else {
161 debug!("failed to decode payload from KISS frame");
162 continue;
163 };
164 if n < 33 {
165 debug!("unusually short full packet frame");
166 continue;
167 }
168 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
169 if lsf.check_crc() != 0 {
170 debug!("LSF in full packet frame did not pass CRC");
171 continue;
172 }
173 if lsf.encryption_type() != EncryptionType::None {
174 debug!("we only understand None encryption for now - skipping packet");
175 continue;
176 }
177 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
178 else {
179 debug!("failed to decode packet type");
180 continue;
181 };
182 if (n - 30 - type_len) < 2 {
183 debug!("packet payload too small to provide CRC");
184 continue;
185 }
186 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
187 if packet_crc != 0 {
188 debug!("packet CRC does not pass");
189 continue;
190 }
191 let packet_payload: Arc<[u8]> =
192 Arc::from(&payload[(30 + type_len)..(n - 2)]);
193
194 let subs: Vec<_> =
195 adapters.read().unwrap().packet.values().cloned().collect();
196 for s in subs {
197 s.packet_received(
198 lsf.clone(),
199 packet_type.clone(),
200 packet_payload.clone(),
201 );
202 }
203 }
204 Ok(m17core::kiss::PORT_STREAM) => {
205 let mut payload = [0u8; 32];
206 let Ok(n) = frame.decode_payload(&mut payload) else {
207 debug!("failed to decode stream payload from KISS frame");
208 continue;
209 };
210 if n == 30 {
211 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
212 if lsf.check_crc() != 0 {
213 debug!("initial LSF in stream did not pass CRC");
214 continue;
215 }
216 stream_running = true;
217 let subs: Vec<_> =
218 adapters.read().unwrap().stream.values().cloned().collect();
219 for s in subs {
220 s.stream_began(lsf.clone());
221 }
222 } else if n == 26 {
223 if !stream_running {
224 debug!("ignoring stream data as we didn't get a valid LSF first");
225 continue;
226 }
227 // TODO: parse LICH and handle the different changing subvalues META could have
228 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
229 debug!("stream data CRC mismatch");
230 continue;
231 }
232 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
233 let is_final = (frame_number & 0x8000) > 0;
234 frame_number &= 0x7fff;
235 let data: [u8; 16] = payload[8..24].try_into().unwrap();
236 let data = Arc::new(data);
237 if is_final {
238 stream_running = false;
239 }
240 let subs: Vec<_> =
241 adapters.read().unwrap().stream.values().cloned().collect();
242 for s in subs {
243 s.stream_data(frame_number, is_final, data.clone());
244 }
245 }
246 }
247 _ => (),
248 }
249 }
250 }
251 });
252 }
253
254 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
255 std::thread::spawn(move || {
256 while let Ok(ev) = event_rx.recv() {
257 match ev {
258 TncControlEvent::Kiss(k) => {
259 if let Err(e) = tnc.write_all(&k.as_bytes()) {
260 debug!("kiss send err: {:?}", e);
261 return;
262 }
263 }
264 TncControlEvent::Start => {
265 if let Err(e) = tnc.start() {
266 debug!("tnc start err: {:?}", e);
267 return;
268 }
269 }
270 TncControlEvent::Close => {
271 if let Err(e) = tnc.close() {
272 debug!("tnc close err: {:?}", e);
273 return;
274 }
275 }
276 }
277 }
278 });
279 }