]> code.octet-stream.net Git - m17rt/commitdiff
Extend adapter lifecycle, decide on some stream callbacks
authorThomas Karpiniec <tom.karpiniec@outlook.com>
Mon, 30 Dec 2024 02:38:31 +0000 (13:38 +1100)
committerThomas Karpiniec <tom.karpiniec@outlook.com>
Mon, 30 Dec 2024 02:38:31 +0000 (13:38 +1100)
m17app/src/adapter.rs
m17app/src/app.rs
m17core/src/protocol.rs

index 1bc09719526278c23ab5fe59cb5b0a9e121075ad..e31234b6d90d42e484664beefb11cef30907a20e 100644 (file)
@@ -3,10 +3,24 @@ use m17core::protocol::{LsfFrame, PacketType};
 use std::sync::Arc;
 
 pub trait PacketAdapter: Send + Sync + 'static {
-    fn adapter_registered(&self, handle: TxHandle);
+    fn adapter_registered(&self, id: usize, handle: TxHandle);
+    fn adapter_removed(&self);
     fn tnc_started(&self);
     fn tnc_closed(&self);
     fn packet_received(&self, lsf: LsfFrame, packet_type: PacketType, content: Arc<[u8]>);
 }
 
-pub trait StreamAdapter: Send + Sync + 'static {}
+pub trait StreamAdapter: Send + Sync + 'static {
+    fn adapter_registered(&self, id: usize, handle: TxHandle);
+    fn adapter_removed(&self);
+    fn tnc_started(&self);
+    fn tnc_closed(&self);
+    fn stream_began(&self, lsf: LsfFrame);
+    fn stream_data(&self, frame_number: u16, is_final: bool, data: Arc<[u8; 16]>);
+    fn stream_lost(&self);
+
+    // TODO
+    // fn stream_assembled_text_block()
+    // fn stream_gnss_data()
+    // fn stream_extended_callsign_data()
+}
index 4a7a345f5fa1bf709fa0f2cb5e1a52c5f4caf0f2..05e3167139eb0f9ccadd27838643d3354970aaa2 100644 (file)
@@ -9,7 +9,7 @@ use std::sync::mpsc;
 use std::sync::{Arc, RwLock};
 
 pub struct M17App {
-    listeners: Arc<RwLock<Listeners>>,
+    adapters: Arc<RwLock<Adapters>>,
     event_tx: mpsc::SyncSender<TncControlEvent>,
 }
 
@@ -17,40 +17,50 @@ impl M17App {
     pub fn new<T: Tnc + Send + 'static>(mut tnc: T) -> Self {
         let write_tnc = tnc.try_clone().unwrap();
         let (event_tx, event_rx) = mpsc::sync_channel(128);
-        let listeners = Arc::new(RwLock::new(Listeners::new()));
+        let listeners = Arc::new(RwLock::new(Adapters::new()));
         spawn_reader(tnc, listeners.clone());
         spawn_writer(write_tnc, event_rx);
         Self {
-            listeners,
+            adapters: listeners,
             event_tx,
         }
     }
 
-    pub fn add_packet_listener<P: PacketAdapter + 'static>(&self, listener: P) -> usize {
-        let mut listeners = self.listeners.write().unwrap();
-        let id = listeners.next;
-        listeners.next += 1;
-        listeners.packet.insert(id, Arc::new(listener));
+    pub fn add_packet_adapter<P: PacketAdapter + 'static>(&self, adapter: P) -> usize {
+        let adapter = Arc::new(adapter);
+        let mut adapters = self.adapters.write().unwrap();
+        let id = adapters.next;
+        adapters.next += 1;
+        adapters.packet.insert(id, adapter.clone());
+        drop(adapters);
+        adapter.adapter_registered(id, self.tx());
         id
     }
 
-    pub fn add_stream_listener<S: StreamAdapter + 'static>(&self, listener: S) -> usize {
-        let mut listeners = self.listeners.write().unwrap();
-        let id = listeners.next;
-        listeners.next += 1;
-        listeners.stream.insert(id, Arc::new(listener));
+    pub fn add_stream_adapter<S: StreamAdapter + 'static>(&self, adapter: S) -> usize {
+        let adapter = Arc::new(adapter);
+        let mut adapters = self.adapters.write().unwrap();
+        let id = adapters.next;
+        adapters.next += 1;
+        adapters.stream.insert(id, adapter.clone());
+        drop(adapters);
+        adapter.adapter_registered(id, self.tx());
         id
     }
 
-    pub fn remove_packet_listener(&self, id: usize) {
-        self.listeners.write().unwrap().packet.remove(&id);
+    pub fn remove_packet_adapter(&self, id: usize) {
+        if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) {
+            a.adapter_removed();
+        }
     }
 
-    pub fn remove_stream_listener(&self, id: usize) {
-        self.listeners.write().unwrap().stream.remove(&id);
+    pub fn remove_stream_adapter(&self, id: usize) {
+        if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) {
+            a.adapter_removed();
+        }
     }
 
-    pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) {
+    pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) {
         // hang on where do we get the LSF details from? We need a destination obviously
         // our source address needs to be configured here too
         // also there is possible CAN, encryption, meta payload
@@ -91,14 +101,14 @@ impl TxHandle {
 /// Synchronised structure for listeners subscribing to packets and streams.
 ///
 /// Each listener will be notified in turn of each event.
-struct Listeners {
+struct Adapters {
     /// Identifier to be assigned to the next listener, starting from 0
     next: usize,
     packet: HashMap<usize, Arc<dyn PacketAdapter>>,
     stream: HashMap<usize, Arc<dyn StreamAdapter>>,
 }
 
-impl Listeners {
+impl Adapters {
     fn new() -> Self {
         Self {
             next: 0,
@@ -115,7 +125,7 @@ enum TncControlEvent {
     Close,
 }
 
-fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Listeners>>) {
+fn spawn_reader<T: Tnc + Send + 'static>(mut tnc: T, listeners: Arc<RwLock<Adapters>>) {
     std::thread::spawn(move || {
         let mut kiss_buffer = KissBuffer::new();
         loop {
index 382fb9fa516461a2078cf9032d5afe262f15dec7..375c3f0f8533fd56cf9a439ddf9b1bee56f0e1e7 100755 (executable)
@@ -133,6 +133,8 @@ impl LsfFrame {
         }
     }
 
+    // TODO: encryption sub-type
+
     pub fn channel_access_number(&self) -> u8 {
         (self.0[12] >> 7) & 0x0f
     }