From 8320951221d45c5f5f3d387c5cb4b97d9fa2094c Mon Sep 17 00:00:00 2001 From: Thomas Karpiniec Date: Sun, 20 Oct 2024 21:11:42 +1100 Subject: [PATCH] Allow web user to cancel transmission in progress --- protocol/protocol.go | 15 ++++++++++++ radio/radio.go | 39 +++++++++++++++++++++++-------- server/broadcaster.go | 18 +++++++++++++++ server/command.go | 53 +++++++++++++++++++++++++++++++++++++++++++ server/radio_sync.go | 2 ++ 5 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 server/command.go diff --git a/protocol/protocol.go b/protocol/protocol.go index d168bc3..e23da3a 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -19,6 +19,7 @@ const ( FilesType = "files" PlaylistsType = "playlists" + StopType = "stop" // Status values @@ -52,6 +53,11 @@ type PlaylistsMessage struct { Playlists []PlaylistSpec } +// Any playlist currently being played should be stopped and PTT disengaged. +type StopMessage struct { + T string +} + type StatusMessage struct { T string @@ -148,5 +154,14 @@ func ParseMessage(data []byte) (string, interface{}, error) { return t.T, status, nil } + if t.T == StopType { + var stop StopMessage + err = json.Unmarshal(data, &stop) + if err != nil { + return "", nil, err + } + return t.T, stop, nil + } + return "", nil, errors.New(fmt.Sprintf("unknown message type %v", t.T)) } diff --git a/radio/radio.go b/radio/radio.go index f6b3b50..bcf5e38 100644 --- a/radio/radio.go +++ b/radio/radio.go @@ -60,17 +60,18 @@ func main() { fileSpecChan := make(chan []protocol.FileSpec) go filesWorker(config.CachePath, fileSpecChan) + stop := make(chan bool) playlistSpecChan := make(chan []protocol.PlaylistSpec) - go playlistWorker(playlistSpecChan) + go playlistWorker(playlistSpecChan, stop) for { - runWebsocket(fileSpecChan, playlistSpecChan) + runWebsocket(fileSpecChan, playlistSpecChan, stop) log.Println("Websocket failed, retry in 30 seconds") time.Sleep(time.Second * time.Duration(30)) } } -func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error { +func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error { log.Println("Establishing websocket connection to:", config.WebsocketURL()) ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL) if err != nil { @@ -120,6 +121,10 @@ func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan [ playlistsMsg := msg.(protocol.PlaylistsMessage) playlistSpecChan <- playlistsMsg.Playlists } + + if t == protocol.StopType { + stop <- true + } } } @@ -167,10 +172,11 @@ func filesWorker(cachePath string, ch chan []protocol.FileSpec) { } } -func playlistWorker(ch <-chan []protocol.PlaylistSpec) { +func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) { var specs []protocol.PlaylistSpec isPlaying := false playbackFinished := make(chan error) + cancel := make(chan bool) nextId := 0 var timer *time.Timer @@ -187,14 +193,20 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) { case <-playbackFinished: isPlaying = false doNext = true + cancel = make(chan bool) case <-timerCh: timer = nil isPlaying = true for _, v := range specs { if v.Id == nextId { - go playPlaylist(v, playbackFinished) + go playPlaylist(v, playbackFinished, cancel) } } + case <-stop: + if isPlaying { + log.Println("Cancelling playlist in progress") + cancel <- true + } } if doNext && !isPlaying { @@ -231,10 +243,10 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) { } } -func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) { - // TODO: possibility of on-demand cancellation +func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) { startTime := time.Now() log.Println("Beginning playback of playlist", playlist.Name) +entries: for _, p := range playlist.Entries { // delay var duration time.Duration @@ -248,7 +260,11 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) Seconds: int(duration.Seconds()), Filename: p.Filename, } - <-time.After(duration) + select { + case <-time.After(duration): + case <-cancel: + break entries + } statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{ Playlist: playlist.Name, @@ -297,7 +313,12 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) }))) } - <-done + select { + case <-done: + case <-cancel: + ptt.DisengagePTT() + break entries + } ptt.DisengagePTT() } log.Println("Playlist finished", playlist.Name) diff --git a/server/broadcaster.go b/server/broadcaster.go index d2495fd..e5f9922 100644 --- a/server/broadcaster.go +++ b/server/broadcaster.go @@ -33,6 +33,7 @@ func main() { InitDatabase() defer db.CloseDatabase() + InitCommandRouter() InitPlaylists() InitAudioFiles(config.AudioFilesPath) InitServerStatus() @@ -41,6 +42,7 @@ func main() { http.HandleFunc("/login", logInPage) http.HandleFunc("/logout", logOutPage) http.HandleFunc("/secret", secretPage) + http.HandleFunc("/stop", stopPage) http.HandleFunc("/playlist/", playlistSection) http.HandleFunc("/file/", fileSection) @@ -404,3 +406,19 @@ func logOutPage(w http.ResponseWriter, r *http.Request) { tmpl := template.Must(template.ParseFiles("templates/logout.html")) tmpl.Execute(w, nil) } + +func stopPage(w http.ResponseWriter, r *http.Request) { + _, err := currentUser(w, r) + if err != nil { + http.Redirect(w, r, "/login", http.StatusFound) + return + } + r.ParseForm() + radioId, err := strconv.Atoi(r.Form.Get("radioId")) + if err != nil { + http.NotFound(w, r) + return + } + commandRouter.Stop(radioId) + http.Redirect(w, r, "/", http.StatusFound) +} diff --git a/server/command.go b/server/command.go new file mode 100644 index 0000000..cd7410b --- /dev/null +++ b/server/command.go @@ -0,0 +1,53 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/protocol" + "encoding/json" + "golang.org/x/net/websocket" + "sync" +) + +type CommandRouter struct { + connsMutex sync.Mutex + conns map[int]*websocket.Conn +} + +var commandRouter CommandRouter + +func InitCommandRouter() { + commandRouter.conns = make(map[int]*websocket.Conn) +} + +func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + c.conns[radioId] = ws +} + +func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + key := -1 + for k, v := range c.conns { + if v == ws { + key = k + } + } + if key != -1 { + delete(c.conns, key) + } + +} + +func (c *CommandRouter) Stop(radioId int) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + ws := c.conns[radioId] + if ws != nil { + stop := protocol.StopMessage{ + T: protocol.StopType, + } + msg, _ := json.Marshal(stop) + ws.Write(msg) + } +} diff --git a/server/radio_sync.go b/server/radio_sync.go index 9fd9a65..2eaef34 100644 --- a/server/radio_sync.go +++ b/server/radio_sync.go @@ -53,6 +53,8 @@ func RadioSync(ws *websocket.Conn) { radio = r log.Println("Radio authenticated:", radio.Name) isAuthenticated = true + commandRouter.AddWebsocket(r.Id, ws) + defer commandRouter.RemoveWebsocket(ws) go KeepFilesUpdated(ws) go KeepPlaylistsUpdated(ws) -- 2.39.5