]>
code.octet-stream.net Git - m17rt/blob - m17app/src/app.rs
4a7a345f5fa1bf709fa0f2cb5e1a52c5f4caf0f2
1 use crate::adapter
::{PacketAdapter
, StreamAdapter
};
3 use m17core
::kiss
::{KissBuffer
, KissCommand
, KissFrame
};
4 use m17core
::protocol
::{EncryptionType
, LsfFrame
, PacketType
};
7 use std
::collections
::HashMap
;
9 use std
::sync
::{Arc
, RwLock
};
// NOTE(review): the struct header for these fields is not visible in this extract;
// methods below access them as `self.listeners` and `self.event_tx`.
// Registry of packet/stream listeners behind an `Arc<RwLock<..>>`; registration methods
// take the write lock on it.
12 listeners
: Arc
<RwLock
<Listeners
>>,
// Sending half of a channel carrying `TncControlEvent`s; it is cloned into each `TxHandle`.
13 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
/// Builds the app around `tnc` and starts its reader and writer threads.
///
/// The TNC is cloned with `try_clone()` so that the reader thread gets the original
/// handle and the writer thread gets the clone.
///
/// # Panics
/// Panics if `tnc.try_clone()` fails, because its result is unwrapped.
///
/// NOTE(review): the end of this function (the value returned as `Self`) is not visible
/// in this extract.
17 pub fn new
<T
: Tnc
+ Send
+ '
static>(mut tnc
: T
) -> Self {
// Second handle to the same TNC, used only by the writer thread.
18 let write_tnc
= tnc
.try_clone().unwrap
();
// Channel for control events, created with a bound of 128
// (presumably std::sync::mpsc; the import is not visible here).
19 let (event_tx
, event_rx
) = mpsc
::sync_channel(128);
// Listener registry, initially built by `Listeners::new()`.
20 let listeners
= Arc
::new(RwLock
::new(Listeners
::new()));
// The reader thread receives its own clone of the `Arc` to the listener registry.
21 spawn_reader(tnc
, listeners
.clone());
// The writer thread owns the receiving end of the control-event channel.
22 spawn_writer(write_tnc
, event_rx
);
/// Registers `listener` in the packet listener map.
///
/// Takes the write lock on the listener registry, reads the registry's `next` value as
/// the id, and stores the listener (wrapped in an `Arc`) under that id.
///
/// # Panics
/// Panics if the registry lock is poisoned, because the lock result is unwrapped.
///
/// NOTE(review): the lines between and after the visible statements are missing from this
/// extract; presumably they advance `next` and return `id` as the `usize` result — confirm.
29 pub fn add_packet_listener
<P
: PacketAdapter
+ '
static>(&self, listener
: P
) -> usize {
30 let mut listeners
= self.listeners
.write().unwrap
();
31 let id
= listeners
.next
;
33 listeners
.packet
.insert
(id
, Arc
::new(listener
));
/// Registers `listener` in the stream listener map.
///
/// Takes the write lock on the listener registry, reads the registry's `next` value as
/// the id, and stores the listener (wrapped in an `Arc`) under that id.
///
/// # Panics
/// Panics if the registry lock is poisoned, because the lock result is unwrapped.
///
/// NOTE(review): the lines between and after the visible statements are missing from this
/// extract; presumably they advance `next` and return `id` as the `usize` result — confirm.
37 pub fn add_stream_listener
<S
: StreamAdapter
+ '
static>(&self, listener
: S
) -> usize {
38 let mut listeners
= self.listeners
.write().unwrap
();
39 let id
= listeners
.next
;
41 listeners
.stream
.insert
(id
, Arc
::new(listener
));
/// Removes the packet listener stored under `id`, if there is one.
///
/// The value returned by the map's `remove` is discarded, so an unknown `id` is not
/// reported to the caller.
///
/// # Panics
/// Panics if the registry lock is poisoned, because the lock result is unwrapped.
45 pub fn remove_packet_listener(&self, id
: usize) {
46 self.listeners
.write().unwrap
().packet
.remove(&id
);
/// Removes the stream listener stored under `id`, if there is one.
///
/// The value returned by the map's `remove` is discarded, so an unknown `id` is not
/// reported to the caller.
///
/// # Panics
/// Panics if the registry lock is poisoned, because the lock result is unwrapped.
49 pub fn remove_stream_listener(&self, id
: usize) {
50 self.listeners
.write().unwrap
().stream
.remove(&id
);
/// Entry point for transmitting a packet of the given `type_code` with `payload`.
///
/// NOTE(review): only design notes are visible for this function in this extract; the
/// comments below record open questions about where the LSF details come from.
53 pub fn transmit_packet(&self, type_code
: PacketType
, payload
: &[u8]) {
54 // hang on where do we get the LSF details from? We need a destination obviously
55 // our source address needs to be configured here too
56 // also there is possible CAN, encryption, meta payload
58 // we will immediately convert this into a KISS payload before sending into channel so we only need borrow on data
61 /// Create a handle that can be used to transmit data on the TNC
// Each handle is given its own clone of the control-event sender.
62 pub fn tx(&self) -> TxHandle
{
64 event_tx
: self.event_tx
.clone(),
// NOTE(review): the enclosing function header is not visible in this extract.
// Sends a `Start` control event on the channel; the send result is deliberately
// discarded with `let _`, so a failed send is ignored.
69 let _
= self.event_tx
.send(TncControlEvent
::Start
);
// NOTE(review): the enclosing function header is not visible in this extract.
// Sends a `Close` control event on the channel; the send result is deliberately
// discarded with `let _`, so a failed send is ignored.
73 let _
= self.event_tx
.send(TncControlEvent
::Close
);
// NOTE(review): the struct header is not visible in this extract; presumably this is the
// field of `TxHandle`, which `tx()` fills with a clone of the app's sender — confirm.
78 event_tx
: mpsc
::SyncSender
<TncControlEvent
>,
82 // add more methods here for stream outgoing
/// Placeholder for starting an outgoing stream transmission.
///
/// The body is currently empty; the parameters it will need are still undecided (see the
/// inline comment in the parameter list).
84 pub fn transmit_stream_start(&self /* lsf?, payload? what needs to be configured ?! */) {}
86 // as long as there is only one TNC it is implied there is only ever one stream transmission in flight
/// Placeholder for sending the next part of an outgoing stream.
///
/// The body is currently empty, so `end_of_stream` is accepted but not yet used; the
/// payload parameter is still to be added (see the inline comment).
88 pub fn transmit_stream_next(&self, /* next payload, */ end_of_stream
: bool
) {}
// NOTE(review): the struct header and the field documented as "Identifier to be assigned
// to the next listener" are not visible in this extract (other code reads it as `.next`).
91 /// Synchronised structure for listeners subscribing to packets and streams.
93 /// Each listener will be notified in turn of each event.
95 /// Identifier to be assigned to the next listener, starting from 0
// Packet listeners keyed by the id used when they were inserted.
97 packet
: HashMap
<usize, Arc
<dyn PacketAdapter
>>,
// Stream listeners keyed by the id used when they were inserted.
98 stream
: HashMap
<usize, Arc
<dyn StreamAdapter
>>,
// NOTE(review): the enclosing constructor is not visible in this extract.
// Both listener maps start out empty.
105 packet
: HashMap
::new(),
106 stream
: HashMap
::new(),
111 /// Carries a request from a method on M17App to the TNC's writer thread, which will execute it.
112 enum TncControlEvent
{
// NOTE(review): the variant declarations are not visible in this extract. The writer
// thread in this file matches on `Kiss(..)`, `Start` and `Close`.
/// Starts the thread that reads from the TNC and dispatches received packets.
///
/// The spawned thread takes ownership of `tnc` and of a shared handle to `listeners`.
/// Bytes read from the TNC are placed into a `KissBuffer`, and each complete KISS frame
/// taken from it is examined. Three port arms are visible: basic packets, full packets
/// (LSF followed by the packet) and streams. For full packets the frame is validated
/// step by step and the packet payload is prepared for the registered packet listeners.
///
/// NOTE(review): many lines of this function are missing from this extract (loop header,
/// `match` heads, the bodies that follow each `debug!`, the listener notification call and
/// all closing braces), so what happens after each failed check is not shown here.
118 fn spawn_reader
<T
: Tnc
+ Send
+ '
static>(mut tnc
: T
, listeners
: Arc
<RwLock
<Listeners
>>) {
119 std
::thread
::spawn(move || {
// Collects raw bytes from the TNC and hands back complete KISS frames.
120 let mut kiss_buffer
= KissBuffer
::new();
// Read directly into the slice the KISS buffer exposes for new data.
122 let mut buf
= kiss_buffer
.buf_remaining();
123 let n
= match tnc
.read(&mut buf
) {
// Report to the buffer how many bytes the read produced.
127 kiss_buffer
.did_write(n
);
// Process every frame the buffer can currently provide.
128 while let Some(frame
) = kiss_buffer
.next_frame() {
// Frames whose command is not a data frame take this branch (its body is not visible here).
129 if frame
.command() != Ok(KissCommand
::DataFrame
) {
133 Ok(m17core
::kiss
::PORT_PACKET_BASIC
) => {
135 // we will handle the more full-featured version from port 1
137 Ok(m17core
::kiss
::PORT_PACKET_FULL
) => {
138 let mut payload
= [0u8; 855]; // 30 byte LSF + 825 byte packet including CRC
// `n` is rebound here to the decoded payload length.
139 let Ok(n
) = frame
.decode_payload(&mut payload
) else {
140 debug
!("failed to decode payload from KISS frame");
144 debug
!("unusually short full packet frame");
// The first 30 bytes of the decoded payload are wrapped as the LSF.
147 let lsf
= LsfFrame(payload
[0..30].try_into().unwrap
());
149 debug
!("LSF in full packet frame did not pass CRC");
// Only frames whose LSF reports no encryption get past this check.
152 if lsf
.encryption_type() != EncryptionType
::None
{
153 debug
!("we only understand None encryption for now - skipping packet");
// Parse the packet type from the bytes after the LSF; `type_len` is used below as the
// offset (past the LSF) at which the packet payload starts.
156 let Some((packet_type
, type_len
)) = PacketType
::from_proto(&payload
[30..n
])
158 debug
!("failed to decode packet type");
// At least two bytes must remain after the type prefix to hold the CRC.
161 if (n
- 30 - type_len
) < 2 {
162 debug
!("packet payload too small to provide CRC");
// The CRC is computed over everything after the LSF, including the trailing CRC bytes.
165 let packet_crc
= m17core
::crc
::m17_crc(&payload
[30..n
]);
167 debug
!("packet CRC does not pass");
// The payload excludes the type prefix and the final two bytes; it is held in an
// `Arc<[u8]>` and cloned further down when it is passed on.
170 let packet_payload
: Arc
<[u8]> =
171 Arc
::from(&payload
[(30 + type_len
)..(n
- 2)]);
// Clone the `Arc` handles of all registered packet listeners out of the registry while
// holding the read lock.
174 listeners
.read().unwrap
().packet
.values().cloned().collect();
179 packet_payload
.clone(),
183 Ok(m17core
::kiss
::PORT_STREAM
) => {
184 // handle stream and send it to subscribers
/// Starts the thread that executes control events against the TNC.
///
/// The spawned thread takes ownership of `tnc` and `event_rx`, and keeps receiving events
/// for as long as `event_rx.recv()` returns `Ok`. A `Kiss` event is written to the TNC
/// with `write_all`, `Start` calls `tnc.start()` and `Close` calls `tnc.close()`. An error
/// from any of these is logged with `debug!`.
///
/// NOTE(review): the `match` head, the ends of the arms and the closing braces are missing
/// from this extract, so whatever follows each `debug!` call is not shown here.
193 fn spawn_writer
<T
: Tnc
+ Send
+ '
static>(mut tnc
: T
, event_rx
: mpsc
::Receiver
<TncControlEvent
>) {
194 std
::thread
::spawn(move || {
// The loop ends the first time `recv()` returns an error.
195 while let Ok(ev
) = event_rx
.recv() {
// Write the bytes of the carried KISS value to the TNC.
197 TncControlEvent
::Kiss(k
) => {
198 if let Err(e
) = tnc
.write_all(&k
.as_bytes()) {
199 debug
!("kiss send err: {:?}", e
);
203 TncControlEvent
::Start
=> {
204 if let Err(e
) = tnc
.start() {
205 debug
!("tnc start err: {:?}", e
);
209 TncControlEvent
::Close
=> {
210 if let Err(e
) = tnc
.close() {
211 debug
!("tnc close err: {:?}", e
);