]> code.octet-stream.net Git - broadcaster/blob - radio/main.go
Clean up log entries
[broadcaster] / radio / main.go
1 package main
2
3 import (
4 "code.octet-stream.net/broadcaster/internal/protocol"
5 "encoding/json"
6 "flag"
7 "fmt"
8 "github.com/gopxl/beep/v2"
9 "github.com/gopxl/beep/v2/mp3"
10 "github.com/gopxl/beep/v2/speaker"
11 "github.com/gopxl/beep/v2/wav"
12 "golang.org/x/net/websocket"
13 "log"
14 "os"
15 "os/signal"
16 "path/filepath"
17 "strings"
18 "syscall"
19 "time"
20 )
21
22 const version = "v1.0.0"
23 const sampleRate = 44100
24
25 var config RadioConfig = NewRadioConfig()
26
27 func main() {
28 configFlag := flag.String("c", "", "path to configuration file")
29 versionFlag := flag.Bool("v", false, "print version and exit")
30 flag.Parse()
31
32 if *versionFlag {
33 fmt.Println("Broadcaster Radio", version)
34 os.Exit(0)
35 }
36 if *configFlag == "" {
37 log.Fatal("must specify a configuration file with -c")
38 }
39
40 log.Println("Broadcaster Radio", version, "starting up")
41 config.LoadFromFile(*configFlag)
42 statusCollector.Config <- config
43
44 playbackSampleRate := beep.SampleRate(sampleRate)
45 speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10))
46
47 if config.PTTPin != -1 {
48 InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
49 }
50 if config.COSPin != -1 {
51 InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
52 }
53
54 sig := make(chan os.Signal, 1)
55 signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
56 go func() {
57 sig := <-sig
58 log.Println("Radio shutting down due to signal:", sig)
59 // Make sure we always stop PTT when program ends
60 ptt.DisengagePTT()
61 os.Exit(0)
62 }()
63
64 log.Println("Config checks out, radio coming online")
65 log.Println("Audio file cache:", config.CachePath)
66
67 fileSpecChan := make(chan []protocol.FileSpec)
68 go filesWorker(config.CachePath, fileSpecChan)
69
70 stop := make(chan bool)
71 playlistSpecChan := make(chan []protocol.PlaylistSpec)
72 go playlistWorker(playlistSpecChan, stop)
73
74 for {
75 runWebsocket(fileSpecChan, playlistSpecChan, stop)
76 log.Println("Websocket failed, retry in 30 seconds")
77 time.Sleep(time.Second * time.Duration(30))
78 }
79 }
80
81 func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
82 log.Println("Establishing websocket connection to:", config.WebsocketURL())
83 ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
84 if err != nil {
85 return err
86 }
87
88 auth := protocol.AuthenticateMessage{
89 T: "authenticate",
90 Token: config.Token,
91 }
92 msg, _ := json.Marshal(auth)
93
94 if _, err := ws.Write(msg); err != nil {
95 log.Fatal(err)
96 }
97 statusCollector.Websocket <- ws
98
99 buf := make([]byte, 16384)
100 badRead := false
101 for {
102 n, err := ws.Read(buf)
103 if err != nil {
104 log.Println("Lost websocket to server")
105 return err
106 }
107 // Ignore any massively oversize messages
108 if n == len(buf) {
109 badRead = true
110 continue
111 } else if badRead {
112 badRead = false
113 continue
114 }
115
116 t, msg, err := protocol.ParseMessage(buf[:n])
117 if err != nil {
118 log.Println("Message parse error", err)
119 return err
120 }
121
122 if t == protocol.FilesType {
123 filesMsg := msg.(protocol.FilesMessage)
124 fileSpecChan <- filesMsg.Files
125 }
126
127 if t == protocol.PlaylistsType {
128 playlistsMsg := msg.(protocol.PlaylistsMessage)
129 playlistSpecChan <- playlistsMsg.Playlists
130 }
131
132 if t == protocol.StopType {
133 stop <- true
134 }
135 }
136 }
137
138 func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
139 machine := NewFilesMachine(cachePath)
140 isDownloading := false
141 downloadResult := make(chan error)
142 var timer *time.Timer
143
144 for {
145 var timerCh <-chan time.Time = nil
146 if timer != nil {
147 timerCh = timer.C
148 }
149 doNext := false
150 select {
151 case specs := <-ch:
152 log.Println("Received new file specs", specs)
153 machine.UpdateSpecs(specs)
154 doNext = true
155 timer = nil
156 case err := <-downloadResult:
157 isDownloading = false
158 machine.RefreshMissing()
159 if err != nil {
160 log.Println(err)
161 if !machine.IsCacheComplete() {
162 timer = time.NewTimer(30 * time.Second)
163 }
164 } else {
165 if !machine.IsCacheComplete() {
166 timer = time.NewTimer(10 * time.Millisecond)
167 }
168 }
169 case <-timerCh:
170 doNext = true
171 timer = nil
172 }
173
174 if doNext && !isDownloading && !machine.IsCacheComplete() {
175 next := machine.NextFile()
176 isDownloading = true
177 go machine.DownloadSingle(next, downloadResult)
178 }
179 }
180 }
181
182 func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
183 var specs []protocol.PlaylistSpec
184 isPlaying := false
185 playbackFinished := make(chan error)
186 cancel := make(chan bool)
187 nextId := 0
188 var timer *time.Timer
189
190 for {
191 var timerCh <-chan time.Time = nil
192 if timer != nil {
193 timerCh = timer.C
194 }
195 doNext := false
196 select {
197 case specs = <-ch:
198 log.Println("Received new playlist specs", specs)
199 doNext = true
200 case <-playbackFinished:
201 isPlaying = false
202 doNext = true
203 cancel = make(chan bool)
204 case <-timerCh:
205 timer = nil
206 isPlaying = true
207 for _, v := range specs {
208 if v.Id == nextId {
209 go playPlaylist(v, playbackFinished, cancel)
210 }
211 }
212 case <-stop:
213 if isPlaying {
214 log.Println("Cancelling playlist in progress")
215 cancel <- true
216 }
217 }
218
219 if doNext && !isPlaying {
220 timer = nil
221 found := false
222 loc, err := time.LoadLocation(config.TimeZone)
223 if err != nil {
224 log.Fatal(err)
225 }
226 var soonestTime time.Time
227 for _, v := range specs {
228 t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
229 if err != nil {
230 log.Println("Error parsing start time", err)
231 continue
232 }
233 if t.Before(time.Now()) {
234 continue
235 }
236 if !found || t.Before(soonestTime) {
237 soonestTime = t
238 found = true
239 nextId = v.Id
240 }
241 }
242 if found {
243 duration := soonestTime.Sub(time.Now())
244 log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds")
245 timer = time.NewTimer(duration)
246 } else {
247 log.Println("No future playlists")
248 }
249 }
250 }
251 }
252
253 func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
254 startTime := time.Now()
255 log.Println("Beginning playback of playlist", playlist.Name)
256 entries:
257 for _, p := range playlist.Entries {
258 // delay
259 var duration time.Duration
260 if p.IsRelative {
261 duration = time.Second * time.Duration(p.DelaySeconds)
262 } else {
263 duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
264 }
265 statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
266 Playlist: playlist.Name,
267 Seconds: int(duration.Seconds()),
268 Filename: p.Filename,
269 }
270 select {
271 case <-time.After(duration):
272 case <-cancel:
273 break entries
274 }
275
276 statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
277 Playlist: playlist.Name,
278 Filename: p.Filename,
279 }
280 cos.WaitForChannelClear()
281
282 // then play
283 statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
284 Playlist: playlist.Name,
285 Filename: p.Filename,
286 }
287 ptt.EngagePTT()
288 f, err := os.Open(filepath.Join(config.CachePath, p.Filename))
289 if err != nil {
290 log.Println("Couldn't open file for playlist", p.Filename)
291 continue
292 }
293 log.Println("Playing file", p.Filename)
294 l := strings.ToLower(p.Filename)
295 var streamer beep.StreamSeekCloser
296 var format beep.Format
297 if strings.HasSuffix(l, ".mp3") {
298 streamer, format, err = mp3.Decode(f)
299 } else if strings.HasSuffix(l, ".wav") {
300 streamer, format, err = wav.Decode(f)
301 } else {
302 log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
303 }
304 if err != nil {
305 log.Println("Could not decode media file", err)
306 continue
307 }
308 defer streamer.Close()
309
310 done := make(chan bool)
311
312 if format.SampleRate != sampleRate {
313 resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer)
314 speaker.Play(beep.Seq(resampled, beep.Callback(func() {
315 done <- true
316 })))
317 } else {
318 speaker.Play(beep.Seq(streamer, beep.Callback(func() {
319 done <- true
320 })))
321 }
322
323 select {
324 case <-done:
325 case <-cancel:
326 ptt.DisengagePTT()
327 break entries
328 }
329 ptt.DisengagePTT()
330 }
331 log.Println("Playlist finished", playlist.Name)
332 statusCollector.PlaylistBeginIdle <- true
333 playbackFinished <- nil
334 }