From: Thomas Karpiniec <tom.karpiniec@outlook.com>
Date: Sun, 20 Oct 2024 10:11:42 +0000 (+1100)
Subject: Allow web user to cancel transmission in progress
X-Git-Tag: v1.0.0~11
X-Git-Url: https://code.octet-stream.net/broadcaster/commitdiff_plain/8320951221d45c5f5f3d387c5cb4b97d9fa2094c

Allow web user to cancel transmission in progress
---

diff --git a/protocol/protocol.go b/protocol/protocol.go
index d168bc3..e23da3a 100644
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -19,6 +19,7 @@ const (
 
 	FilesType     = "files"
 	PlaylistsType = "playlists"
+	StopType      = "stop"
 
 	// Status values
 
@@ -52,6 +53,11 @@ type PlaylistsMessage struct {
 	Playlists []PlaylistSpec
 }
 
+// StopMessage asks a radio to stop any playlist currently being played and disengage PTT.
+type StopMessage struct {
+	T string // message type tag; senders set it to StopType
+}
+
 type StatusMessage struct {
 	T string
 
@@ -148,5 +154,14 @@ func ParseMessage(data []byte) (string, interface{}, error) {
 		return t.T, status, nil
 	}
 
+	if t.T == StopType {
+		var stop StopMessage
+		err = json.Unmarshal(data, &stop)
+		if err != nil {
+			return "", nil, err
+		}
+		return t.T, stop, nil
+	}
+
 	return "", nil, errors.New(fmt.Sprintf("unknown message type %v", t.T))
 }
diff --git a/radio/radio.go b/radio/radio.go
index f6b3b50..bcf5e38 100644
--- a/radio/radio.go
+++ b/radio/radio.go
@@ -60,17 +60,18 @@ func main() {
 	fileSpecChan := make(chan []protocol.FileSpec)
 	go filesWorker(config.CachePath, fileSpecChan)
 
+	stop := make(chan bool)
 	playlistSpecChan := make(chan []protocol.PlaylistSpec)
-	go playlistWorker(playlistSpecChan)
+	go playlistWorker(playlistSpecChan, stop)
 
 	for {
-		runWebsocket(fileSpecChan, playlistSpecChan)
+		runWebsocket(fileSpecChan, playlistSpecChan, stop)
 		log.Println("Websocket failed, retry in 30 seconds")
 		time.Sleep(time.Second * time.Duration(30))
 	}
 }
 
-func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec) error {
+func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error {
 	log.Println("Establishing websocket connection to:", config.WebsocketURL())
 	ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL)
 	if err != nil {
@@ -120,6 +121,10 @@ func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan [
 			playlistsMsg := msg.(protocol.PlaylistsMessage)
 			playlistSpecChan <- playlistsMsg.Playlists
 		}
+
+		if t == protocol.StopType {
+			stop <- true
+		}
 	}
 }
 
@@ -167,10 +172,11 @@ func filesWorker(cachePath string, ch chan []protocol.FileSpec) {
 	}
 }
 
-func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
+func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) {
 	var specs []protocol.PlaylistSpec
 	isPlaying := false
 	playbackFinished := make(chan error)
+	cancel := make(chan bool)
 	nextId := 0
 	var timer *time.Timer
 
@@ -187,14 +193,20 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
 		case <-playbackFinished:
 			isPlaying = false
 			doNext = true
+			cancel = make(chan bool)
 		case <-timerCh:
 			timer = nil
 			isPlaying = true
 			for _, v := range specs {
 				if v.Id == nextId {
-					go playPlaylist(v, playbackFinished)
+					go playPlaylist(v, playbackFinished, cancel)
 				}
 			}
+		case <-stop:
+			if isPlaying {
+				log.Println("Cancelling playlist in progress")
+				cancel <- true
+			}
 		}
 
 		if doNext && !isPlaying {
@@ -231,10 +243,10 @@ func playlistWorker(ch <-chan []protocol.PlaylistSpec) {
 	}
 }
 
-func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error) {
-	// TODO: possibility of on-demand cancellation
+func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) {
 	startTime := time.Now()
 	log.Println("Beginning playback of playlist", playlist.Name)
+entries:
 	for _, p := range playlist.Entries {
 		// delay
 		var duration time.Duration
@@ -248,7 +260,11 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error)
 			Seconds:  int(duration.Seconds()),
 			Filename: p.Filename,
 		}
-		<-time.After(duration)
+		select {
+		case <-time.After(duration):
+		case <-cancel:
+			break entries
+		}
 
 		statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{
 			Playlist: playlist.Name,
@@ -297,7 +313,12 @@ func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error)
 			})))
 		}
 
