X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/21b7d95e42fd374a33e0f18d3e3f406642e46322..9e40113187a3eb54335de0277f8038ee35d9428a:/m17app/src/app.rs diff --git a/m17app/src/app.rs b/m17app/src/app.rs index 4a7a345..05e3167 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -9,7 +9,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; pub struct M17App { - listeners: Arc>, + adapters: Arc>, event_tx: mpsc::SyncSender, } @@ -17,40 +17,50 @@ impl M17App { pub fn new(mut tnc: T) -> Self { let write_tnc = tnc.try_clone().unwrap(); let (event_tx, event_rx) = mpsc::sync_channel(128); - let listeners = Arc::new(RwLock::new(Listeners::new())); + let listeners = Arc::new(RwLock::new(Adapters::new())); spawn_reader(tnc, listeners.clone()); spawn_writer(write_tnc, event_rx); Self { - listeners, + adapters: listeners, event_tx, } } - pub fn add_packet_listener(&self, listener: P) -> usize { - let mut listeners = self.listeners.write().unwrap(); - let id = listeners.next; - listeners.next += 1; - listeners.packet.insert(id, Arc::new(listener)); + pub fn add_packet_adapter(&self, adapter: P) -> usize { + let adapter = Arc::new(adapter); + let mut adapters = self.adapters.write().unwrap(); + let id = adapters.next; + adapters.next += 1; + adapters.packet.insert(id, adapter.clone()); + drop(adapters); + adapter.adapter_registered(id, self.tx()); id } - pub fn add_stream_listener(&self, listener: S) -> usize { - let mut listeners = self.listeners.write().unwrap(); - let id = listeners.next; - listeners.next += 1; - listeners.stream.insert(id, Arc::new(listener)); + pub fn add_stream_adapter(&self, adapter: S) -> usize { + let adapter = Arc::new(adapter); + let mut adapters = self.adapters.write().unwrap(); + let id = adapters.next; + adapters.next += 1; + adapters.stream.insert(id, adapter.clone()); + drop(adapters); + adapter.adapter_registered(id, self.tx()); id } - pub fn remove_packet_listener(&self, id: usize) { - self.listeners.write().unwrap().packet.remove(&id); + pub fn remove_packet_adapter(&self, id: usize) { + if let Some(a) = self.adapters.write().unwrap().packet.remove(&id) { + a.adapter_removed(); + } } - pub fn remove_stream_listener(&self, id: usize) { - self.listeners.write().unwrap().stream.remove(&id); + pub fn remove_stream_adapter(&self, id: usize) { + if let Some(a) = self.adapters.write().unwrap().stream.remove(&id) { + a.adapter_removed(); + } } - pub fn transmit_packet(&self, type_code: PacketType, payload: &[u8]) { + pub fn transmit_packet(&self, packet_type: PacketType, payload: &[u8]) { // hang on where do we get the LSF details from? We need a destination obviously // our source address needs to be configured here too // also there is possible CAN, encryption, meta payload @@ -91,14 +101,14 @@ impl TxHandle { /// Synchronised structure for listeners subscribing to packets and streams. /// /// Each listener will be notified in turn of each event. -struct Listeners { +struct Adapters { /// Identifier to be assigned to the next listener, starting from 0 next: usize, packet: HashMap>, stream: HashMap>, } -impl Listeners { +impl Adapters { fn new() -> Self { Self { next: 0, @@ -115,7 +125,7 @@ enum TncControlEvent { Close, } -fn spawn_reader(mut tnc: T, listeners: Arc>) { +fn spawn_reader(mut tnc: T, listeners: Arc>) { std::thread::spawn(move || { let mut kiss_buffer = KissBuffer::new(); loop {