X-Git-Url: https://code.octet-stream.net/broadcaster/blobdiff_plain/7423c6c97eb5d6dc063e7185c50137bbb5e25a23..7b615b3c71825b5b229b78509a16db37e1d3f38d:/radio/main.go diff --git a/radio/main.go b/radio/main.go new file mode 100644 index 0000000..6e31772 --- /dev/null +++ b/radio/main.go @@ -0,0 +1,334 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "encoding/json" + "flag" + "fmt" + "github.com/gopxl/beep/v2" + "github.com/gopxl/beep/v2/mp3" + "github.com/gopxl/beep/v2/speaker" + "github.com/gopxl/beep/v2/wav" + "golang.org/x/net/websocket" + "log" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" +) + +const version = "v1.0.0" +const sampleRate = 44100 + +var config RadioConfig = NewRadioConfig() + +func main() { + configFlag := flag.String("c", "", "path to configuration file") + versionFlag := flag.Bool("v", false, "print version and exit") + flag.Parse() + + if *versionFlag { + fmt.Println("Broadcaster Radio", version) + os.Exit(0) + } + if *configFlag == "" { + log.Fatal("must specify a configuration file with -c") + } + + log.Println("Broadcaster Radio", version, "starting up") + config.LoadFromFile(*configFlag) + statusCollector.Config <- config + + playbackSampleRate := beep.SampleRate(sampleRate) + speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10)) + + if config.PTTPin != -1 { + InitRaspberryPiPTT(config.PTTPin, config.GpioDevice) + } + if config.COSPin != -1 { + InitRaspberryPiCOS(config.COSPin, config.GpioDevice) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sig + log.Println("Radio shutting down due to signal", sig) + // Make sure we always stop PTT when program ends + ptt.DisengagePTT() + os.Exit(0) + }() + + log.Println("Config checks out, radio coming online") + log.Println("Audio file cache:", config.CachePath) + + fileSpecChan := make(chan []protocol.FileSpec) + go filesWorker(config.CachePath, fileSpecChan) + + stop := make(chan bool) + playlistSpecChan := make(chan []protocol.PlaylistSpec) + go playlistWorker(playlistSpecChan, stop) + + for { + runWebsocket(fileSpecChan, playlistSpecChan, stop) + log.Println("Websocket failed, retry in 30 seconds") + time.Sleep(time.Second * time.Duration(30)) + } +} + +func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error { + log.Println("Establishing websocket connection to:", config.WebsocketURL()) + ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL) + if err != nil { + return err + } + + auth := protocol.AuthenticateMessage{ + T: "authenticate", + Token: config.Token, + } + msg, _ := json.Marshal(auth) + + if _, err := ws.Write(msg); err != nil { + log.Fatal(err) + } + statusCollector.Websocket <- ws + + buf := make([]byte, 16384) + badRead := false + for { + n, err := ws.Read(buf) + if err != nil { + log.Println("Lost websocket to server") + return err + } + // Ignore any massively oversize messages + if n == len(buf) { + badRead = true + continue + } else if badRead { + badRead = false + continue + } + + t, msg, err := protocol.ParseMessage(buf[:n]) + if err != nil { + log.Println("Message parse error", err) + return err + } + + if t == protocol.FilesType { + filesMsg := msg.(protocol.FilesMessage) + fileSpecChan <- filesMsg.Files + } + + if t == protocol.PlaylistsType { + playlistsMsg := msg.(protocol.PlaylistsMessage) + playlistSpecChan <- playlistsMsg.Playlists + } + + if t == protocol.StopType { + stop <- true + } + } +} + +func filesWorker(cachePath string, ch chan []protocol.FileSpec) { + machine := NewFilesMachine(cachePath) + isDownloading := false + downloadResult := make(chan error) + var timer *time.Timer + + for { + var timerCh <-chan time.Time = nil + if timer != nil { + timerCh = timer.C + } + doNext := false + select { + case specs := <-ch: + log.Println("Received new file specs", specs) + machine.UpdateSpecs(specs) + doNext = true + timer = nil + case err := <-downloadResult: + isDownloading = false + machine.RefreshMissing() + if err != nil { + log.Println(err) + if !machine.IsCacheComplete() { + timer = time.NewTimer(30 * time.Second) + } + } else { + if !machine.IsCacheComplete() { + timer = time.NewTimer(10 * time.Millisecond) + } + } + case <-timerCh: + doNext = true + timer = nil + } + + if doNext && !isDownloading && !machine.IsCacheComplete() { + next := machine.NextFile() + isDownloading = true + go machine.DownloadSingle(next, downloadResult) + } + } +} + +func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) { + var specs []protocol.PlaylistSpec + isPlaying := false + playbackFinished := make(chan error) + cancel := make(chan bool) + nextId := 0 + var timer *time.Timer + + for { + var timerCh <-chan time.Time = nil + if timer != nil { + timerCh = timer.C + } + doNext := false + select { + case specs = <-ch: + log.Println("Received new playlist specs", specs) + doNext = true + case <-playbackFinished: + isPlaying = false + doNext = true + cancel = make(chan bool) + case <-timerCh: + timer = nil + isPlaying = true + for _, v := range specs { + if v.Id == nextId { + go playPlaylist(v, playbackFinished, cancel) + } + } + case <-stop: + if isPlaying { + log.Println("Cancelling playlist in progress") + cancel <- true + } + } + + if doNext && !isPlaying { + timer = nil + found := false + loc, err := time.LoadLocation(config.TimeZone) + if err != nil { + log.Fatal(err) + } + var soonestTime time.Time + for _, v := range specs { + t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc) + if err != nil { + log.Println("Error parsing start time", err) + continue + } + if t.Before(time.Now()) { + continue + } + if !found || t.Before(soonestTime) { + soonestTime = t + found = true + nextId = v.Id + } + } + if found { + duration := soonestTime.Sub(time.Now()) + log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds") + timer = time.NewTimer(duration) + } else { + log.Println("No future playlists") + } + } + } +} + +func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) { + startTime := time.Now() + log.Println("Beginning playback of playlist", playlist.Name) +entries: + for _, p := range playlist.Entries { + // delay + var duration time.Duration + if p.IsRelative { + duration = time.Second * time.Duration(p.DelaySeconds) + } else { + duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds))) + } + statusCollector.PlaylistBeginDelay <- BeginDelayStatus{ + Playlist: playlist.Name, + Seconds: int(duration.Seconds()), + Filename: p.Filename, + } + select { + case <-time.After(duration): + case <-cancel: + break entries + } + + statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{ + Playlist: playlist.Name, + Filename: p.Filename, + } + cos.WaitForChannelClear() + + // then play + statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{ + Playlist: playlist.Name, + Filename: p.Filename, + } + ptt.EngagePTT() + f, err := os.Open(filepath.Join(config.CachePath, p.Filename)) + if err != nil { + log.Println("Couldn't open file for playlist", p.Filename) + continue + } + log.Println("Playing file", p.Filename) + l := strings.ToLower(p.Filename) + var streamer beep.StreamSeekCloser + var format beep.Format + if strings.HasSuffix(l, ".mp3") { + streamer, format, err = mp3.Decode(f) + } else if strings.HasSuffix(l, ".wav") { + streamer, format, err = wav.Decode(f) + } else { + log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on") + } + if err != nil { + log.Println("Could not decode media file", err) + continue + } + defer streamer.Close() + + done := make(chan bool) + + if format.SampleRate != sampleRate { + resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer) + speaker.Play(beep.Seq(resampled, beep.Callback(func() { + done <- true + }))) + } else { + speaker.Play(beep.Seq(streamer, beep.Callback(func() { + done <- true + }))) + } + + select { + case <-done: + case <-cancel: + ptt.DisengagePTT() + break entries + } + ptt.DisengagePTT() + } + log.Println("Playlist finished", playlist.Name) + statusCollector.PlaylistBeginIdle <- true + playbackFinished <- nil +}