]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
4a7a345f5fa1bf709fa0f2cb5e1a52c5f4caf0f2
[m17rt] / m17app / src / app.rs
1 use crate::adapter::{PacketAdapter, StreamAdapter};
2 use crate::tnc::Tnc;
3 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
4 use m17core::protocol::{EncryptionType, LsfFrame, PacketType};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::sync::mpsc;
9 use std::sync::{Arc, RwLock};
10
11 pub struct M17App {
12 listeners: Arc<RwLock<Listeners>>,
13 event_tx: mpsc::SyncSender<TncControlEvent>,
14 }
15
16 impl M17App {
17 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
18 let write_tnc = tnc.try_clone().unwrap();
19 let (event_tx, event_rx) = mpsc::sync_channel(128);
20 let listeners = Arc::new(RwLock::new(Listeners::new()));
21 spawn_reader(tnc, listeners.clone());
22 spawn_writer(write_tnc, event_rx);
23 Self {
24 listeners,
25 event_tx,
26 }
27 }
28
29 pub fn add_packet_listener<P: PacketAdapter + 'static>(&self, listener: P) -> usize {
30 let mut listeners = self.listeners.write().unwrap();
31 let id = listeners.next;
32 listeners.next += 1;
33 listeners.packet.insert(id, Arc::new(listener));
34 id
35 }
36
37 pub fn add_stream_listener<S: StreamAdapter + 'static>(&self, listener: S) -> usize {
38 let mut listeners = self.listeners.write().unwrap();
39 let id = listeners.next;
40 listeners.next += 1;
41 listeners.stream.insert(id, Arc::new(listener));
42 id
43 }
44
45 pub fn remove_packet_listener(&self, id: usize) {
46 self.listeners.write().unwrap().packet.remove(&id);
47 }
48
49 pub fn remove_stream_listener(&self, id: usize) {
50 self.listeners.write().unwrap().stream.remove(&id);
51 }
52
53 pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) {
54 // hang on where do we get the LSF details from? We need a destination obviously
55 // our source address needs to be configured here too
56 // also there is possible CAN, encryption, meta payload
57
58 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
59 }
60
61 /// Create a handle that can be used to transmit data on the TNC
62 pub fn tx(&self) -> TxHandle {
63 TxHandle {
64 event_tx: self.event_tx.clone(),
65 }
66 }
67
68 pub fn start(&self) {
69 let _ = self.event_tx.send(TncControlEvent::Start);
70 }
71
72 pub fn close(&self) {
73 let _ = self.event_tx.send(TncControlEvent::Close);
74 }
75 }
76
77 pub struct TxHandle {
78 event_tx: mpsc::SyncSender<TncControlEvent>,
79 }
80
81 impl TxHandle {
82 // add more methods here for stream outgoing
83
84 pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
85
86 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
87
88 pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {}
89 }
90
91 /// Synchronised structure for listeners subscribing to packets and streams.
92 ///
93 /// Each listener will be notified in turn of each event.
94 struct Listeners {
95 /// Identifier to be assigned to the next listener, starting from 0
96 next: usize,
97 packet: HashMap<usize, Arc<dyn PacketAdapter>>,
98 stream: HashMap<usize, Arc<dyn StreamAdapter>>,
99 }
100
101 impl Listeners {
102 fn new() -> Self {
103 Self {
104 next: 0,
105 packet: HashMap::new(),
106 stream: HashMap::new(),
107 }
108 }
109 }
110
111 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
112 enum TncControlEvent {
113 Kiss(KissFrame),
114 Start,
115 Close,
116 }
117
118 fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Listeners>>) {
119 std::thread::spawn(move || {
120 let mut kiss_buffer = KissBuffer::new();
121 loop {
122 let mut buf = kiss_buffer.buf_remaining();
123 let n = match tnc.read(&mut buf) {
124 Ok(n) => n,
125 Err(_) => break,
126 };
127 kiss_buffer.did_write(n);
128 while let Some(frame) = kiss_buffer.next_frame() {
129 if frame.command() != Ok(KissCommand::DataFrame) {
130 continue;
131 }
132 match frame.port() {
133 Ok(m17core::kiss::PORT_PACKET_BASIC) => {
134 // no action
135 // we will handle the more full-featured version from from port 1
136 }
137 Ok(m17core::kiss::PORT_PACKET_FULL) => {
138 let mut payload = [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
139 let Ok(n) = frame.decode_payload(&mut payload) else {
140 debug!("failed to decode payload from KISS frame");
141 continue;
142 };
143 if n < 33 {
144 debug!("unusually short full packet frame");
145 continue;
146 }
147 let lsf = LsfFrame(payload[0..30].try_into().unwrap());
148 if lsf.crc() != 0 {
149 debug!("LSF in full packet frame did not pass CRC");
150 continue;
151 }
152 if lsf.encryption_type() != EncryptionType::None {
153 debug!("we only understand None encryption for now - skipping packet");
154 continue;
155 }
156 let Some((packet_type, type_len)) = PacketType::from_proto(&payload[30..n])
157 else {
158 debug!("failed to decode packet type");
159 continue;
160 };
161 if (n - 30 - type_len) < 2 {
162 debug!("packet payload too small to provide CRC");
163 continue;
164 }
165 let packet_crc = m17core::crc::m17_crc(&payload[30..n]);
166 if packet_crc != 0 {
167 debug!("packet CRC does not pass");
168 continue;
169 }
170 let packet_payload: Arc<[u8]> =
171 Arc::from(&payload[(30 + type_len)..(n - 2)]);
172
173 let subs: Vec<_> =
174 listeners.read().unwrap().packet.values().cloned().collect();
175 for s in subs {
176 s.packet_received(
177 lsf.clone(),
178 packet_type.clone(),
179 packet_payload.clone(),
180 );
181 }
182 }
183 Ok(m17core::kiss::PORT_STREAM) => {
184 // handle stream and send it to subscribers
185 }
186 _ => (),
187 }
188 }
189 }
190 });
191 }
192
193 fn spawn_writer<T: Tnc + Send + 'static>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
194 std::thread::spawn(move || {
195 while let Ok(ev) = event_rx.recv() {
196 match ev {
197 TncControlEvent::Kiss(k) => {
198 if let Err(e) = tnc.write_all(&k.as_bytes()) {
199 debug!("kiss send err: {:?}", e);
200 return;
201 }
202 }
203 TncControlEvent::Start => {
204 if let Err(e) = tnc.start() {
205 debug!("tnc start err: {:?}", e);
206 return;
207 }
208 }
209 TncControlEvent::Close => {
210 if let Err(e) = tnc.close() {
211 debug!("tnc close err: {:?}", e);
212 return;
213 }
214 }
215 }
216 }
217 });
218 }