-		<-done
+		select {
+		case <-done:
+		case <-cancel:
+			ptt.DisengagePTT()
+			break entries
+		}
 		ptt.DisengagePTT()
 	}
 	log.Println("Playlist finished", playlist.Name)
diff --git a/server/broadcaster.go b/server/broadcaster.go
index d2495fd..e5f9922 100644
--- a/server/broadcaster.go
+++ b/server/broadcaster.go
@@ -33,6 +33,7 @@ func main() {
 	InitDatabase()
 	defer db.CloseDatabase()
 
+	InitCommandRouter()
 	InitPlaylists()
 	InitAudioFiles(config.AudioFilesPath)
 	InitServerStatus()
@@ -41,6 +42,7 @@ func main() {
 	http.HandleFunc("/login", logInPage)
 	http.HandleFunc("/logout", logOutPage)
 	http.HandleFunc("/secret", secretPage)
+	http.HandleFunc("/stop", stopPage)
 
 	http.HandleFunc("/playlist/", playlistSection)
 	http.HandleFunc("/file/", fileSection)
@@ -404,3 +406,19 @@ func logOutPage(w http.ResponseWriter, r *http.Request) {
 	tmpl := template.Must(template.ParseFiles("templates/logout.html"))
 	tmpl.Execute(w, nil)
 }
+
+// stopPage handles /stop: it relays a stop command to the radio given by the radioId form value.
+func stopPage(w http.ResponseWriter, r *http.Request) {
+	if _, err := currentUser(w, r); err != nil {
+		http.Redirect(w, r, "/login", http.StatusFound)
+		return
+	}
+	r.ParseForm()
+	radioId, err := strconv.Atoi(r.Form.Get("radioId"))
+	if err != nil {
+		http.NotFound(w, r)
+		return
+	}
+	commandRouter.Stop(radioId)
+	http.Redirect(w, r, "/", http.StatusFound)
+}
diff --git a/server/command.go b/server/command.go
new file mode 100644
index 0000000..cd7410b
--- /dev/null
+++ b/server/command.go
@@ -0,0 +1,65 @@
+package main
+
+import (
+	"code.octet-stream.net/broadcaster/protocol"
+	"encoding/json"
+	"golang.org/x/net/websocket"
+	"log"
+	"sync"
+)
+
+// CommandRouter delivers on-demand commands, such as stop, to the websocket
+// connection of the radio they are addressed to.
+type CommandRouter struct {
+	connsMutex sync.Mutex              // guards conns
+	conns      map[int]*websocket.Conn // radio ID -> websocket registered for that radio
+}
+
+// commandRouter is the process-wide router. InitCommandRouter must run before
+// any connection is registered, because writing to a nil map panics.
+var commandRouter CommandRouter
+
+// InitCommandRouter allocates the connection table of the global router.
+func InitCommandRouter() {
+	commandRouter.conns = make(map[int]*websocket.Conn)
+}
+
+// AddWebsocket registers ws as the connection for radioId, replacing any
+// connection previously registered for that radio.
+func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) {
+	c.connsMutex.Lock()
+	defer c.connsMutex.Unlock()
+	c.conns[radioId] = ws
+}
+
+// RemoveWebsocket drops every registration that points at ws.
+func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) {
+	c.connsMutex.Lock()
+	defer c.connsMutex.Unlock()
+	for id, conn := range c.conns {
+		if conn == ws {
+			// Deleting the current key while ranging over a map is safe in Go.
+			delete(c.conns, id)
+		}
+	}
+}
+
+// Stop sends a StopMessage to the radio identified by radioId. It does nothing
+// when no connection is registered for that radio; failures are logged because
+// the method has no error return.
+func (c *CommandRouter) Stop(radioId int) {
+	c.connsMutex.Lock()
+	defer c.connsMutex.Unlock()
+	ws := c.conns[radioId]
+	if ws == nil {
+		return
+	}
+	msg, err := json.Marshal(protocol.StopMessage{T: protocol.StopType})
+	if err != nil {
+		log.Println("Could not encode stop message:", err)
+		return
+	}
+	if _, err := ws.Write(msg); err != nil {
+		log.Printf("Could not send stop message to radio %d: %v", radioId, err)
+	}
+}
diff --git a/server/radio_sync.go b/server/radio_sync.go
index 9fd9a65..2eaef34 100644
--- a/server/radio_sync.go
+++ b/server/radio_sync.go
@@ -53,6 +53,8 @@ func RadioSync(ws *websocket.Conn) {
 			radio = r
 			log.Println("Radio authenticated:", radio.Name)
 			isAuthenticated = true
+			commandRouter.AddWebsocket(r.Id, ws)
+			defer commandRouter.RemoveWebsocket(ws)
 
 			go KeepFilesUpdated(ws)
 			go KeepPlaylistsUpdated(ws)