]> code.octet-stream.net Git - broadcaster/blob - radio/main.go
22658a38633fab0219c54e1fee37a80c4e44e95f
[broadcaster] / radio / main.go
1 package main
2
3 import (
4 "encoding/json"
5 "flag"
6 "fmt"
7 "log"
8 "os"
9 "os/signal"
10 "path/filepath"
11 "strings"
12 "syscall"
13 "time"
14
15 "code.octet-stream.net/broadcaster/internal/protocol"
16 "github.com/gopxl/beep/v2"
17 "github.com/gopxl/beep/v2/mp3"
18 "github.com/gopxl/beep/v2/speaker"
19 "github.com/gopxl/beep/v2/wav"
20 "golang.org/x/net/websocket"
21 )
22
23 const version = "v1.1.0"
24 const sampleRate = 44100
25
26 var config RadioConfig = NewRadioConfig()
27
28 func main() {
29 configFlag := flag.String("c", "", "path to configuration file")
30 versionFlag := flag.Bool("v", false, "print version and exit")
31 flag.Parse()
32
33 if *versionFlag {
34 fmt.Println("Broadcaster Radio", version)
35 os.Exit(0)
36 }
37 if *configFlag == "" {
38 log.Fatal("must specify a configuration file with -c")
39 }
40
41 log.Println("Broadcaster Radio", version, "starting up")
42 config.LoadFromFile(*configFlag)
43 statusCollector.Config <- config
44
45 playbackSampleRate := beep.SampleRate(sampleRate)
46 speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10))
47
48 if config.PTTPin != -1 {
49 InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
50 }
51 if config.COSPin != -1 {
52 InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
53 }
54
55 sig := make(chan os.Signal, 1)
56 signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
57 go func() {
58 sig := <-sig
59 log.Println("Radio shutting down due to signal:", sig)
60 // Make sure we always stop PTT when program ends
61 ptt.DisengagePTT()
62 os.Exit(0)
63 }()
64
65 log.Println("Config checks out, radio coming online")
66 log.Println("Audio file cache:", config.CachePath)
67
68 fileSpecChan := make(chan []protocol.FileSpec)
69 go filesWorker(config.CachePath, fileSpecChan)
70
71 stop := make(chan bool)
72 playlistSpecChan := make(chan []protocol.PlaylistSpec)
73 go playlistWorker(playlistSpecChan, stop)
74
75 for {
76 runWebsocket(fileSpecChan, playlistSpecChan, stop)
77 log.Println("Websocket failed, retry in 30 seconds")
78 time.Sleep(time.Second * time.Duration(30))
79 }
80 }
81
82 func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
83 log.Println("Establishing websocket connection to:", config.WebsocketURL())
84 ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
85 if err != nil {
86 return err
87 }
88
89 auth := protocol.AuthenticateMessage{
90 T: "authenticate",
91 Token: config.Token,
92 }
93 msg, _ := json.Marshal(auth)
94
95 if _, err := ws.Write(msg); err != nil {
96 log.Fatal(err)
97 }
98 statusCollector.Websocket <- ws
99
100 buf := make([]byte, 16384)
101 badRead := false
102 for {
103 n, err := ws.Read(buf)
104 if err != nil {
105 log.Println("Lost websocket to server")
106 return err
107 }
108 // Ignore any massively oversize messages
109 if n == len(buf) {
110 badRead = true
111 continue
112 } else if badRead {
113 badRead = false
114 continue
115 }
116
117 t, msg, err := protocol.ParseMessage(buf[:n])
118 if err != nil {
119 log.Println("Message parse error", err)
120 return err
121 }
122
123 if t == protocol.FilesType {
124 filesMsg := msg.(protocol.FilesMessage)
125 fileSpecChan <- filesMsg.Files
126 }
127
128 if t == protocol.PlaylistsType {
129 playlistsMsg := msg.(protocol.PlaylistsMessage)
130 playlistSpecChan <- playlistsMsg.Playlists
131 }
132
133 if t == protocol.StopType {
134 log.Println("Received stop transmission message from server")
135 stop <- true
136 }
137 }
138 }
139
140 func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
141 machine := NewFilesMachine(cachePath)
142 isDownloading := false
143 downloadResult := make(chan error)
144 var timer *time.Timer
145
146 for {
147 var timerCh <-chan time.Time = nil
148 if timer != nil {
149 timerCh = timer.C
150 }
151 doNext := false
152 select {
153 case specs := <-ch:
154 log.Println("Received new file specs", specs)
155 machine.UpdateSpecs(specs)
156 doNext = true
157 timer = nil
158 case err := <-downloadResult:
159 isDownloading = false
160 machine.RefreshMissing()
161 if err != nil {
162 log.Println(err)
163 if !machine.IsCacheComplete() {
164 timer = time.NewTimer(30 * time.Second)
165 }
166 } else {
167 if !machine.IsCacheComplete() {
168 timer = time.NewTimer(10 * time.Millisecond)
169 }
170 }
171 case <-timerCh:
172 doNext = true
173 timer = nil
174 }
175
176 if doNext && !isDownloading && !machine.IsCacheComplete() {
177 next := machine.NextFile()
178 isDownloading = true
179 go machine.DownloadSingle(next, downloadResult)
180 }
181 }
182 }
183
184 func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
185 var specs []protocol.PlaylistSpec
186 isPlaying := false
187 playbackFinished := make(chan error)
188 cancel := make(chan bool)
189 nextId := 0
190 var timer *time.Timer
191
192 for {
193 var timerCh <-chan time.Time = nil
194 if timer != nil {
195 timerCh = timer.C
196 }
197 doNext := false
198 select {
199 case specs = <-ch:
200 log.Println("Received new playlist specs", specs)
201 doNext = true
202 case <-playbackFinished:
203 isPlaying = false
204 doNext = true
205 cancel = make(chan bool)
206 case <-timerCh:
207 timer = nil
208 isPlaying = true
209 for _, v := range specs {
210 if v.Id == nextId {
211 go playPlaylist(v, playbackFinished, cancel)
212 }
213 }
214 case <-stop:
215 if isPlaying {
216 log.Println("Cancelling playlist in progress")
217 cancel <- true
218 }
219 }
220
221 if doNext && !isPlaying {
222 timer = nil
223 found := false
224 loc, err := time.LoadLocation(config.TimeZone)
225 if err != nil {
226 log.Fatal(err)
227 }
228 var soonestTime time.Time
229 for _, v := range specs {
230 t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
231 if err != nil {
232 log.Println("Error parsing start time", err)
233 continue
234 }
235 if t.Before(time.Now()) {
236 continue
237 }
238 if !found || t.Before(soonestTime) {
239 soonestTime = t
240 found = true
241 nextId = v.Id
242 }
243 }
244 if found {
245 duration := time.Until(soonestTime)
246 log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds")
247 timer = time.NewTimer(duration)
248 } else {
249 log.Println("No future playlists")
250 }
251 }
252 }
253 }
254
255 func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
256 startTime := time.Now()
257 log.Println("Beginning playback of playlist", playlist.Name)
258 entries:
259 for _, p := range playlist.Entries {
260 // delay
261 var duration time.Duration
262 if p.IsRelative {
263 duration = time.Second * time.Duration(p.DelaySeconds)
264 } else {
265 duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
266 }
267 statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
268 Playlist: playlist.Name,
269 Seconds: int(duration.Seconds()),
270 Filename: p.Filename,
271 }
272 select {
273 case <-time.After(duration):
274 case <-cancel:
275 log.Println("Cancelling pre-play delay")
276 break entries
277 }
278
279 statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
280 Playlist: playlist.Name,
281 Filename: p.Filename,
282 }
283 cos.WaitForChannelClear()
284
285 // then play
286 statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
287 Playlist: playlist.Name,
288 Filename: p.Filename,
289 }
290 f, err := os.Open(filepath.Join(config.CachePath, p.Filename))
291 if err != nil {
292 log.Println("Couldn't open file for playlist", p.Filename)
293 continue
294 }
295 log.Println("Playing file", p.Filename)
296 l := strings.ToLower(p.Filename)
297 var streamer beep.StreamSeekCloser
298 var format beep.Format
299 if strings.HasSuffix(l, ".mp3") {
300 streamer, format, err = mp3.Decode(f)
301 } else if strings.HasSuffix(l, ".wav") {
302 streamer, format, err = wav.Decode(f)
303 } else {
304 log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
305 }
306 if err != nil {
307 log.Println("Could not decode media file", err)
308 continue
309 }
310 defer streamer.Close()
311
312 done := make(chan bool)
313 log.Println("PTT on for playback")
314 ptt.EngagePTT()
315
316 if format.SampleRate != sampleRate {
317 log.Println("Configuring resampler for audio provided at sample rate", format.SampleRate)
318 resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer)
319 log.Println("Playing resampled audio")
320 speaker.Play(beep.Seq(resampled, beep.Callback(func() {
321 done <- true
322 })))
323 } else {
324 log.Println("Playing audio at native sample rate")
325 speaker.Play(beep.Seq(streamer, beep.Callback(func() {
326 done <- true
327 })))
328 }
329
330 select {
331 case <-done:
332 log.Println("Audio playback complete")
333 case <-cancel:
334 log.Println("Disengaging PTT and aborting playlist playback")
335 ptt.DisengagePTT()
336 break entries
337 }
338 log.Println("PTT off since audio file has finished")
339 ptt.DisengagePTT()
340 }
341 log.Println("Playlist finished", playlist.Name)
342 statusCollector.PlaylistBeginIdle <- true
343 playbackFinished <- nil
344 }