]> code.octet-stream.net Git - broadcaster/commitdiff
Allow web user to cancel transmission in progress
authorThomas Karpiniec <tom.karpiniec@outlook.com>
Sun, 20 Oct 2024 10:11:42 +0000 (21:11 +1100)
committerThomas Karpiniec <tom.karpiniec@outlook.com>
Sun, 20 Oct 2024 10:11:42 +0000 (21:11 +1100)
protocol/protocol.go
radio/radio.go
server/broadcaster.go
server/command.go [new file with mode: 0644]
server/radio_sync.go

index d168bc36ba3e115f1fec63293e8410f5d2559c3d..e23da3a29486d91eabbf40244abef38345a77c1d 100644 (file)
@@ -19,6 +19,7 @@ const (
 
        FilesType     = "files"
        PlaylistsType = "playlists"
+       StopType      = "stop"
 
        // Status values
 
@@ -52,6 +53,11 @@ type PlaylistsMessage struct {
        Playlists []PlaylistSpec
 }
 
+// Any playlist currently being played should be stopped and PTT disengaged.
+type StopMessage struct {
+       T string
+}
+
 type StatusMessage struct {
        T string
 
@@ -148,5 +154,14 @@ func ParseMessage(data []byte) (string, interface{}, error) {
                return t.T, status, nil
        }
 
+       if t.T == StopType {
+               var stop StopMessage
+               err = json.Unmarshal(data, &stop)
+               if err != nil {
+                       return "", nil, err
+               }
+               return t.T, stop, nil
+       }
+
        return "", nil, errors.New(fmt.Sprintf("unknown message type %v", t.T))
 }
index f6b3b50afd08aa190b1413b6b608f588c0525576..bcf5e38d7542a8f6c545147e274bdf02ab1d4bbd 100644 (file)
@@ -60,17 +60,18 @@ func main() {
        fileSpecChan := make(chan []protocol.FileSpec)
        go filesWorker(config.CachePath, fileSpecChan)
 
+       stop := make(chan bool)
        playlistSpecChan := make(chan []protocol.PlaylistSpec)
-       go playlistWorker(playlistSpecChan)
+       go playlistWorker(playlistSpecChan, stop)
 
        for {
-               runWebsocket(fileSpecChan, playlistSpecChan)
+               runWebsocket(fileSpecChan, playlistSpecChan, stop)
                log.Println("Websocket failed, retry in 30 seconds")
                time.Sleep(time.Second * time.Duration(30))
        }
 }
 
-func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error {
+func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
        log.Println("Establishing websocket connection to:", config.WebsocketURL())
        ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
        if err != nil {
@@ -120,6 +121,10 @@ func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan [
                        playlistsMsg := msg.(protocol.PlaylistsMessage)
                        playlistSpecChan <- playlistsMsg.Playlists
                }
+
+               if t == protocol.StopType {
+                       stop <- true
+               }
        }
 }
 
@@ -167,10 +172,11 @@ func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
        }
 }
 
-func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
+func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
        var specs []protocol.PlaylistSpec
        isPlaying := false
        playbackFinished := make(chan error)
+       cancel := make(chan bool)
        nextId := 0
        var timer *time.Timer
 
@@ -187,14 +193,20 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
                case <-playbackFinished:
                        isPlaying = false
                        doNext = true
+                       cancel = make(chan bool)
                case <-timerCh:
                        timer = nil
                        isPlaying = true
                        for _, v := range specs {
                                if v.Id == nextId {
-                                       go playPlaylist(v, playbackFinished)
+                                       go playPlaylist(v, playbackFinished, cancel)
                                }
                        }
+               case <-stop:
+                       if isPlaying {
+                               log.Println("Cancelling playlist in progress")
+                               cancel <- true
+                       }
                }
 
                if doNext && !isPlaying {
@@ -231,10 +243,10 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
        }
 }
 
-func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) {
-       // TODO: possibility of on-demand cancellation
+func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
        startTime := time.Now()
        log.Println("Beginning playback of playlist", playlist.Name)
+entries:
        for _, p := range playlist.Entries {
                // delay
                var duration time.Duration
@@ -248,7 +260,11 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error)
                        Seconds:  int(duration.Seconds()),
                        Filename: p.Filename,
                }
