]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
Implement Tnc for TcpStream
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::tnc::Tnc;
3 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
4 use m17core::protocol::{EncryptionType, LsfFrame, PacketType};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::sync::mpsc;
9 use std::sync::{Arc, RwLock};
10
11 pub struct M17App {
12 adapters: Arc<RwLock<Adapters>>,
13 event_tx: mpsc::SyncSender<TncControlEvent>,
14 }
15
16 impl M17App {
17 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
18 let write_tnc = tnc.try_clone().unwrap();
19 let (event_tx, event_rx) = mpsc::sync_channel(128);
20 let listeners = Arc::new(RwLock::new(Adapters::new()));
21 spawn_reader(tnc, listeners.clone());
22 spawn_writer(write_tnc, event_rx);
23 Self {
24 adapters: listeners,
25 event_tx,
26 }
27 }
28
29 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
30 let adapter = Arc::new(adapter);
31 let mut adapters = self.adapters.write().unwrap();
32 let id = adapters.next;
33 adapters.next += 1;
34 adapters.packet.insert(id, adapter.clone());
35 drop(adapters);
36 adapter.adapter_registered(id, self.tx());
37 id
38 }
39
40 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
41 let adapter = Arc::new(adapter);
42 let mut adapters = self.adapters.write().unwrap();
43 let id = adapters.next;
44 adapters.next += 1;
45 adapters.stream.insert(id, adapter.clone());
46 drop(adapters);
47 adapter.adapter_registered(id, self.tx());
48 id
49 }
50
51 pub fn remove_packet_adapter(&self, id: usize) {
52 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
53 a.adapter_removed();
54 }
55 }
56
57 pub fn remove_stream_adapter(&self, id: usize) {
58 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
59 a.adapter_removed();
60 }
61 }
62
63 /// Create a handle that can be used to transmit data on the TNC
64 pub fn tx(&self) -> TxHandle {
65 TxHandle {
66 event_tx: self.event_tx.clone(),
67 }
68 }
69
70 pub fn start(&self) {
71 let _ = self.event_tx.send(TncControlEvent::Start);
72 }
73
74 pub fn close(&self) {
75 let _ = self.event_tx.send(TncControlEvent::Close);
76 }
77 }
78
79 pub struct TxHandle {
80 event_tx: mpsc::SyncSender<TncControlEvent>,
81 }
82
83 impl TxHandle {
84 pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
85 // hang on where do we get the LSF details from? We need a destination obviously
86 // our source address needs to be configured here too
87 // also there is possible CAN, encryption, meta payload
88
89 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
90 }
91
92 // add more methods here for stream outgoing
93
94 pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
95
96 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
97
98 pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {}
99 }
100
101 /// Synchronised structure for listeners subscribing to packets and streams.
102 ///
103 /// Each listener will be notified in turn of each event.
104 struct Adapters {
105 /// Identifier to be assigned to the next listener, starting from 0
106 next: usize,
107 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
108 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
109 }
110
111 impl Adapters {
112 fn new() -> Self {
113 Self {
114 next: 0,
115 packet: HashMap::new(),
116 stream: HashMap::new(),
117 }
118 }
119 }
120
121 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
122 enum TncControlEvent {
123 Kiss(KissFrame),
124 Start,
125 Close,
126 }
127
128 fn spawn_reader<T: Tnc>(mut tnc: T, adapters: Arc<RwLock<Adapters>>) {
129 std::thread::spawn(move || {
130 let mut kiss_buffer = KissBuffer::new();
131 let mut stream_running = false;
132 loop {
133 let mut buf = kiss_buffer.buf_remaining();
134 let n = match tnc.read(&mut buf) {
135 Ok(n) => n,
136 Err(_) => break,
137 };
138 kiss_buffer.did_write(n);
139 while let Some(frame) = kiss_buffer.next_frame() {
140 if frame.command() != Ok(KissCommand::DataFrame) {
141 continue;
142 }
143 match frame.port() {
144 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
145 // no action
146 // we will handle the more full-featured version from from port 1
147 }
148 Ok(m17core::kiss::PORT_PACKET_FULL) => {
149 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
150 let Ok(n) = frame.decode_payload(&mut payload) else {
151 debug!("failed to decode payload from KISS frame");
152 continue;
153 };
154 if n < 33 {
155 debug!("unusually short full packet frame");
156 continue;
157 }
158 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
159 if lsf.crc() != 0 {
160 debug!("LSF in full packet frame did not pass CRC");
161 continue;
162 }
163 if lsf.encryption_type() != EncryptionType::None {
164 debug!("we only understand None encryption for now - skipping packet");
165 continue;
166 }
167 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
168 else {
169 debug!("failed to decode packet type");
170 continue;
171 };
172 if (n - 30 - type_len) < 2 {
173 debug!("packet payload too small to provide CRC");
174 continue;
175 }
176 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
177 if packet_crc != 0 {
178 debug!("packet CRC does not pass");
179 continue;
180 }
181 let packet_payload: Arc<[u8]> =
182 Arc::from(&payload[(30 + type_len)..(n - 2)]);
183
184 let subs: Vec<_> =
185 adapters.read().unwrap().packet.values().cloned().collect();
186 for s in subs {
187 s.packet_received(
188 lsf.clone(),
189 packet_type.clone(),
190 packet_payload.clone(),
191 );
192 }
193 }
194 Ok(m17core::kiss::PORT_STREAM) => {
195 let mut payload = [0u8; 32];
196 let Ok(n) = frame.decode_payload(&mut payload) else {
197 debug!("failed to decode stream payload from KISS frame");
198 continue;
199 };
200 if n == 30 {
201 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
202 if lsf.crc() != 0 {
203 debug!("initial LSF in stream did not pass CRC");
204 continue;
205 }
206 stream_running = true;
207 let subs: Vec<_> =
208 adapters.read().unwrap().stream.values().cloned().collect();
209 for s in subs {
210 s.stream_began(lsf.clone());
211 }
212 } else if n == 26 {
213 if !stream_running {
214 debug!("ignoring stream data as we didn't get a valid LSF first");
215 continue;
216 }
217 // TODO: parse LICH and handle the different changing subvalues META could have
218 if m17core::crc::m17_crc(&payload[6..n]) != 0 {
219 debug!("stream data CRC mismatch");
220 continue;
221 }
222 let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]);
223 let is_final = (frame_number & 0x8000) > 0;
224 frame_number &= 0x7fff;
225 let data: [u8; 16] = payload[8..24].try_into().unwrap();
226 let data = Arc::new(data);
227 if is_final {
228 stream_running = false;
229 }
230 let subs: Vec<_> =
231 adapters.read().unwrap().stream.values().cloned().collect();
232 for s in subs {
233 s.stream_data(frame_number, is_final, data.clone());
234 }
235 }
236 }
237 _ => (),
238 }
239 }
240 }
241 });
242 }
243
244 fn spawn_writer<T: Tnc>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
245 std::thread::spawn(move || {
246 while let Ok(ev) = event_rx.recv() {
247 match ev {
248 TncControlEvent::Kiss(k) => {
249 if let Err(e) = tnc.write_all(&k.as_bytes()) {
250 debug!("kiss send err: {:?}", e);
251 return;
252 }
253 }
254 TncControlEvent::Start => {
255 if let Err(e) = tnc.start() {
256 debug!("tnc start err: {:?}", e);
257 return;
258 }
259 }
260 TncControlEvent::Close => {
261 if let Err(e) = tnc.close() {
262 debug!("tnc close err: {:?}", e);
263 return;
264 }
265 }
266 }
267 }
268 });
269 }