X-Git-Url: https://code.octet-stream.net/m17rt/blobdiff_plain/9e40113187a3eb54335de0277f8038ee35d9428a..b8702d115ebc071a9ae97e9aa9c4bc00196b5e26:/m17app/src/app.rs?ds=sidebyside diff --git a/m17app/src/app.rs b/m17app/src/app.rs index 05e3167..8130455 100644 --- a/m17app/src/app.rs +++ b/m17app/src/app.rs @@ -125,9 +125,10 @@ enum TncControlEvent { Close, } -fn spawn_reader(mut tnc: T, listeners: Arc>) { +fn spawn_reader(mut tnc: T, adapters: Arc>) { std::thread::spawn(move || { let mut kiss_buffer = KissBuffer::new(); + let mut stream_running = false; loop { let mut buf = kiss_buffer.buf_remaining(); let n = match tnc.read(&mut buf) { @@ -181,7 +182,7 @@ fn spawn_reader(mut tnc: T, listeners: Arc = - listeners.read().unwrap().packet.values().cloned().collect(); + adapters.read().unwrap().packet.values().cloned().collect(); for s in subs { s.packet_received( lsf.clone(), @@ -191,7 +192,47 @@ fn spawn_reader(mut tnc: T, listeners: Arc { - // handle stream and send it to subscribers + let mut payload = [0u8; 32]; + let Ok(n) = frame.decode_payload(&mut payload) else { + debug!("failed to decode stream payload from KISS frame"); + continue; + }; + if n == 30 { + let lsf = LsfFrame(payload[0..30].try_into().unwrap()); + if lsf.crc() != 0 { + debug!("initial LSF in stream did not pass CRC"); + continue; + } + stream_running = true; + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_began(lsf.clone()); + } + } else if n == 26 { + if !stream_running { + debug!("ignoring stream data as we didn't get a valid LSF first"); + continue; + } + // TODO: parse LICH and handle the different changing subvalues META could have + if m17core::crc::m17_crc(&payload[6..n]) != 0 { + debug!("stream data CRC mismatch"); + continue; + } + let mut frame_number = u16::from_be_bytes([payload[6], payload[7]]); + let is_final = (frame_number & 0x8000) > 0; + frame_number &= 0x7fff; + let data: [u8; 16] = payload[8..24].try_into().unwrap(); + let data = Arc::new(data); + if is_final { + stream_running = false; + } + let subs: Vec<_> = + adapters.read().unwrap().stream.values().cloned().collect(); + for s in subs { + s.stream_data(frame_number, is_final, data.clone()); + } + } } _ => (), }