X-Git-Url: https://code.octet-stream.net/broadcaster/blobdiff_plain/8f9b19f7877e32c15a5f4c296d6738f449dfcc19..8320951221d45c5f5f3d387c5cb4b97d9fa2094c:/radio/radio.go?ds=inline diff --git a/radio/radio.go b/radio/radio.go index f6b3b50..bcf5e38 100644 --- a/radio/radio.go +++ b/radio/radio.go @@ -60,17 +60,18 @@ func main() { fileSpecChan := make(chan []protocol.FileSpec) go filesWorker(config.CachePath, fileSpecChan) + stop := make(chan bool) playlistSpecChan := make(chan []protocol.PlaylistSpec) - go playlistWorker(playlistSpecChan) + go playlistWorker(playlistSpecChan, stop) for { - runWebsocket(fileSpecChan, playlistSpecChan) + runWebsocket(fileSpecChan, playlistSpecChan, stop) log.Println("Websocket failed, retry in 30 seconds") time.Sleep(time.Second * time.Duration(30)) } } -func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error { +func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error { log.Println("Establishing websocket connection to:", config.WebsocketURL()) ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL) if err != nil { @@ -120,6 +121,10 @@ func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan [ playlistsMsg := msg.(protocol.PlaylistsMessage) playlistSpecChan <- playlistsMsg.Playlists } + + if t == protocol.StopType { + stop <- true + } } } @@ -167,10 +172,11 @@ func filesWorker(cachePath string, ch chan []protocol.FileSpec) { } } -func playlistWorker(ch <-chan []protocol.PlaylistSpec) { +func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) { var specs []protocol.PlaylistSpec isPlaying := false playbackFinished := make(chan error) + cancel := make(chan bool) nextId := 0 var timer *time.Timer @@ -187,14 +193,20 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) { case <-playbackFinished: isPlaying = false doNext = true + cancel = make(chan bool) case <-timerCh: timer = nil isPlaying = true for _, v := range specs { if v.Id == nextId { - go playPlaylist(v, playbackFinished) + go playPlaylist(v, playbackFinished, cancel) } } + case <-stop: + if isPlaying { + log.Println("Cancelling playlist in progress") + cancel <- true + } } if doNext && !isPlaying { @@ -231,10 +243,10 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) { } } -func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) { - // TODO: possibility of on-demand cancellation +func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) { startTime := time.Now() log.Println("Beginning playback of playlist", playlist.Name) +entries: for _, p := range playlist.Entries { // delay var duration time.Duration @@ -248,7 +260,11 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) Seconds: int(duration.Seconds()), Filename: p.Filename, } - <-time.After(duration) + select { + case <-time.After(duration): + case <-cancel: + break entries + } statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{ Playlist: playlist.Name, @@ -297,7 +313,12 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) }))) } - <-done + select { + case <-done: + case <-cancel: + ptt.DisengagePTT() + break entries + } ptt.DisengagePTT() } log.Println("Playlist finished", playlist.Name)