]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
05e3167139eb0f9ccadd27838643d3354970aaa2
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::tnc::Tnc;
3 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
4 use m17core::protocol::{EncryptionType, LsfFrame, PacketType};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::sync::mpsc;
9 use std::sync::{Arc, RwLock};
10
11 pub struct M17App {
12 adapters: Arc<RwLock<Adapters>>,
13 event_tx: mpsc::SyncSender<TncControlEvent>,
14 }
15
16 impl M17App {
17 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
18 let write_tnc = tnc.try_clone().unwrap();
19 let (event_tx, event_rx) = mpsc::sync_channel(128);
20 let listeners = Arc::new(RwLock::new(Adapters::new()));
21 spawn_reader(tnc, listeners.clone());
22 spawn_writer(write_tnc, event_rx);
23 Self {
24 adapters: listeners,
25 event_tx,
26 }
27 }
28
29 pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
30 let adapter = Arc::new(adapter);
31 let mut adapters = self.adapters.write().unwrap();
32 let id = adapters.next;
33 adapters.next += 1;
34 adapters.packet.insert(id, adapter.clone());
35 drop(adapters);
36 adapter.adapter_registered(id, self.tx());
37 id
38 }
39
40 pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
41 let adapter = Arc::new(adapter);
42 let mut adapters = self.adapters.write().unwrap();
43 let id = adapters.next;
44 adapters.next += 1;
45 adapters.stream.insert(id, adapter.clone());
46 drop(adapters);
47 adapter.adapter_registered(id, self.tx());
48 id
49 }
50
51 pub fn remove_packet_adapter(&self, id: usize) {
52 if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
53 a.adapter_removed();
54 }
55 }
56
57 pub fn remove_stream_adapter(&self, id: usize) {
58 if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
59 a.adapter_removed();
60 }
61 }
62
63 pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
64 // hang on where do we get the LSF details from? We need a destination obviously
65 // our source address needs to be configured here too
66 // also there is possible CAN, encryption, meta payload
67
68 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
69 }
70
71 /// Create a handle that can be used to transmit data on the TNC
72 pub fn tx(&self) -> TxHandle {
73 TxHandle {
74 event_tx: self.event_tx.clone(),
75 }
76 }
77
78 pub fn start(&self) {
79 let _ = self.event_tx.send(TncControlEvent::Start);
80 }
81
82 pub fn close(&self) {
83 let _ = self.event_tx.send(TncControlEvent::Close);
84 }
85 }
86
87 pub struct TxHandle {
88 event_tx: mpsc::SyncSender<TncControlEvent>,
89 }
90
91 impl TxHandle {
92 // add more methods here for stream outgoing
93
94 pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
95
96 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
97
98 pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {}
99 }
100
101 /// Synchronised structure for listeners subscribing to packets and streams.
102 ///
103 /// Each listener will be notified in turn of each event.
104 struct Adapters {
105 /// Identifier to be assigned to the next listener, starting from 0
106 next: usize,
107 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
108 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
109 }
110
111 impl Adapters {
112 fn new() -> Self {
113 Self {
114 next: 0,
115 packet: HashMap::new(),
116 stream: HashMap::new(),
117 }
118 }
119 }
120
121 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
122 enum TncControlEvent {
123 Kiss(KissFrame),
124 Start,
125 Close,
126 }
127
128 fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapters>>) {
129 std::thread::spawn(move || {
130 let mut kiss_buffer = KissBuffer::new();
131 loop {
132 let mut buf = kiss_buffer.buf_remaining();
133 let n = match tnc.read(&mut buf) {
134 Ok(n) => n,
135 Err(_) => break,
136 };
137 kiss_buffer.did_write(n);
138 while let Some(frame) = kiss_buffer.next_frame() {
139 if frame.command() != Ok(KissCommand::DataFrame) {
140 continue;
141 }
142 match frame.port() {
143 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
144 // no action
145 // we will handle the more full-featured version from from port 1
146 }
147 Ok(m17core::kiss::PORT_PACKET_FULL) => {
148 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
149 let Ok(n) = frame.decode_payload(&mut payload) else {
150 debug!("failed to decode payload from KISS frame");
151 continue;
152 };
153 if n < 33 {
154 debug!("unusually short full packet frame");
155 continue;
156 }
157 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
158 if lsf.crc() != 0 {
159 debug!("LSF in full packet frame did not pass CRC");
160 continue;
161 }
162 if lsf.encryption_type() != EncryptionType::None {
163 debug!("we only understand None encryption for now - skipping packet");
164 continue;
165 }
166 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
167 else {
168 debug!("failed to decode packet type");
169 continue;
170 };
171 if (n - 30 - type_len) < 2 {
172 debug!("packet payload too small to provide CRC");
173 continue;
174 }
175 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
176 if packet_crc != 0 {
177 debug!("packet CRC does not pass");
178 continue;
179 }
180 let packet_payload: Arc<[u8]> =
181 Arc::from(&payload[(30 + type_len)..(n - 2)]);
182
183 let subs: Vec<_> =
184 listeners.read().unwrap().packet.values().cloned().collect();
185 for s in subs {
186 s.packet_received(
187 lsf.clone(),
188 packet_type.clone(),
189 packet_payload.clone(),
190 );
191 }
192 }
193 Ok(m17core::kiss::PORT_STREAM) => {
194 // handle stream and send it to subscribers
195 }
196 _ => (),
197 }
198 }
199 }
200 });
201 }
202
203 fn spawn_writer<T: Tnc + Send + 'static>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
204 std::thread::spawn(move || {
205 while let Ok(ev) = event_rx.recv() {
206 match ev {
207 TncControlEvent::Kiss(k) => {
208 if let Err(e) = tnc.write_all(&k.as_bytes()) {
209 debug!("kiss send err: {:?}", e);
210 return;
211 }
212 }
213 TncControlEvent::Start => {
214 if let Err(e) = tnc.start() {
215 debug!("tnc start err: {:?}", e);
216 return;
217 }
218 }
219 TncControlEvent::Close => {
220 if let Err(e) = tnc.close() {
221 debug!("tnc close err: {:?}", e);
222 return;
223 }
224 }
225 }
226 }
227 });
228 }