+++ /dev/null
-package main
-
-import (
- "code.octet-stream.net/broadcaster/internal/protocol"
- "encoding/json"
- "flag"
- "fmt"
- "github.com/gopxl/beep/v2"
- "github.com/gopxl/beep/v2/mp3"
- "github.com/gopxl/beep/v2/speaker"
- "github.com/gopxl/beep/v2/wav"
- "golang.org/x/net/websocket"
- "log"
- "os"
- "os/signal"
- "path/filepath"
- "strings"
- "syscall"
- "time"
-)
-
-const version = "v1.0.0"
-const sampleRate = 44100
-
-var config RadioConfig = NewRadioConfig()
-
-func main() {
- configFlag := flag.String("c", "", "path to configuration file")
- versionFlag := flag.Bool("v", false, "print version and exit")
- flag.Parse()
-
- if *versionFlag {
- fmt.Println("Broadcaster Radio", version)
- os.Exit(0)
- }
- if *configFlag == "" {
- log.Fatal("must specify a configuration file with -c")
- }
-
- log.Println("Broadcaster Radio", version, "starting up")
- config.LoadFromFile(*configFlag)
- statusCollector.Config <- config
-
- playbackSampleRate := beep.SampleRate(sampleRate)
- speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10))
-
- if config.PTTPin != -1 {
- InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
- }
- if config.COSPin != -1 {
- InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
- }
-
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- sig := <-sig
- log.Println("Radio shutting down due to signal", sig)
- // Make sure we always stop PTT when program ends
- ptt.DisengagePTT()
- os.Exit(0)
- }()
-
- log.Println("Config checks out, radio coming online")
- log.Println("Audio file cache:", config.CachePath)
-
- fileSpecChan := make(chan []protocol.FileSpec)
- go filesWorker(config.CachePath, fileSpecChan)
-
- stop := make(chan bool)
- playlistSpecChan := make(chan []protocol.PlaylistSpec)
- go playlistWorker(playlistSpecChan, stop)
-
- for {
- runWebsocket(fileSpecChan, playlistSpecChan, stop)
- log.Println("Websocket failed, retry in 30 seconds")
- time.Sleep(time.Second * time.Duration(30))
- }
-}
-
-func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
- log.Println("Establishing websocket connection to:", config.WebsocketURL())
- ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
- if err != nil {
- return err
- }
-
- auth := protocol.AuthenticateMessage{
- T: "authenticate",
- Token: config.Token,
- }
- msg, _ := json.Marshal(auth)
-
- if _, err := ws.Write(msg); err != nil {
- log.Fatal(err)
- }
- statusCollector.Websocket <- ws
-
- buf := make([]byte, 16384)
- badRead := false
- for {
- n, err := ws.Read(buf)
- if err != nil {
- log.Println("Lost websocket to server")
- return err
- }
- // Ignore any massively oversize messages
- if n == len(buf) {
- badRead = true
- continue
- } else if badRead {
- badRead = false
- continue
- }
-
- t, msg, err := protocol.ParseMessage(buf[:n])
- if err != nil {
- log.Println("Message parse error", err)
- return err
- }
-
- if t == protocol.FilesType {
- filesMsg := msg.(protocol.FilesMessage)
- fileSpecChan <- filesMsg.Files
- }
-
- if t == protocol.PlaylistsType {
- playlistsMsg := msg.(protocol.PlaylistsMessage)
- playlistSpecChan <- playlistsMsg.Playlists
- }
-
- if t == protocol.StopType {
- stop <- true
- }
- }
-}
-
-func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
- machine := NewFilesMachine(cachePath)
- isDownloading := false
- downloadResult := make(chan error)
- var timer *time.Timer
-
- for {
- var timerCh <-chan time.Time = nil
- if timer != nil {
- timerCh = timer.C
- }
- doNext := false
- select {
- case specs := <-ch:
- log.Println("Received new file specs", specs)
- machine.UpdateSpecs(specs)
- doNext = true
- timer = nil
- case err := <-downloadResult:
- isDownloading = false
- machine.RefreshMissing()
- if err != nil {
- log.Println(err)
- if !machine.IsCacheComplete() {
- timer = time.NewTimer(30 * time.Second)
- }
- } else {
- if !machine.IsCacheComplete() {
- timer = time.NewTimer(10 * time.Millisecond)
- }
- }
- case <-timerCh:
- doNext = true
- timer = nil
- }
-
- if doNext && !isDownloading && !machine.IsCacheComplete() {
- next := machine.NextFile()
- isDownloading = true
- go machine.DownloadSingle(next, downloadResult)
- }
- }
-}
-
-func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
- var specs []protocol.PlaylistSpec
- isPlaying := false
- playbackFinished := make(chan error)
- cancel := make(chan bool)
- nextId := 0
- var timer *time.Timer
-
- for {
- var timerCh <-chan time.Time = nil
- if timer != nil {
- timerCh = timer.C
- }
- doNext := false
- select {
- case specs = <-ch:
- log.Println("Received new playlist specs", specs)
- doNext = true
- case <-playbackFinished:
- isPlaying = false
- doNext = true
- cancel = make(chan bool)
- case <-timerCh:
- timer = nil
- isPlaying = true
- for _, v := range specs {
- if v.Id == nextId {
- go playPlaylist(v, playbackFinished, cancel)
- }
- }
- case <-stop:
- if isPlaying {
- log.Println("Cancelling playlist in progress")
- cancel <- true
- }
- }
-
- if doNext && !isPlaying {
- timer = nil
- found := false
- loc, err := time.LoadLocation(config.TimeZone)
- if err != nil {
- log.Fatal(err)
- }
- var soonestTime time.Time
- for _, v := range specs {
- t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
- if err != nil {
- log.Println("Error parsing start time", err)
- continue
- }
- if t.Before(time.Now()) {
- continue
- }
- if !found || t.Before(soonestTime) {
- soonestTime = t
- found = true
- nextId = v.Id
- }
- }
- if found {
- duration := soonestTime.Sub(time.Now())
- log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds")
- timer = time.NewTimer(duration)
- } else {
- log.Println("No future playlists")
- }
- }
- }
-}
-
-func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
- startTime := time.Now()
- log.Println("Beginning playback of playlist", playlist.Name)
-entries:
- for _, p := range playlist.Entries {
- // delay
- var duration time.Duration
- if p.IsRelative {
- duration = time.Second * time.Duration(p.DelaySeconds)
- } else {
- duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
- }
- statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
- Playlist: playlist.Name,
- Seconds: int(duration.Seconds()),
- Filename: p.Filename,
- }
- select {
- case <-time.After(duration):
- case <-cancel:
- break entries
- }
-
- statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
- Playlist: playlist.Name,
- Filename: p.Filename,
- }
- cos.WaitForChannelClear()
-
- // then play
- statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
- Playlist: playlist.Name,
- Filename: p.Filename,
- }
- ptt.EngagePTT()
- f, err := os.Open(filepath.Join(config.CachePath, p.Filename))
- if err != nil {
- log.Println("Couldn't open file for playlist", p.Filename)
- continue
- }
- log.Println("Playing file", p.Filename)
- l := strings.ToLower(p.Filename)
- var streamer beep.StreamSeekCloser
- var format beep.Format
- if strings.HasSuffix(l, ".mp3") {
- streamer, format, err = mp3.Decode(f)
- } else if strings.HasSuffix(l, ".wav") {
- streamer, format, err = wav.Decode(f)
- } else {
- log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
- }
- if err != nil {
- log.Println("Could not decode media file", err)
- continue
- }
- defer streamer.Close()
-
- done := make(chan bool)
-
- if format.SampleRate != sampleRate {
- resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer)
- speaker.Play(beep.Seq(resampled, beep.Callback(func() {
- done <- true
- })))
- } else {
- speaker.Play(beep.Seq(streamer, beep.Callback(func() {
- done <- true
- })))
- }
-
- select {
- case <-done:
- case <-cancel:
- ptt.DisengagePTT()
- break entries
- }
- ptt.DisengagePTT()
- }
- log.Println("Playlist finished", playlist.Name)
- statusCollector.PlaylistBeginIdle <- true
- playbackFinished <- nil
-}