]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Correct maths for setting output filter gain
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::tnc::Tnc;
3 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
4 use m17core::protocol::{EncryptionType, LsfFrame, PacketType, StreamFrame};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::sync::mpsc;
9 use std::sync::{Arc, RwLock};
10
11 pub struct M17App {
12 adapters: Arc<RwLock<Adapters>>,
13 event_tx: mpsc::SyncSender<TncControlEvent>,
14 }
15
16 impl M17App {
17 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
18 let write_tnc = tnc.try_clone().unwrap();
19 let (event_tx, event_rx) = mpsc::sync_channel(128);
20 let listeners = Arc::new(RwLock::new(Adapters::new()));
21 spawn_reader(tnc, listeners.clone());
22 spawn_writer(write_tnc, event_rx);
23 Self {
24 adapters: listeners,
25 event_tx,
26 }
27 }
28
29 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
30 let adapter = Arc::new(adapter);
31 let mut adapters = self.adapters.write().unwrap();
32 let id = adapters.next;
33 adapters.next += 1;
34 adapters.packet.insert(id, adapter.clone());
35 drop(adapters);
36 adapter.adapter_registered(id, self.tx());
37 id
38 }
39
40 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
41 let adapter = Arc::new(adapter);
42 let mut adapters = self.adapters.write().unwrap();
43 let id = adapters.next;
44 adapters.next += 1;
45 adapters.stream.insert(id, adapter.clone());
46 drop(adapters);
47 adapter.adapter_registered(id, self.tx());
48 id
49 }
50
51 pub fn remove_packet_adapter(&self, id: usize) {
52 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
53 a.adapter_removed();
54 }
55 }
56
57 pub fn remove_stream_adapter(&self, id: usize) {
58 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
59 a.adapter_removed();
60 }
61 }
62
63 /// Create a handle that can be used to transmit data on the TNC
64 pub fn tx(&self) -> TxHandle {
65 TxHandle {
66 event_tx: self.event_tx.clone(),
67 }
68 }
69
70 pub fn start(&self) {
71 let _ = self.event_tx.send(TncControlEvent::Start);
72 }
73
74 pub fn close(&self) {
75 let _ = self.event_tx.send(TncControlEvent::Close);
76 }
77 }
78
79 pub struct TxHandle {
80 event_tx: mpsc::SyncSender<TncControlEvent>,
81 }
82
83 impl TxHandle {
84 pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
85 // hang on where do we get the LSF details from? We need a destination obviously
86 // our source address needs to be configured here too
87 // also there is possible CAN, encryption, meta payload
88
89 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
90 }
91
92 // add more methods here for stream outgoing
93
94 pub fn transmit_stream_start(&self, lsf: LsfFrame) {
95 // TODO: is asking for an LsfFrame a good idea or unfriendly API?
96 // What I should do here is create a LinkSetup struct which wraps an LsfFrame and can be loaded with a raw one
97 let kiss_frame = KissFrame::new_stream_setup(&lsf.0).unwrap();
98 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
99 }
100
101 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
102
103 pub fn transmit_stream_next(&self, stream: StreamFrame) {
104 let kiss_frame = KissFrame::new_stream_data(&stream).unwrap();
105 let _ = self.event_tx.send(TncControlEvent::Kiss(kiss_frame));
106 }
107 }
108
109 /// Synchronised structure for listeners subscribing to packets and streams.
110 ///
111 /// Each listener will be notified in turn of each event.
112 struct Adapters {
113 /// Identifier to be assigned to the next listener, starting from 0
114 next: usize,
115 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
116 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
117 }
118
119 impl Adapters {
120 fn new() -> Self {
121 Self {
122 next: 0,
123 packet: HashMap::new(),
124 stream: HashMap::new(),
125 }
126 }
127 }
128
129 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
130 enum TncControlEvent {
131 Kiss(KissFrame),
132 Start,
133 Close,
134 }
135
136 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
137 std::thread::spawn(move || {
138 let mut kiss_buffer = KissBuffer::new();
139 let mut stream_running = false;
140 loop {
141 let mut buf = kiss_buffer.buf_remaining();
142 let n = match tnc.read(&mut buf) {
143 Ok(n) => n,
144 Err(_) => break,
145 };
146 kiss_buffer.did_write(n);
147 while let Some(frame) = kiss_buffer.next_frame() {
148 if frame.command() != Ok(KissCommand::DataFrame) {
149 continue;
150 }
151 match frame.port() {
152 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
153 // no action
154 // we will handle the more full-featured version from from port 1
155 }
156 Ok(m17core::kiss::PORT_PACKET_FULL) => {
157 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
158 let Ok(n) = frame.decode_payload(&mut payload) else {
159 debug!("failed to decode payload from KISS frame");
160 continue;
161 };
162 if n < 33 {
163 debug!("unusually short full packet frame");
164 continue;
165 }
166 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
167 if lsf.check_crc() != 0 {
168 debug!("LSF in full packet frame did not pass CRC");
169 continue;
170 }
171 if lsf.encryption_type() != EncryptionType::None {
172 debug!("we only understand None encryption for now - skipping packet");
173 continue;
174 }
175 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
176 else {
177 debug!("failed to decode packet type");
178 continue;
179 };
180 if (n - 30 - type_len) < 2 {
181 debug!("packet payload too small to provide CRC");
182 continue;
183 }
184 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
185 if packet_crc != 0 {
186 debug!("packet CRC does not pass");
187 continue;
188 }
189 let packet_payload: Arc<[u8]> =
190 Arc::from(&payload[(30 + type_len)..(n - 2)]);
191
192 let subs: Vec<_> =
193 adapters.read().unwrap().packet.values().cloned().collect();
194 for s in subs {
195 s.packet_received(
196 lsf.clone(),
197 packet_type.clone(),
198 packet_payload.clone(),
199 );
200 }
201 }
202 Ok(m17core::kiss::PORT_STREAM) => {
203 let mut payload = [0u8; 32];
204 let Ok(n) = frame.decode_payload(&mut payload) else {
205 debug!("failed to decode stream payload from KISS frame");
206 continue;
207 };
208 if n == 30 {
209 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
210 if lsf.check_crc() != 0 {
211 debug!("initial LSF in stream did not pass CRC");
212 continue;
213 }
214 stream_running = true;
215 let subs: Vec<_> =
216 adapters.read().unwrap().stream.values().cloned().collect();
217 for s in subs {
218 s.stream_began(lsf.clone());
219 }
220 } else if n == 26 {
221 if !stream_running {
222 debug!("ignoring stream data as we didn't get a valid LSF first");
223 continue;
224 }
225 // TODO: parse LICH and handle the different changing subvalues META could have
226 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
227 debug!("stream data CRC mismatch");
228 continue;
229 }
230 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
231 let is_final = (frame_number & 0x8000) > 0;
232 frame_number &= 0x7fff;
233 let data: [u8; 16] = payload[8..24].try_into().unwrap();
234 let data = Arc::new(data);
235 if is_final {
236 stream_running = false;
237 }
238 let subs: Vec<_> =
239 adapters.read().unwrap().stream.values().cloned().collect();
240 for s in subs {
241 s.stream_data(frame_number, is_final, data.clone());
242 }
243 }
244 }
245 _ => (),
246 }
247 }
248 }
249 });
250 }
251
252 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
253 std::thread::spawn(move || {
254 while let Ok(ev) = event_rx.recv() {
255 match ev {
256 TncControlEvent::Kiss(k) => {
257 if let Err(e) = tnc.write_all(&k.as_bytes()) {
258 debug!("kiss send err: {:?}", e);
259 return;
260 }
261 }
262 TncControlEvent::Start => {
263 if let Err(e) = tnc.start() {
264 debug!("tnc start err: {:?}", e);
265 return;
266 }
267 }
268 TncControlEvent::Close => {
269 if let Err(e) = tnc.close() {
270 debug!("tnc close err: {:?}", e);
271 return;
272 }
273 }
274 }
275 }
276 });
277 }