--- /dev/null
+package main
+
+import (
+ "code.octet-stream.net/broadcaster/internal/protocol"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "github.com/gopxl/beep/v2"
+ "github.com/gopxl/beep/v2/mp3"
+ "github.com/gopxl/beep/v2/speaker"
+ "github.com/gopxl/beep/v2/wav"
+ "golang.org/x/net/websocket"
+ "log"
+ "os"
+ "os/signal"
+ "path/filepath"
+ "strings"
+ "syscall"
+ "time"
+)
+
+// Build constants for the radio client.
+const (
+	// version is the release string printed by -v and logged at startup.
+	version = "v1.0.0"
+
+	// sampleRate is the rate, in Hz, that the speaker is initialised with;
+	// media decoded at any other rate is resampled to it before playback.
+	sampleRate = 44100
+)
+
+// config is the process-wide radio configuration. It starts out as whatever
+// NewRadioConfig returns and main then loads the -c file into it.
+var config RadioConfig = NewRadioConfig()
+
+// main parses the command line, loads the configuration file, initialises
+// audio output and the optional GPIO pins, starts the file and playlist
+// workers, and then keeps re-establishing the websocket session forever.
+//
+// Flags:
+//
+//	-c  path to the configuration file (required unless -v is given)
+//	-v  print the version and exit
+func main() {
+	configFlag := flag.String("c", "", "path to configuration file")
+	versionFlag := flag.Bool("v", false, "print version and exit")
+	flag.Parse()
+
+	if *versionFlag {
+		fmt.Println("Broadcaster Radio", version)
+		os.Exit(0)
+	}
+	if *configFlag == "" {
+		log.Fatal("must specify a configuration file with -c")
+	}
+
+	log.Println("Broadcaster Radio", version, "starting up")
+	config.LoadFromFile(*configFlag)
+	statusCollector.Config <- config
+
+	// Open the speaker with a buffer of one tenth of a second. Without a
+	// working speaker nothing can be played, so a failure here is fatal
+	// rather than silently ignored.
+	playbackSampleRate := beep.SampleRate(sampleRate)
+	if err := speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10)); err != nil {
+		log.Fatalf("could not initialise audio output: %v", err)
+	}
+
+	// A pin value of -1 means the corresponding GPIO feature is not set up.
+	if config.PTTPin != -1 {
+		InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
+	}
+	if config.COSPin != -1 {
+		InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
+	}
+
+	sigCh := make(chan os.Signal, 1)
+	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		received := <-sigCh
+		log.Println("Radio shutting down due to signal", received)
+		// Make sure we always stop PTT when program ends
+		ptt.DisengagePTT()
+		os.Exit(0)
+	}()
+
+	log.Println("Config checks out, radio coming online")
+	log.Println("Audio file cache:", config.CachePath)
+
+	fileSpecChan := make(chan []protocol.FileSpec)
+	go filesWorker(config.CachePath, fileSpecChan)
+
+	stop := make(chan bool)
+	playlistSpecChan := make(chan []protocol.PlaylistSpec)
+	go playlistWorker(playlistSpecChan, stop)
+
+	for {
+		// runWebsocket only returns once the session has failed; report why
+		// before backing off, so that dial failures are visible in the log.
+		if err := runWebsocket(fileSpecChan, playlistSpecChan, stop); err != nil {
+			log.Println("Websocket error:", err)
+		}
+		log.Println("Websocket failed, retry in 30 seconds")
+		time.Sleep(30 * time.Second)
+	}
+}
+
+// runWebsocket dials the server, authenticates with the configured token and
+// then dispatches incoming messages until the session fails.
+//
+// File lists are forwarded on fileSpecChan, playlist lists on
+// playlistSpecChan, and a stop request is forwarded as a value on stop. All
+// three sends block until the receiving worker takes the value.
+//
+// It never returns nil: the returned error is the dial, authentication, read
+// or parse failure that ended the session. The connection is closed before
+// returning so that repeated reconnects do not accumulate open sockets.
+func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
+	log.Println("Establishing websocket connection to:", config.WebsocketURL())
+	ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
+	if err != nil {
+		return err
+	}
+	defer ws.Close()
+
+	auth := protocol.AuthenticateMessage{
+		T:     "authenticate",
+		Token: config.Token,
+	}
+	msg, err := json.Marshal(auth)
+	if err != nil {
+		return fmt.Errorf("encoding authenticate message: %w", err)
+	}
+	// A failed write is an ordinary connection failure: hand it back to the
+	// caller's retry loop instead of terminating the whole process.
+	if _, err := ws.Write(msg); err != nil {
+		return fmt.Errorf("sending authenticate message: %w", err)
+	}
+	statusCollector.Websocket <- ws
+
+	buf := make([]byte, 16384)
+	badRead := false
+	for {
+		n, err := ws.Read(buf)
+		if err != nil {
+			log.Println("Lost websocket to server")
+			return err
+		}
+		// Ignore any massively oversize messages. A read that fills the whole
+		// buffer is treated as one piece of an oversize message; the first
+		// shorter read after it is taken to be that message's tail and is
+		// dropped as well.
+		if n == len(buf) {
+			badRead = true
+			continue
+		} else if badRead {
+			badRead = false
+			continue
+		}
+
+		t, parsed, err := protocol.ParseMessage(buf[:n])
+		if err != nil {
+			log.Println("Message parse error", err)
+			return err
+		}
+
+		// The assertions are checked so that a payload whose concrete type
+		// does not match its declared message type is skipped, not a panic.
+		switch t {
+		case protocol.FilesType:
+			filesMsg, ok := parsed.(protocol.FilesMessage)
+			if !ok {
+				log.Printf("Ignoring files message with unexpected payload type %T", parsed)
+				continue
+			}
+			fileSpecChan <- filesMsg.Files
+		case protocol.PlaylistsType:
+			playlistsMsg, ok := parsed.(protocol.PlaylistsMessage)
+			if !ok {
+				log.Printf("Ignoring playlists message with unexpected payload type %T", parsed)
+				continue
+			}
+			playlistSpecChan <- playlistsMsg.Playlists
+		case protocol.StopType:
+			stop <- true
+		}
+	}
+}
+
+// filesWorker keeps the audio cache under cachePath in step with the file
+// specs received on ch, downloading one missing file at a time.
+//
+// Each new spec list is handed to the files machine and, if nothing is in
+// flight, triggers a download. When a download reports back and the cache is
+// still incomplete, the next attempt is scheduled after 10 ms on success or
+// 30 s on failure. The function never returns.
+func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
+	var (
+		machine = NewFilesMachine(cachePath)
+		results = make(chan error)
+		busy    bool
+		retry   *time.Timer
+	)
+
+	for {
+		// A nil channel is never ready, so the retry case below is inert
+		// whenever no timer is armed.
+		var retryCh <-chan time.Time
+		if retry != nil {
+			retryCh = retry.C
+		}
+
+		kick := false
+		select {
+		case specs := <-ch:
+			log.Println("Received new file specs", specs)
+			machine.UpdateSpecs(specs)
+			retry = nil
+			kick = true
+		case err := <-results:
+			busy = false
+			machine.RefreshMissing()
+			if err != nil {
+				log.Println(err)
+			}
+			if !machine.IsCacheComplete() {
+				// Back off for longer after a failure than after a success.
+				wait := 10 * time.Millisecond
+				if err != nil {
+					wait = 30 * time.Second
+				}
+				retry = time.NewTimer(wait)
+			}
+		case <-retryCh:
+			retry = nil
+			kick = true
+		}
+
+		// Only start a download when asked to, when none is already running
+		// and when something is still missing.
+		if !kick || busy || machine.IsCacheComplete() {
+			continue
+		}
+		busy = true
+		go machine.DownloadSingle(machine.NextFile(), results)
+	}
+}
+
+// playlistWorker schedules and supervises playlist playback.
+//
+// It receives the current set of playlists on ch and stop requests on stop.
+// While idle it arms a timer for the playlist whose start time (parsed with
+// protocol.StartTimeFormat in the configured time zone) is the soonest one
+// still in the future. When the timer fires it starts playPlaylist in its own
+// goroutine, and it reschedules once that goroutine reports completion.
+// Specs received during playback are stored but only take effect at the next
+// reschedule. The function never returns.
+//
+// A stop request is delivered by closing the cancel channel rather than
+// sending on it. Closing never blocks, so the worker cannot deadlock against
+// a player that is itself blocked sending on playbackFinished, and every
+// select in the player observes the cancellation.
+func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
+	var specs []protocol.PlaylistSpec
+	isPlaying := false
+	playbackFinished := make(chan error)
+	cancel := make(chan bool)
+	cancelled := false // true once cancel has been closed for the current playback
+	nextID := 0
+	var timer *time.Timer
+
+	for {
+		// A nil channel is never ready, which disables the timer case while
+		// no timer is armed.
+		var timerCh <-chan time.Time
+		if timer != nil {
+			timerCh = timer.C
+		}
+		doNext := false
+		select {
+		case specs = <-ch:
+			log.Println("Received new playlist specs", specs)
+			doNext = true
+		case <-playbackFinished:
+			isPlaying = false
+			doNext = true
+			// Fresh channel for the next playback; the old one may be closed.
+			cancel = make(chan bool)
+			cancelled = false
+		case <-timerCh:
+			timer = nil
+			started := false
+			for _, v := range specs {
+				if v.Id == nextID {
+					isPlaying = true
+					started = true
+					go playPlaylist(v, playbackFinished, cancel)
+					// Start at most one player even if ids are duplicated.
+					break
+				}
+			}
+			if !started {
+				// Never claim to be playing when nothing was started, or the
+				// scheduler would wait forever for a finish notification.
+				log.Println("Scheduled playlist id", nextID, "no longer exists, rescheduling")
+				doNext = true
+			}
+		case <-stop:
+			if isPlaying && !cancelled {
+				log.Println("Cancelling playlist in progress")
+				close(cancel)
+				cancelled = true
+			}
+		}
+
+		if doNext && !isPlaying {
+			// Drop any previously armed timer before choosing a new target.
+			if timer != nil {
+				timer.Stop()
+				timer = nil
+			}
+			loc, err := time.LoadLocation(config.TimeZone)
+			if err != nil {
+				log.Fatal(err)
+			}
+			now := time.Now()
+			found := false
+			var soonestTime time.Time
+			for _, v := range specs {
+				t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
+				if err != nil {
+					log.Println("Error parsing start time", err)
+					continue
+				}
+				if t.Before(now) {
+					continue
+				}
+				if !found || t.Before(soonestTime) {
+					soonestTime = t
+					found = true
+					nextID = v.Id
+				}
+			}
+			if found {
+				duration := time.Until(soonestTime)
+				log.Println("Next playlist will be id", nextID, "in", duration.Seconds(), "seconds")
+				timer = time.NewTimer(duration)
+			} else {
+				log.Println("No future playlists")
+			}
+		}
+	}
+}
+
+// playPlaylist plays every entry of playlist in order and then reports
+// completion by sending nil on playbackFinished.
+//
+// For each entry it waits out the entry's delay (relative to the previous
+// step when IsRelative is set, otherwise measured from the moment the
+// playlist started), waits for the channel to be clear, and then transmits
+// the file. Progress is reported to statusCollector at each stage.
+//
+// A value received on cancel, or cancel being closed, abandons the remaining
+// entries. Cancellation is observed during the delay, immediately after the
+// channel-clear wait and during playback; the completion message is still
+// sent afterwards.
+func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
+	startTime := time.Now()
+	log.Println("Beginning playback of playlist", playlist.Name)
+entries:
+	for _, p := range playlist.Entries {
+		// delay
+		var duration time.Duration
+		if p.IsRelative {
+			duration = time.Second * time.Duration(p.DelaySeconds)
+		} else {
+			duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
+		}
+		statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
+			Playlist: playlist.Name,
+			Seconds:  int(duration.Seconds()),
+			Filename: p.Filename,
+		}
+		select {
+		case <-time.After(duration):
+		case <-cancel:
+			break entries
+		}
+
+		statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
+			Playlist: playlist.Name,
+			Filename: p.Filename,
+		}
+		cos.WaitForChannelClear()
+
+		// A stop may have been requested while waiting for the channel; check
+		// before keying the transmitter so no brief transmission goes out.
+		select {
+		case <-cancel:
+			break entries
+		default:
+		}
+
+		// then play
+		statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
+			Playlist: playlist.Name,
+			Filename: p.Filename,
+		}
+		if playCachedFile(p.Filename, cancel) {
+			break entries
+		}
+	}
+	log.Println("Playlist finished", playlist.Name)
+	statusCollector.PlaylistBeginIdle <- true
+	playbackFinished <- nil
+}
+
+// playCachedFile engages PTT, plays one file from the cache directory through
+// the speaker and disengages PTT again on every exit path.
+//
+// It returns true if playback was interrupted via cancel and false otherwise,
+// including when the file could not be opened or decoded or has an
+// unsupported extension; those cases are logged and skipped.
+func playCachedFile(filename string, cancel <-chan bool) bool {
+	ptt.EngagePTT()
+	defer ptt.DisengagePTT()
+
+	f, err := os.Open(filepath.Join(config.CachePath, filename))
+	if err != nil {
+		log.Println("Couldn't open file for playlist", filename)
+		return false
+	}
+	log.Println("Playing file", filename)
+
+	var streamer beep.StreamSeekCloser
+	var format beep.Format
+	l := strings.ToLower(filename)
+	switch {
+	case strings.HasSuffix(l, ".mp3"):
+		streamer, format, err = mp3.Decode(f)
+	case strings.HasSuffix(l, ".wav"):
+		streamer, format, err = wav.Decode(f)
+	default:
+		// There is no decoder, hence no streamer: skip the entry instead of
+		// carrying on with a nil streamer.
+		log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
+		f.Close()
+		return false
+	}
+	if err != nil {
+		log.Println("Could not decode media file", err)
+		f.Close()
+		return false
+	}
+	// Closed when this file is done, not when the whole playlist ends.
+	defer streamer.Close()
+
+	// Buffered so the speaker-side callback can always complete its send,
+	// even when nobody is left to receive because playback was cancelled.
+	done := make(chan bool, 1)
+
+	var source beep.Streamer = streamer
+	if format.SampleRate != sampleRate {
+		source = beep.Resample(4, format.SampleRate, sampleRate, streamer)
+	}
+	speaker.Play(beep.Seq(source, beep.Callback(func() {
+		done <- true
+	})))
+
+	select {
+	case <-done:
+		return false
+	case <-cancel:
+		// Stop the audio as well as the transmission.
+		speaker.Clear()
+		return true
+	}
+}