]> code.octet-stream.net Git - broadcaster/blob - radio/radio.go
bcf5e38d7542a8f6c545147e274bdf02ab1d4bbd
[broadcaster] / radio / radio.go
1 package main
2
3 import (
4 "code.octet-stream.net/broadcaster/protocol"
5 "encoding/json"
6 "flag"
7 "github.com/gopxl/beep/v2"
8 "github.com/gopxl/beep/v2/mp3"
9 "github.com/gopxl/beep/v2/speaker"
10 "github.com/gopxl/beep/v2/wav"
11 "golang.org/x/net/websocket"
12 "log"
13 "os"
14 "os/signal"
15 "path/filepath"
16 "strings"
17 "syscall"
18 "time"
19 )
20
21 const sampleRate = 44100
22
23 var config RadioConfig = NewRadioConfig()
24
25 func main() {
26 configFlag := flag.String("c", "", "path to configuration file")
27 // TODO: support this
28 //generateFlag := flag.String("g", "", "create a template config file with specified name then exit")
29 flag.Parse()
30
31 if *configFlag == "" {
32 log.Fatal("must specify a configuration file with -c")
33 }
34 config.LoadFromFile(*configFlag)
35 statusCollector.Config <- config
36
37 playbackSampleRate := beep.SampleRate(sampleRate)
38 speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10))
39
40 if config.PTTPin != -1 {
41 InitRaspberryPiPTT(config.PTTPin, config.GpioDevice)
42 }
43 if config.COSPin != -1 {
44 InitRaspberryPiCOS(config.COSPin, config.GpioDevice)
45 }
46
47 sig := make(chan os.Signal, 1)
48 signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
49 go func() {
50 sig := <-sig
51 log.Println("Radio shutting down due to signal", sig)
52 // Make sure we always stop PTT when program ends
53 ptt.DisengagePTT()
54 os.Exit(0)
55 }()
56
57 log.Println("Config checks out, radio coming online")
58 log.Println("Audio file cache:", config.CachePath)
59
60 fileSpecChan := make(chan []protocol.FileSpec)
61 go filesWorker(config.CachePath, fileSpecChan)
62
63 stop := make(chan bool)
64 playlistSpecChan := make(chan []protocol.PlaylistSpec)
65 go playlistWorker(playlistSpecChan, stop)
66
67 for {
68 runWebsocket(fileSpecChan, playlistSpecChan, stop)
69 log.Println("Websocket failed, retry in 30 seconds")
70 time.Sleep(time.Second * time.Duration(30))
71 }
72 }
73
74 func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
75 log.Println("Establishing websocket connection to:", config.WebsocketURL())
76 ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
77 if err != nil {
78 return err
79 }
80
81 auth := protocol.AuthenticateMessage{
82 T: "authenticate",
83 Token: config.Token,
84 }
85 msg, _ := json.Marshal(auth)
86
87 if _, err := ws.Write(msg); err != nil {
88 log.Fatal(err)
89 }
90 statusCollector.Websocket <- ws
91
92 buf := make([]byte, 16384)
93 badRead := false
94 for {
95 n, err := ws.Read(buf)
96 if err != nil {
97 log.Println("Lost websocket to server")
98 return err
99 }
100 // Ignore any massively oversize messages
101 if n == len(buf) {
102 badRead = true
103 continue
104 } else if badRead {
105 badRead = false
106 continue
107 }
108
109 t, msg, err := protocol.ParseMessage(buf[:n])
110 if err != nil {
111 log.Println("Message parse error", err)
112 return err
113 }
114
115 if t == protocol.FilesType {
116 filesMsg := msg.(protocol.FilesMessage)
117 fileSpecChan <- filesMsg.Files
118 }
119
120 if t == protocol.PlaylistsType {
121 playlistsMsg := msg.(protocol.PlaylistsMessage)
122 playlistSpecChan <- playlistsMsg.Playlists
123 }
124
125 if t == protocol.StopType {
126 stop <- true
127 }
128 }
129 }
130
131 func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
132 machine := NewFilesMachine(cachePath)
133 isDownloading := false
134 downloadResult := make(chan error)
135 var timer *time.Timer
136
137 for {
138 var timerCh <-chan time.Time = nil
139 if timer != nil {
140 timerCh = timer.C
141 }
142 doNext := false
143 select {
144 case specs := <-ch:
145 log.Println("Received new file specs", specs)
146 machine.UpdateSpecs(specs)
147 doNext = true
148 timer = nil
149 case err := <-downloadResult:
150 isDownloading = false
151 machine.RefreshMissing()
152 if err != nil {
153 log.Println(err)
154 if !machine.IsCacheComplete() {
155 timer = time.NewTimer(30 * time.Second)
156 }
157 } else {
158 if !machine.IsCacheComplete() {
159 timer = time.NewTimer(10 * time.Millisecond)
160 }
161 }
162 case <-timerCh:
163 doNext = true
164 timer = nil
165 }
166
167 if doNext && !isDownloading && !machine.IsCacheComplete() {
168 next := machine.NextFile()
169 isDownloading = true
170 go machine.DownloadSingle(next, downloadResult)
171 }
172 }
173 }
174
175 func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
176 var specs []protocol.PlaylistSpec
177 isPlaying := false
178 playbackFinished := make(chan error)
179 cancel := make(chan bool)
180 nextId := 0
181 var timer *time.Timer
182
183 for {
184 var timerCh <-chan time.Time = nil
185 if timer != nil {
186 timerCh = timer.C
187 }
188 doNext := false
189 select {
190 case specs = <-ch:
191 log.Println("Received new playlist specs", specs)
192 doNext = true
193 case <-playbackFinished:
194 isPlaying = false
195 doNext = true
196 cancel = make(chan bool)
197 case <-timerCh:
198 timer = nil
199 isPlaying = true
200 for _, v := range specs {
201 if v.Id == nextId {
202 go playPlaylist(v, playbackFinished, cancel)
203 }
204 }
205 case <-stop:
206 if isPlaying {
207 log.Println("Cancelling playlist in progress")
208 cancel <- true
209 }
210 }
211
212 if doNext && !isPlaying {
213 timer = nil
214 found := false
215 loc, err := time.LoadLocation(config.TimeZone)
216 if err != nil {
217 log.Fatal(err)
218 }
219 var soonestTime time.Time
220 for _, v := range specs {
221 t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc)
222 if err != nil {
223 log.Println("Error parsing start time", err)
224 continue
225 }
226 if t.Before(time.Now()) {
227 continue
228 }
229 if !found || t.Before(soonestTime) {
230 soonestTime = t
231 found = true
232 nextId = v.Id
233 }
234 }
235 if found {
236 duration := soonestTime.Sub(time.Now())
237 log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds")
238 timer = time.NewTimer(duration)
239 } else {
240 log.Println("No future playlists")
241 }
242 }
243 }
244 }
245
246 func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
247 startTime := time.Now()
248 log.Println("Beginning playback of playlist", playlist.Name)
249 entries:
250 for _, p := range playlist.Entries {
251 // delay
252 var duration time.Duration
253 if p.IsRelative {
254 duration = time.Second * time.Duration(p.DelaySeconds)
255 } else {
256 duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds)))
257 }
258 statusCollector.PlaylistBeginDelay <- BeginDelayStatus{
259 Playlist: playlist.Name,
260 Seconds: int(duration.Seconds()),
261 Filename: p.Filename,
262 }
263 select {
264 case <-time.After(duration):
265 case <-cancel:
266 break entries
267 }
268
269 statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
270 Playlist: playlist.Name,
271 Filename: p.Filename,
272 }
273 cos.WaitForChannelClear()
274
275 // then play
276 statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{
277 Playlist: playlist.Name,
278 Filename: p.Filename,
279 }
280 ptt.EngagePTT()
281 f, err := os.Open(filepath.Join(config.CachePath, p.Filename))
282 if err != nil {
283 log.Println("Couldn't open file for playlist", p.Filename)
284 continue
285 }
286 log.Println("Playing file", p.Filename)
287 l := strings.ToLower(p.Filename)
288 var streamer beep.StreamSeekCloser
289 var format beep.Format
290 if strings.HasSuffix(l, ".mp3") {
291 streamer, format, err = mp3.Decode(f)
292 } else if strings.HasSuffix(l, ".wav") {
293 streamer, format, err = wav.Decode(f)
294 } else {
295 log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on")
296 }
297 if err != nil {
298 log.Println("Could not decode media file", err)
299 continue
300 }
301 defer streamer.Close()
302
303 done := make(chan bool)
304
305 if format.SampleRate != sampleRate {
306 resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer)
307 speaker.Play(beep.Seq(resampled, beep.Callback(func() {
308 done <- true
309 })))
310 } else {
311 speaker.Play(beep.Seq(streamer, beep.Callback(func() {
312 done <- true
313 })))
314 }
315
316 select {
317 case <-done:
318 case <-cancel:
319 ptt.DisengagePTT()
320 break entries
321 }
322 ptt.DisengagePTT()
323 }
324 log.Println("Playlist finished", playlist.Name)
325 statusCollector.PlaylistBeginIdle <- true
326 playbackFinished <- nil
327 }