]> code.octet-stream.net Git - broadcaster/blob - radio/radio.go
Fix races in status message accumulation
[broadcaster] / radio / radio.go
1 package main
2
3 import (
4 "code.octet-stream.net/broadcaster/protocol"
5 "encoding/json"
6 "flag"
7 "github.com/gopxl/beep/v2"
8 "github.com/gopxl/beep/v2/mp3"
9 "github.com/gopxl/beep/v2/speaker"
10 "github.com/gopxl/beep/v2/wav"
11 "golang.org/x/net/websocket"
12 "log"
13 "os"
14 "os/signal"
15 "path/filepath"
16 "strings"
17 "syscall"
18 "time"
19 )
20
21 const sampleRate = 44100
22
23 var config RadioConfig = NewRadioConfig()
24
25 func main() {
26 configFlag := flag.String("c", "", "path to configuration file")
27 // TODO: support this
28 //generateFlag := flag.String("g", "", "create a template config file with specified name then exit")
29 flag.Parse()
30
31 if *configFlag == "" {
32 log.Fatal("must specify a configuration file with -c")
33 }
34 config.LoadFromFile(*configFlag)
35 statusCollector.Config <- config
36
37 playbackSampleRate := beep.SampleRate(sampleRate)
38 speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10))
39
40 if config.PTTPin != -1 {
41 InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
42 }
43 if config.COSPin != -1 {
44 InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
45 }
46
47 sig := make(chan os.Signal, 1)
48 signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
49 go func() {
50 sig := <-sig
51 log.Println("Radio shutting down due to signal", sig)
52 // Make sure we always stop PTT when program ends
53 ptt.DisengagePTT()
54 os.Exit(0)
55 }()
56
57 log.Println("Config checks out, radio coming online")
58 log.Println("Audio file cache:", config.CachePath)
59
60 fileSpecChan := make(chan []protocol.FileSpec)
61 go filesWorker(config.CachePath, fileSpecChan)
62
63 playlistSpecChan := make(chan []protocol.PlaylistSpec)
64 go playlistWorker(playlistSpecChan)
65
66 for {
67 runWebsocket(fileSpecChan, playlistSpecChan)
68 log.Println("Websocket failed, retry in 30 seconds")
69 time.Sleep(time.Second * time.Duration(30))
70 }
71 }
72
73 func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error {
74 log.Println("Establishing websocket connection to:", config.WebsocketURL())
75 ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
76 if err != nil {
77 return err
78 }
79
80 auth := protocol.AuthenticateMessage{
81 T: "authenticate",
82 Token: config.Token,
83 }
84 msg, _ := json.Marshal(auth)
85
86 if _, err := ws.Write(msg); err != nil {
87 log.Fatal(err)
88 }
89 statusCollector.Websocket <- ws
90
91 buf := make([]byte, 16384)
92 badRead := false
93 for {
94 n, err := ws.Read(buf)
95 if err != nil {
96 log.Println("Lost websocket to server")
97 return err
98 }
99 // Ignore any massively oversize messages
100 if n == len(buf) {
101 badRead = true
102 continue
103 } else if badRead {
104 badRead = false
105 continue
106 }
107
108 t, msg, err := protocol.ParseMessage(buf[:n])
109 if err != nil {
110 log.Println("Message parse error", err)
111 return err
112 }
113
114 if t == protocol.FilesType {
115 filesMsg := msg.(protocol.FilesMessage)
116 fileSpecChan <- filesMsg.Files
117 }
118
119 if t == protocol.PlaylistsType {
120 playlistsMsg := msg.(protocol.PlaylistsMessage)
121 playlistSpecChan <- playlistsMsg.Playlists
122 }
123 }
124 }
125
126 func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
127 machine := NewFilesMachine(cachePath)
128 isDownloading := false
129 downloadResult := make(chan error)
130 var timer *time.Timer
131
132 for {
133 var timerCh <-chan time.Time = nil
134 if timer != nil {
135 timerCh = timer.C
136 }
137 doNext := false
138 select {
139 case specs := <-ch:
140 log.Println("Received new file specs", specs)
141 machine.UpdateSpecs(specs)
142 doNext = true
143 timer = nil
144 case err := <-downloadResult:
145 isDownloading = false
146 machine.RefreshMissing()
147 if err != nil {
148 log.Println(err)
149 if !machine.IsCacheComplete() {
150 timer = time.NewTimer(30 * time.Second)
151 }
152 } else {
153 if !machine.IsCacheComplete() {
154 timer = time.NewTimer(10 * time.Millisecond)
155 }
156 }
157 case <-timerCh:
158 doNext = true
159 timer = nil
160 }
161
162 if doNext && !isDownloading && !machine.IsCacheComplete() {
163 next := machine.NextFile()
164 isDownloading = true
165 go machine.DownloadSingle(next, downloadResult)
166 }
167 }
168 }
169
170 func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
171 var specs []protocol.PlaylistSpec
172 isPlaying := false
173 playbackFinished := make(chan error)
174 nextId := 0
175 var timer *time.Timer
176
177 for {
178 var timerCh <-chan time.Time = nil
179 if timer != nil {
180 timerCh = timer.C
181 }
182 doNext := false
183 select {
184 case specs = <-ch:
185 log.Println("Received new playlist specs", specs)
186 doNext = true
187 case <-playbackFinished:
188 isPlaying = false
189 doNext = true
190 case <-timerCh:
191 timer = nil
192 isPlaying = true
193 for _, v := range specs {
194 if v.Id == nextId {
195 go playPlaylist(v, playbackFinished)
196 }
197 }
198 }
199
200 if doNext && !isPlaying {
201 timer = nil
202 found := false
203 loc, err := time.LoadLocation(config.TimeZone)
204 if err != nil {
205 log.Fatal(err)
206 }
207 var soonestTime time.Time
208 for _, v := range specs {
209 t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
210 if err != nil {
211 log.Println("Error parsing start time", err)
212 continue
213 }
214 if t.Before(time.Now()) {
215 continue
216 }
217 if !found || t.Before(soonestTime) {
218 soonestTime = t
219 found = true
220 nextId = v.Id
221 }
222 }
223 if found {
224 duration := soonestTime.Sub(time.Now())
225 log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds")
226 timer = time.NewTimer(duration)
227 } else {
228 log.Println("No future playlists")
229 }
230 }
231 }
232 }
233
234 func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) {
235 // TODO: possibility of on-demand cancellation
236 startTime := time.Now()
237 log.Println("Beginning playback of playlist", playlist.Name)
238 for _, p := range playlist.Entries {
239 // delay
240 var duration time.Duration
241 if p.IsRelative {
242 duration = time.Second * time.Duration(p.DelaySeconds)
243 } else {
244 duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
245 }
246 statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
247 Playlist: playlist.Name,
248 Seconds: int(duration.Seconds()),
249 Filename: p.Filename,
250 }
251 <-time.After(duration)
252
253 statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
254 Playlist: playlist.Name,
255 Filename: p.Filename,
256 }
257 cos.WaitForChannelClear()
258
259 // then play
260 statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
261 Playlist: playlist.Name,
262 Filename: p.Filename,
263 }
264 ptt.EngagePTT()
265 f, err := os.Open(filepath.Join(config.CachePath, p.Filename))
266 if err != nil {
267 log.Println("Couldn't open file for playlist", p.Filename)
268 continue
269 }
270 log.Println("Playing file", p.Filename)
271 l := strings.ToLower(p.Filename)
272 var streamer beep.StreamSeekCloser
273 var format beep.Format
274 if strings.HasSuffix(l, ".mp3") {
275 streamer, format, err = mp3.Decode(f)
276 } else if strings.HasSuffix(l, ".wav") {
277 streamer, format, err = wav.Decode(f)
278 } else {
279 log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
280 }
281 if err != nil {
282 log.Println("Could not decode media file", err)
283 continue
284 }
285 defer streamer.Close()
286
287 done := make(chan bool)
288
289 if format.SampleRate != sampleRate {
290 resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer)
291 speaker.Play(beep.Seq(resampled, beep.Callback(func() {
292 done <- true
293 })))
294 } else {
295 speaker.Play(beep.Seq(streamer, beep.Callback(func() {
296 done <- true
297 })))
298 }
299
300 <-done
301 ptt.DisengagePTT()
302 }
303 log.Println("Playlist finished", playlist.Name)
304 statusCollector.PlaylistBeginIdle <- true
305 playbackFinished <- nil
306 }