]> code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
9c2815de50c4fbe53cc6280ef2a47ebe2630094c
[m17rt] / m17app / src / app.rs
1 use crate::tnc::Tnc;
2 use m17core::kiss::{KissBuffer, KissCommand, KissFrame};
3 use m17core::protocol::PacketType;
4 use m17core::traits::{PacketListener, StreamListener};
5
6 use log::debug;
7 use std::collections::HashMap;
8 use std::io::{Read, Write};
9 use std::sync::mpsc;
10 use std::sync::{Arc, RwLock};
11
12 pub struct M17App {
13 listeners: Arc<RwLock<Listeners>>,
14 event_tx: mpsc::SyncSender<TncControlEvent>,
15 }
16
17 impl M17App {
18 pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
19 let write_tnc = tnc.try_clone().unwrap();
20 let (event_tx, event_rx) = mpsc::sync_channel(128);
21 let listeners = Arc::new(RwLock::new(Listeners::new()));
22 spawn_reader(tnc, listeners.clone());
23 spawn_writer(write_tnc, event_rx);
24 Self {
25 listeners,
26 event_tx,
27 }
28 }
29
30 pub fn add_packet_listener<P: PacketListener + 'static>(&self, listener: P) -> usize {
31 let mut listeners = self.listeners.write().unwrap();
32 let id = listeners.next;
33 listeners.next += 1;
34 listeners.packet.insert(id, Box::new(listener));
35 id
36 }
37
38 pub fn add_stream_listener<S: StreamListener + 'static>(&self, listener: S) -> usize {
39 let mut listeners = self.listeners.write().unwrap();
40 let id = listeners.next;
41 listeners.next += 1;
42 listeners.stream.insert(id, Box::new(listener));
43 id
44 }
45
46 pub fn remove_packet_listener(&self, id: usize) {
47 self.listeners.write().unwrap().packet.remove(&id);
48 }
49
50 pub fn remove_stream_listener(&self, id: usize) {
51 self.listeners.write().unwrap().stream.remove(&id);
52 }
53
54 pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) {
55 // hang on where do we get the LSF details from? We need a destination obviously
56 // our source address needs to be configured here too
57 // also there is possible CAN, encryption, meta payload
58
59 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
60 }
61
62 // add more methods here for stream outgoing
63
64 pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
65
66 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
67
68 pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream: bool) {}
69
70 pub fn start(&self) {
71 let _ = self.event_tx.send(TncControlEvent::Start);
72 }
73
74 pub fn close(&self) {
75 let _ = self.event_tx.send(TncControlEvent::Close);
76 }
77 }
78
79 /// Synchronised structure for listeners subscribing to packets and streams.
80 ///
81 /// Each listener will be notified in turn of each event.
82 struct Listeners {
83 /// Identifier to be assigned to the next listener, starting from 0
84 next: usize,
85 packet: HashMap<usize, Box<dyn PacketListener>>,
86 stream: HashMap<usize, Box<dyn StreamListener>>,
87 }
88
89 impl Listeners {
90 fn new() -> Self {
91 Self {
92 next: 0,
93 packet: HashMap::new(),
94 stream: HashMap::new(),
95 }
96 }
97 }
98
99 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
100 enum TncControlEvent {
101 Kiss(KissFrame),
102 Start,
103 Close,
104 }
105
106 fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Listeners>>) {
107 std::thread::spawn(move || {
108 let mut kiss_buffer = KissBuffer::new();
109 loop {
110 let mut buf = kiss_buffer.buf_remaining();
111 let n = match tnc.read(&mut buf) {
112 Ok(n) => n,
113 Err(_) => break,
114 };
115 kiss_buffer.did_write(n);
116 while let Some(frame) = kiss_buffer.next_frame() {
117 if frame.command() != Ok(KissCommand::DataFrame) {
118 continue;
119 }
120 match frame.port() {
121 Ok(0) => {
122 // handle basic frame and send it to subscribers
123 }
124 Ok(1) => {
125 // handle full frame and send it to subscribers - I guess they need to know the type, probably not the CRC
126 }
127 Ok(2) => {
128 // handle stream and send it to subscribers
129 }
130 _ => (),
131 }
132 }
133 }
134 });
135 }
136
137 fn spawn_writer<T: Tnc + Send + 'static>(mut tnc: T, event_rx: mpsc::Receiver<TncControlEvent>) {
138 std::thread::spawn(move || {
139 while let Ok(ev) = event_rx.recv() {
140 match ev {
141 TncControlEvent::Kiss(k) => {
142 if let Err(e) = tnc.write_all(&k.as_bytes()) {
143 debug!("kiss send err: {:?}", e);
144 return;
145 }
146 }
147 TncControlEvent::Start => {
148 if let Err(e) = tnc.start() {
149 debug!("tnc start err: {:?}", e);
150 return;
151 }
152 }
153 TncControlEvent::Close => {
154 if let Err(e) = tnc.close() {
155 debug!("tnc close err: {:?}", e);
156 return;
157 }
158 }
159 }
160 }
161 });
162 }