use m17app::app::M17App;
-use m17app::soundmodem::{InputRrcFile, InputSoundcard, Soundmodem};
+use m17app::soundmodem::{InputRrcFile, InputSoundcard, NullOutputSink, Soundmodem};
use m17codec2::Codec2Adapter;
use std::path::PathBuf;
let path = PathBuf::from("../../Data/test_vk7xt.rrc");
let source = InputRrcFile::new(path);
//let source = InputSoundcard::new();
- let soundmodem = Soundmodem::new_with_input(source);
+ let soundmodem = Soundmodem::new_with_input_and_output(source, NullOutputSink::new());
let app = M17App::new(soundmodem);
app.add_stream_adapter(Codec2Adapter::new());
app.start();
continue;
}
let lsf = LsfFrame(payload[0..30].try_into().unwrap());
- if lsf.crc() != 0 {
+ if lsf.check_crc() != 0 {
debug!("LSF in full packet frame did not pass CRC");
continue;
}
};
if n == 30 {
let lsf = LsfFrame(payload[0..30].try_into().unwrap());
- if lsf.crc() != 0 {
+ if lsf.check_crc() != 0 {
debug!("initial LSF in stream did not pass CRC");
continue;
}
-use std::io::{self, ErrorKind, Read, Write};
-use std::sync::RwLock;
-use std::collections::VecDeque;
use crate::tnc::{Tnc, TncError};
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use m17core::kiss::MAX_FRAME_LEN;
use m17core::modem::{Demodulator, Modulator, ModulatorAction, SoftDemodulator, SoftModulator};
use m17core::tnc::SoftTnc;
+use std::collections::VecDeque;
use std::fs::File;
+use std::io::{self, ErrorKind, Read, Write};
use std::path::PathBuf;
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError};
+use std::sync::RwLock;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
// must create TNC here
let (event_tx, event_rx) = sync_channel(128);
let (kiss_out_tx, kiss_out_rx) = sync_channel(128);
- spawn_soundmodem_worker(event_tx.clone(), event_rx, kiss_out_tx, Box::new(input), Box::new(output));
+ spawn_soundmodem_worker(
+ event_tx.clone(),
+ event_rx,
+ kiss_out_tx,
+ Box::new(input),
+ Box::new(output),
+ );
Self {
event_tx,
kiss_out_rx: Arc::new(Mutex::new(kiss_out_rx)),
BasebandInput(Arc<[i16]>),
Start,
Close,
- DidReadFromOutputBuffer {
- len: usize,
- timestamp: Instant,
- },
+ DidReadFromOutputBuffer { len: usize, timestamp: Instant },
OutputUnderrun,
}
let out_buffer = Arc::new(RwLock::new(OutputBuffer::new()));
let mut out_samples = [0i16; 1024];
let start = Instant::now();
+ let mut ptt = false;
while let Ok(ev) = event_rx.recv() {
// Update clock on TNC before we do anything
let sample_time = (start.elapsed().as_nanos() / 48000) as u64;
tnc.set_now(sample_time);
-
+
// Handle event
match ev {
SoundmodemEvent::Kiss(k) => {
SoundmodemEvent::Start => {
input.start(event_tx.clone());
output.start(event_tx.clone(), out_buffer.clone());
- },
+ }
SoundmodemEvent::Close => break,
SoundmodemEvent::DidReadFromOutputBuffer { len, timestamp } => {
let (occupied, internal_latency) = {
(out_buffer.samples.len(), out_buffer.latency)
};
let internal_latency = (internal_latency.as_secs_f32() * 48000.0) as usize;
- let dynamic_latency = len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize);
- modulator.update_output_buffer(occupied, 48000, internal_latency + dynamic_latency);
- },
+ let dynamic_latency =
+ len.saturating_sub((timestamp.elapsed().as_secs_f32() * 48000.0) as usize);
+ modulator.update_output_buffer(
+ occupied,
+ 48000,
+ internal_latency + dynamic_latency,
+ );
+ }
SoundmodemEvent::OutputUnderrun => {
// TODO: cancel transmission, send empty data frame to host
}
}
-
+
+ // Update PTT state
+ let new_ptt = tnc.ptt();
+ if new_ptt != ptt {
+ if new_ptt {
+ // turn it on
+ } else {
+ // turn it off
+ }
+ }
+ ptt = new_ptt;
+
// Let the modulator do what it wants
while let Some(action) = modulator.run() {
match action {
ModulatorAction::SetIdle(idling) => {
out_buffer.write().unwrap().idling = idling;
- },
+ }
ModulatorAction::GetNextFrame => {
modulator.provide_next_frame(tnc.read_tx_frame());
- },
- ModulatorAction::ReadOutput => {
- loop {
- let n = modulator.read_output_samples(&mut out_samples);
- if n == 0 {
- break;
- }
- let mut out_buffer = out_buffer.write().unwrap();
- for s in &out_samples[0..n] {
- out_buffer.samples.push_back(*s);
- }
+ }
+ ModulatorAction::ReadOutput => loop {
+ let n = modulator.read_output_samples(&mut out_samples);
+ if n == 0 {
+ break;
+ }
+ let mut out_buffer = out_buffer.write().unwrap();
+ for s in &out_samples[0..n] {
+ out_buffer.samples.push_back(*s);
}
-
},
ModulatorAction::TransmissionWillEnd(in_samples) => {
tnc.set_tx_end_time(in_samples);
- },
+ }
}
}
}
if end_rx.try_recv() != Err(TryRecvError::Empty) {
break;
}
-
+
let mut buffer = buffer.write().unwrap();
for out in buf.chunks_mut(2) {
if let Some(s) = buffer.samples.pop_front() {
break;
}
}
-
+ });
+ *self.end_tx.lock().unwrap() = Some(end_tx);
+ }
+
+ fn close(&self) {
+ let _ = self.end_tx.lock().unwrap().take(); // dropping the Sender disconnects end_rx; the worker stops on anything other than TryRecvError::Empty
+ }
+}
+
+/// Output sink that consumes modulated samples and throws them away.
+///
+/// `end_tx` holds the sender half of the stop channel for the thread spawned by `start()`;
+/// it is `None` until `start()` has been called.
+pub struct NullOutputSink {
+    end_tx: Mutex<Option<Sender<()>>>,
+}
+
+impl NullOutputSink {
+    /// Create a sink with no consumer thread running yet.
+    pub fn new() -> Self {
+        Self {
+            end_tx: Mutex::new(None),
+        }
+    }
+}
+
+impl Default for NullOutputSink {
+    /// Equivalent to [`NullOutputSink::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl OutputSink for NullOutputSink {
+    /// Spawn a thread that drains `buffer` at the nominal output rate and discards the samples.
+    ///
+    /// Every 25 ms tick it pops up to 1200 samples (48 kHz assumed). If the buffer runs dry
+    /// while it is not flagged as idling, a `SoundmodemEvent::OutputUnderrun` is reported on
+    /// `event_tx`. The thread exits once the stop channel stored in `self.end_tx` is signalled
+    /// or disconnected.
+    fn start(&self, event_tx: SyncSender<SoundmodemEvent>, buffer: Arc<RwLock<OutputBuffer>>) {
+        let (end_tx, end_rx) = channel();
+        std::thread::spawn(move || {
+            // assuming 48 kHz for now
+            const TICK: Duration = Duration::from_millis(25);
+            const SAMPLES_PER_TICK: usize = 1200;
+            let mut next_tick = Instant::now() + TICK;
+
+            loop {
+                // If we are already past the deadline, sleep for zero time rather than relying
+                // on `duration_since`, which is documented as having panicked in this case.
+                std::thread::sleep(next_tick.saturating_duration_since(Instant::now()));
+                next_tick += TICK;
+                if end_rx.try_recv() != Err(TryRecvError::Empty) {
+                    break;
+                }
+
+                // Keep the write lock only while popping samples. `event_tx` is a bounded
+                // channel, so `send` may block; blocking with the lock held could stall a
+                // receiver that also needs the buffer.
+                let mut underrun = false;
+                {
+                    let mut buffer = buffer.write().unwrap();
+                    for _ in 0..SAMPLES_PER_TICK {
+                        if buffer.samples.pop_front().is_none() && !buffer.idling {
+                            underrun = true;
+                            break;
+                        }
+                    }
+                }
+                if underrun {
+                    debug!("null output had underrun");
+                    let _ = event_tx.send(SoundmodemEvent::OutputUnderrun);
+                }
+            }
+        });
+        *self.end_tx.lock().unwrap() = Some(end_tx);
+    }
callsign
}
-#[allow(dead_code)]
pub fn encode_address(address: &Address) -> [u8; 6] {
let mut out: u64 = 0;
match address {
None => return None,
};
debug!("full lsf: {:?}", lsf.0);
- let crc = lsf.crc();
+ let crc = lsf.check_crc();
debug!("recv crc: {:04X}", crc);
debug!("destination: {:?}", lsf.destination());
debug!("source: {:?}", lsf.source());
Some(packet) => packet,
None => return None,
};
+ // TODO: the spec is inconsistent about which bit in packet[25] is EOF
+ // https://github.com/M17-Project/M17_spec/issues/147
let final_frame = (packet[25] & 0x04) > 0;
let number = packet[25] >> 3;
let counter = if final_frame {
}
PacketFrameCounter::FinalFrame { payload_len } => {
type1[0..payload_len].copy_from_slice(&frame.payload[0..payload_len]);
- type1[25] = (payload_len as u8) << 3 | 0x04;
+ type1[25] = ((payload_len as u8) << 3) | 0x04;
}
}
let type3 = fec::encode(&type1, 206, p_3);
-use crate::address::Address;
+use crate::address::{encode_address, Address};
pub(crate) const LSF_SYNC: [i8; 8] = [1, 1, 1, 1, -1, -1, 1, -1];
pub(crate) const BERT_SYNC: [i8; 8] = [-1, 1, -1, -1, 1, 1, 1, 1];
pub(crate) const STREAM_SYNC: [i8; 8] = [-1, -1, -1, -1, 1, 1, -1, 1];
pub(crate) const PACKET_SYNC: [i8; 8] = [1, -1, 1, 1, -1, -1, -1, -1];
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Mode {
Packet,
Stream,
}
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum DataType {
Reserved,
Data,
Voice,
VoiceAndData,
}
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum EncryptionType {
None,
Scrambler,
// BERT
}
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum PacketType {
/// RAW
Raw,
pub struct LsfFrame(pub [u8; 30]);
impl LsfFrame {
- pub fn crc(&self) -> u16 {
+ /// Compute the M17 CRC over all 30 bytes, stored CRC included; a result of zero means the frame is valid.
+ pub fn check_crc(&self) -> u16 {
crate::crc::m17_crc(&self.0)
}
}
pub fn mode(&self) -> Mode {
- if self.0[12] & 0x01 > 0 {
+ if self.lsf_type() & 0x0001 > 0 {
Mode::Stream
} else {
Mode::Packet
}
pub fn encryption_type(&self) -> EncryptionType {
- match (self.0[12] >> 3) & 0x03 {
+ match (self.lsf_type() >> 3) & 0x0003 {
0b00 => EncryptionType::None,
0b01 => EncryptionType::Scrambler,
0b10 => EncryptionType::Aes,
// TODO: encryption sub-type
pub fn channel_access_number(&self) -> u8 {
- (self.0[12] >> 7) & 0x0f
+ ((self.lsf_type() >> 7) & 0x000f) as u8 // 4-bit field at bits 7..=10 of the 16-bit type word
}
pub fn meta(&self) -> [u8; 14] {
self.0[14..28].try_into().unwrap()
}
+
+    /// Overwrite the destination address (bytes 0..6) and refresh the stored CRC.
+    pub fn set_destination(&mut self, destination: &Address) {
+        let encoded = encode_address(destination);
+        self.0[..6].copy_from_slice(&encoded);
+        self.recalculate_crc();
+    }
+
+    /// Overwrite the source address (bytes 6..12) and refresh the stored CRC.
+    pub fn set_source(&mut self, source: &Address) {
+        let encoded = encode_address(source);
+        self.0[6..12].copy_from_slice(&encoded);
+        self.recalculate_crc();
+    }
+
+    /// Set bit 0 of the type field (1 = stream, 0 = packet) and refresh the stored CRC.
+    pub fn set_mode(&mut self, mode: Mode) {
+        let mode_bit: u16 = match mode {
+            Mode::Stream => 0x0001,
+            Mode::Packet => 0x0000,
+        };
+        let updated = (self.lsf_type() & !0x0001) | mode_bit;
+        self.0[12..14].copy_from_slice(&updated.to_be_bytes());
+        self.recalculate_crc();
+    }
+
+    /// Set the two-bit data type (bits 1..=2 of the type field) and refresh the stored CRC.
+    pub fn set_data_type(&mut self, data_type: DataType) {
+        let code: u16 = match data_type {
+            DataType::Reserved => 0b00,
+            DataType::Data => 0b01,
+            DataType::Voice => 0b10,
+            DataType::VoiceAndData => 0b11,
+        };
+        // Clear the old field, then drop the new code into place.
+        let updated = (self.lsf_type() & !0x0006) | (code << 1);
+        self.0[12..14].copy_from_slice(&updated.to_be_bytes());
+        self.recalculate_crc();
+    }
+
+    /// Set the two-bit encryption type (bits 3..=4 of the type field) and refresh the stored CRC.
+    pub fn set_encryption_type(&mut self, encryption_type: EncryptionType) {
+        let code: u16 = match encryption_type {
+            EncryptionType::None => 0b00,
+            EncryptionType::Scrambler => 0b01,
+            EncryptionType::Aes => 0b10,
+            EncryptionType::Other => 0b11,
+        };
+        // Clear the old field, then drop the new code into place.
+        let updated = (self.lsf_type() & !0x0018) | (code << 3);
+        self.0[12..14].copy_from_slice(&updated.to_be_bytes());
+        self.recalculate_crc();
+    }
+
+    /// Recompute the CRC over bytes 0..28 and store it big-endian in bytes 28..30.
+    fn recalculate_crc(&mut self) {
+        let [crc_hi, crc_lo] = crate::crc::m17_crc(&self.0[..28]).to_be_bytes();
+        self.0[28] = crc_hi;
+        self.0[29] = crc_lo;
+        // A frame carrying its own correct CRC must check out as zero.
+        debug_assert_eq!(self.check_crc(), 0);
+    }
+
+    /// Read the 16-bit type field stored big-endian in bytes 12 and 13.
+    fn lsf_type(&self) -> u16 {
+        let high = u16::from(self.0[12]);
+        let low = u16::from(self.0[13]);
+        (high << 8) | low
+    }
}
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct StreamFrame {
/// Which LICH segment is given in this frame, from 0 to 5 inclusive
pub lich_idx: u8,
-use crate::kiss::{KissBuffer, KissFrame};
+use crate::kiss::{KissBuffer, KissFrame, PORT_PACKET_BASIC, PORT_PACKET_FULL, PORT_STREAM};
use crate::modem::ModulatorFrame;
-use crate::protocol::{Frame, LichCollection, LsfFrame, Mode, PacketFrameCounter};
+use crate::protocol::{
+ Frame, LichCollection, LsfFrame, Mode, PacketFrame, PacketFrameCounter, StreamFrame,
+};
/// Handles the KISS protocol and frame management for `SoftModulator` and `SoftDemodulator`.
///
/// Current RX or TX function of the TNC.
state: State,
+
+ /// Latest state of data carrier detect from demodulator - controls whether we can go to TX
+ dcd: bool,
+
+ /// Current monotonic time, counted in samples
+ now: u64,
+
+ // TODO: use a static ring buffer crate of some sort?
+ /// Circular buffer of packets enqueued for transmission
+ packet_queue: [PendingPacket; 4],
+
+ /// Next slot to fill
+ packet_next: usize,
+
+ /// Current packet index, which is either partly transmitted or not transmitted at all.
+ packet_curr: usize,
+
+ /// If true, packet_next == packet_curr implies full queue. packet_next is invalid.
+ /// If false, it implies empty queue.
+ packet_full: bool,
+
+ /// The LSF for a stream we are going to start transmitting.
+ ///
+ /// This serves as a general indicator that we want to tx a stream.
+ stream_pending_lsf: Option<LsfFrame>,
+
+ /// Circular buffer of stream data enqueued for transmission.
+ ///
+ /// When the queue empties out, we hope that the last one has the end-of-stream flag set.
+ /// Otherwise a buffer underrun has occurred.
+ ///
+ /// Overruns are less troublesome - we can drop frames and receiving stations should cope.
+ stream_queue: [StreamFrame; 8],
+
+ /// Next slot to fill
+ stream_next: usize,
+
+ /// Current unsent stream frame index
+ stream_curr: usize,
+
+ /// True if stream_next == stream_curr because the queue is full. stream_next is invalid.
+ stream_full: bool,
+
+ /// Should PTT be on right now? The code driving the TNC polls this through `ptt()`.
+ ptt: bool,
}
impl SoftTnc {
kiss_buffer: KissBuffer::new(),
outgoing_kiss: None,
state: State::Idle,
+ dcd: false,
+ now: 0,
+ packet_queue: Default::default(),
+ packet_next: 0,
+ packet_curr: 0,
+ packet_full: false,
+ stream_pending_lsf: None,
+ stream_queue: Default::default(),
+ stream_next: 0,
+ stream_curr: 0,
+ stream_full: false,
+ ptt: false,
}
}
let lsf = LsfFrame(maybe_lsf);
// LICH can change mid-transmission so wait until the CRC is correct
// to ensure (to high probability) we haven't done a "torn read"
- if lsf.crc() == 0 {
+ if lsf.check_crc() == 0 {
let kiss = KissFrame::new_stream_setup(&lsf.0).unwrap();
self.kiss_to_host(kiss);
// TODO: avoid discarding the first data payload here
}
}
- pub fn set_data_carrier_detect(&mut self, _dcd: bool) {}
-
- pub fn set_now(&mut self, samples: u64) {}
-
+ pub fn set_data_carrier_detect(&mut self, dcd: bool) {
+ self.dcd = dcd; // read_tx_frame() only starts a transmission while this is false
+ }
+
+    /// Advance the TNC clock, measured in samples.
+    ///
+    /// If a transmission is winding down and its recorded end time has been reached, PTT is
+    /// released and the TNC returns to idle.
+    pub fn set_now(&mut self, now_samples: u64) {
+        self.now = now_samples;
+        if let State::TxEndingAtTime(deadline) = self.state {
+            if now_samples >= deadline {
+                self.ptt = false;
+                self.state = State::Idle;
+            }
+        }
+    }
+
+ pub fn ptt(&self) -> bool {
+ self.ptt // set when read_tx_frame() starts a transmission, cleared by set_now() at the end time
+ }
+
pub fn set_tx_end_time(&mut self, in_samples: usize) {
- // This is a relative time from now, expressed in samples
- // Use the time from set_now() to decide when to drop PTT
+ match self.state {
+ State::TxEnding => {
+ self.state = State::TxEndingAtTime(self.now + in_samples as u64);
+ }
+ _ => (),
+ }
}
pub fn read_tx_frame(&mut self) -> Option<ModulatorFrame> {
- // yes we want to deal with frames here
- // it's important to establish successful decode that SoftDemodulator is aware of the frame innards
- None
+ match self.state {
+ State::Idle | State::RxAcquiringStream(_) | State::RxStream(_) | State::RxPacket(_) => {
+ // We will let CSMA decide whether to actually go ahead.
+ // That's not implemented yet, so let's just check DCD.
+ let channel_free = !self.dcd;
+ let stream_wants_to_tx = self.stream_pending_lsf.is_some();
+ let packet_wants_to_tx = self.packet_full || (self.packet_next != self.packet_curr);
+ if channel_free && stream_wants_to_tx {
+ self.state = State::TxStream;
+ } else if channel_free && packet_wants_to_tx {
+ self.state = State::TxPacket;
+ } else {
+ return None;
+ }
+ self.ptt = true;
+ // TODO: true txdelay
+ Some(ModulatorFrame::Preamble { tx_delay: 0 })
+ }
+ State::TxStream => {
+ if !self.stream_full && self.stream_next == self.stream_curr {
+ return None;
+ }
+ if let Some(lsf) = self.stream_pending_lsf.take() {
+ return Some(ModulatorFrame::Lsf(lsf));
+ }
+ let frame = self.stream_queue[self.stream_curr].clone();
+ if self.stream_full {
+ self.stream_full = false;
+ }
+ self.stream_curr = (self.stream_curr + 1) % 8;
+ if frame.end_of_stream {
+ self.state = State::Idle;
+ }
+ Some(ModulatorFrame::Stream(frame))
+ }
+ State::TxStreamSentEndOfStream => {
+ self.state = State::TxEnding;
+ Some(ModulatorFrame::EndOfTransmission)
+ }
+ State::TxPacket => {
+ if !self.packet_full && self.packet_next == self.packet_curr {
+ return None;
+ }
+ while self.packet_next != self.packet_curr {
+ match self.packet_queue[self.packet_curr].next_frame() {
+ Some(frame) => {
+ return Some(frame);
+ }
+ None => {
+ self.packet_curr = (self.packet_curr + 1) % 4;
+ }
+ }
+ }
+ self.state = State::TxEnding;
+ Some(ModulatorFrame::EndOfTransmission)
+ }
+ State::TxEnding | State::TxEndingAtTime(_) => {
+ // Once we have signalled EOT we withold any new frames until
+ // the channel fully clears and we are ready to TX again
+ None
+ }
+ }
}
/// Read KISS message to be sent to host.
}
}
+    /// Accept KISS bytes from the host and return how many of them were buffered.
pub fn write_kiss(&mut self, buf: &[u8]) -> usize {
let target_buf = self.kiss_buffer.buf_remaining();
let n = buf.len().min(target_buf.len());
target_buf[0..n].copy_from_slice(&buf[0..n]);
self.kiss_buffer.did_write(n);
- while let Some(_kiss_frame) = self.kiss_buffer.next_frame() {
- // TODO: handle host-to-TNC message
+        while let Some(kiss_frame) = self.kiss_buffer.next_frame() {
+            // Frames whose port cannot be determined are skipped.
+            let port = match kiss_frame.port() {
+                Ok(port) => port,
+                Err(_) => continue,
+            };
+            if port == PORT_PACKET_BASIC {
+                // TODO: not handled yet
+            } else if port == PORT_PACKET_FULL {
+                // TODO: not handled yet
+            } else if port == PORT_STREAM {
+                // TODO: not handled yet
+            }
}
n
}
}
enum State {
- /// Nothing happening.
+ /// Nothing happening. We may have TX data queued but we won't act on it until CSMA opens up.
Idle,
/// We received some stream data but missed the leading LSF so we are trying to assemble from LICH.
/// We are receiving a packet. All is well so far, and there is more data to come before we tell the host.
RxPacket(RxPacketState),
- // TODO: TX
+
+ /// PTT is on and this is a stream-type transmission. New data may be added.
+ TxStream,
+
+ /// We have delivered the last frame in the current stream
+ TxStreamSentEndOfStream,
+
+ /// PTT is on and this is a packet-type transmission. New packets may be enqueued.
+ TxPacket,
+
+ /// We gave modulator an EndOfTransmission. PTT is still on, waiting for modulator to advise end time.
+ TxEnding,
+
+ /// Ending transmission, PTT remains on, but we know the timestamp at which we should disengage it.
+ TxEndingAtTime(u64),
}
struct RxAcquiringStreamState {
count: usize,
}
+struct PendingPacket {
+ lsf: Option<LsfFrame>, // emitted once by next_frame(), ahead of any payload, then left as None
+
+ app_data: [u8; 825], // payload storage; only the first app_data_len bytes are sent
+ app_data_len: usize, // number of valid bytes in app_data
+ app_data_transmitted: usize, // bytes already handed out by next_frame()
+}
+
+impl PendingPacket {
+    /// Returns next frame, not including preamble or EOT.
+    ///
+    /// The LSF (if still present) is returned first, followed by the payload in chunks of up
+    /// to 25 bytes. `None` means all data frames have been sent.
+    fn next_frame(&mut self) -> Option<ModulatorFrame> {
+        if let Some(lsf) = self.lsf.take() {
+            return Some(ModulatorFrame::Lsf(lsf));
+        }
+        // `>=` rather than `==` so inconsistent counters cannot underflow the subtraction.
+        if self.app_data_transmitted >= self.app_data_len {
+            return None;
+        }
+        let remaining = self.app_data_len - self.app_data_transmitted;
+        let (counter, data_len) = if remaining <= 25 {
+            (
+                PacketFrameCounter::FinalFrame {
+                    payload_len: remaining,
+                },
+                remaining,
+            )
+        } else {
+            (
+                PacketFrameCounter::Frame {
+                    index: self.app_data_transmitted / 25,
+                },
+                25,
+            )
+        };
+        let start = self.app_data_transmitted;
+        let mut payload = [0u8; 25];
+        // Copy into a prefix of the payload: `copy_from_slice` panics when the two lengths
+        // differ, which would happen on any final frame shorter than 25 bytes. The unused
+        // tail stays zeroed.
+        payload[..data_len].copy_from_slice(&self.app_data[start..start + data_len]);
+        self.app_data_transmitted += data_len;
+        Some(ModulatorFrame::Packet(PacketFrame { payload, counter }))
+    }
+}
+
+impl Default for PendingPacket { // written by hand: std provides no Default for an 825-element array, so it cannot be derived
+ fn default() -> Self {
+ Self {
+ lsf: None,
+ app_data: [0u8; 825],
+ app_data_len: 0, // an empty packet: next_frame() yields nothing
+ app_data_transmitted: 0,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;