fileSpecChan := make(chan []protocol.FileSpec)
go filesWorker(config.CachePath, fileSpecChan)
+ stop := make(chan bool)
playlistSpecChan := make(chan []protocol.PlaylistSpec)
- go playlistWorker(playlistSpecChan)
+ go playlistWorker(playlistSpecChan, stop)
for {
- runWebsocket(fileSpecChan, playlistSpecChan)
+ runWebsocket(fileSpecChan, playlistSpecChan, stop)
log.Println("Websocket failed, retry in 30 seconds")
time.Sleep(time.Second * time.Duration(30))
}
}
-func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error {
+func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
log.Println("Establishing websocket connection to:", config.WebsocketURL())
ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
if err != nil {
playlistsMsg := msg.(protocol.PlaylistsMessage)
playlistSpecChan <- playlistsMsg.Playlists
}
+
+ if t == protocol.StopType {
+ stop <- true
+ }
}
}
}
}
-func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
+func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
var specs []protocol.PlaylistSpec
isPlaying := false
playbackFinished := make(chan error)
+ cancel := make(chan bool)
nextId := 0
var timer *time.Timer
case <-playbackFinished:
isPlaying = false
doNext = true
+ cancel = make(chan bool)
case <-timerCh:
timer = nil
isPlaying = true
for _, v := range specs {
if v.Id == nextId {
- go playPlaylist(v, playbackFinished)
+ go playPlaylist(v, playbackFinished, cancel)
}
}
+ case <-stop:
+ if isPlaying {
+ log.Println("Cancelling playlist in progress")
+ cancel <- true
+ }
}
if doNext && !isPlaying {
}
}
-func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) {
- // TODO: possibility of on-demand cancellation
+func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
startTime := time.Now()
log.Println("Beginning playback of playlist", playlist.Name)
+entries:
for _, p := range playlist.Entries {
// delay
var duration time.Duration
Seconds: int(duration.Seconds()),
Filename: p.Filename,
}
- <-time.After(duration)
+ select {
+ case <-time.After(duration):
+ case <-cancel:
+ break entries
+ }
statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
Playlist: playlist.Name,
})))
}
- <-done
+ select {
+ case <-done:
+ case <-cancel:
+ ptt.DisengagePTT()
+ break entries
+ }
ptt.DisengagePTT()
}
log.Println("Playlist finished", playlist.Name)
InitDatabase()
defer db.CloseDatabase()
+ InitCommandRouter()
InitPlaylists()
InitAudioFiles(config.AudioFilesPath)
InitServerStatus()
http.HandleFunc("/login", logInPage)
http.HandleFunc("/logout", logOutPage)
http.HandleFunc("/secret", secretPage)
+ http.HandleFunc("/stop", stopPage)
http.HandleFunc("/playlist/", playlistSection)
http.HandleFunc("/file/", fileSection)
tmpl := template.Must(template.ParseFiles("templates/logout.html"))
tmpl.Execute(w, nil)
}
+
+func stopPage(w http.ResponseWriter, r *http.Request) {
+ _, err := currentUser(w, r)
+ if err != nil {
+ http.Redirect(w, r, "/login", http.StatusFound)
+ return
+ }
+ r.ParseForm()
+ radioId, err := strconv.Atoi(r.Form.Get("radioId"))
+ if err != nil {
+ http.NotFound(w, r)
+ return
+ }
+ commandRouter.Stop(radioId)
+ http.Redirect(w, r, "/", http.StatusFound)
+}
--- /dev/null
+package main
+
+import (
+ "code.octet-stream.net/broadcaster/protocol"
+ "encoding/json"
+ "golang.org/x/net/websocket"
+ "sync"
+)
+
+type CommandRouter struct {
+ connsMutex sync.Mutex
+ conns map[int]*websocket.Conn
+}
+
+var commandRouter CommandRouter
+
+func InitCommandRouter() {
+ commandRouter.conns = make(map[int]*websocket.Conn)
+}
+
+func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) {
+ c.connsMutex.Lock()
+ defer c.connsMutex.Unlock()
+ c.conns[radioId] = ws
+}
+
+func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) {
+ c.connsMutex.Lock()
+ defer c.connsMutex.Unlock()
+ key := -1
+ for k, v := range c.conns {
+ if v == ws {
+ key = k
+ }
+ }
+ if key != -1 {
+ delete(c.conns, key)
+ }
+
+}
+
+func (c *CommandRouter) Stop(radioId int) {
+ c.connsMutex.Lock()
+ defer c.connsMutex.Unlock()
+ ws := c.conns[radioId]
+ if ws != nil {
+ stop := protocol.StopMessage{
+ T: protocol.StopType,
+ }
+ msg, _ := json.Marshal(stop)
+ ws.Write(msg)
+ }
+}