-               <-time.After(duration)
+               select {
+               case <-time.After(duration):
+               case <-cancel:
+                       break entries
+               }
 
                statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
                        Playlist: playlist.Name,
@@ -297,7 +313,12 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error)
                        })))
                }
 
-               <-done
+               select {
+               case <-done:
+               case <-cancel:
+                       ptt.DisengagePTT()
+                       break entries
+               }
                ptt.DisengagePTT()
        }
        log.Println("Playlist finished", playlist.Name)
index d2495fd45e8d8d386197d56b7a21ff2d09769387..e5f992243568cd09f3d0fb842029baf5b177c62e 100644 (file)
@@ -33,6 +33,7 @@ func main() {
        InitDatabase()
        defer db.CloseDatabase()
 
+       InitCommandRouter()
        InitPlaylists()
        InitAudioFiles(config.AudioFilesPath)
        InitServerStatus()
@@ -41,6 +42,7 @@ func main() {
        http.HandleFunc("/login", logInPage)
        http.HandleFunc("/logout", logOutPage)
        http.HandleFunc("/secret", secretPage)
+       http.HandleFunc("/stop", stopPage)
 
        http.HandleFunc("/playlist/", playlistSection)
        http.HandleFunc("/file/", fileSection)
@@ -404,3 +406,19 @@ func logOutPage(w http.ResponseWriter, r *http.Request) {
        tmpl := template.Must(template.ParseFiles("templates/logout.html"))
        tmpl.Execute(w, nil)
 }
+
+func stopPage(w http.ResponseWriter, r *http.Request) {
+       _, err := currentUser(w, r)
+       if err != nil {
+               http.Redirect(w, r, "/login", http.StatusFound)
+               return
+       }
+       r.ParseForm()
+       radioId, err := strconv.Atoi(r.Form.Get("radioId"))
+       if err != nil {
+               http.NotFound(w, r)
+               return
+       }
+       commandRouter.Stop(radioId)
+       http.Redirect(w, r, "/", http.StatusFound)
+}
diff --git a/server/command.go b/server/command.go
new file mode 100644 (file)
index 0000000..cd7410b
--- /dev/null
@@ -0,0 +1,53 @@
+package main
+
+import (
+       "code.octet-stream.net/broadcaster/protocol"
+       "encoding/json"
+       "golang.org/x/net/websocket"
+       "sync"
+)
+
+type CommandRouter struct {
+       connsMutex sync.Mutex
+       conns      map[int]*websocket.Conn
+}
+
+var commandRouter CommandRouter
+
+func InitCommandRouter() {
+       commandRouter.conns = make(map[int]*websocket.Conn)
+}
+
+func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) {
+       c.connsMutex.Lock()
+       defer c.connsMutex.Unlock()
+       c.conns[radioId] = ws
+}
+
+func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) {
+       c.connsMutex.Lock()
+       defer c.connsMutex.Unlock()
+       key := -1
+       for k, v := range c.conns {
+               if v == ws {
+                       key = k
+               }
+       }
+       if key != -1 {
+               delete(c.conns, key)
+       }
+
+}
+
+func (c *CommandRouter) Stop(radioId int) {
+       c.connsMutex.Lock()
+       defer c.connsMutex.Unlock()
+       ws := c.conns[radioId]
+       if ws != nil {
+               stop := protocol.StopMessage{
+                       T: protocol.StopType,
+               }
+               msg, _ := json.Marshal(stop)
+               ws.Write(msg)
+       }
+}
index 9fd9a658a834622b8b4a02b560e963b01a231445..2eaef340fdf6bec1d9a430ca5e4447f5b588f33e 100644 (file)
@@ -53,6 +53,8 @@ func RadioSync(ws *websocket.Conn) {
                        radio = r
                        log.Println("Radio authenticated:", radio.Name)
                        isAuthenticated = true
+                       commandRouter.AddWebsocket(r.Id, ws)
+                       defer commandRouter.RemoveWebsocket(ws)
 
                        go KeepFilesUpdated(ws)
                        go KeepPlaylistsUpdated(ws)