From: Thomas Karpiniec Date: Sat, 19 Oct 2024 02:58:41 +0000 (+1100) Subject: Solve race condition when updating files or playlists and notifying radios X-Git-Tag: v1.0.0~15 X-Git-Url: https://code.octet-stream.net/broadcaster/commitdiff_plain/d14180328760fa282068927068ca5675b9427bc2?hp=c94fef11f43279165f39680fa0b0922c86702687 Solve race condition when updating files or playlists and notifying radios --- diff --git a/server/broadcaster.go b/server/broadcaster.go index 2e09249..d2495fd 100644 --- a/server/broadcaster.go +++ b/server/broadcaster.go @@ -18,9 +18,6 @@ const formatString = "2006-01-02T15:04" var config ServerConfig = NewServerConfig() -// Channel that will be closed and recreated every time the playlists change -var playlistChangeWait = make(chan bool) - func main() { configFlag := flag.String("c", "", "path to configuration file") // TODO: support this @@ -36,6 +33,7 @@ func main() { InitDatabase() defer db.CloseDatabase() + InitPlaylists() InitAudioFiles(config.AudioFilesPath) InitServerStatus() @@ -288,8 +286,7 @@ func submitPlaylist(w http.ResponseWriter, r *http.Request) { } db.SetEntriesForPlaylist(cleanedEntries, id) // Notify connected radios - close(playlistChangeWait) - playlistChangeWait = make(chan bool) + playlists.NotifyChanges() } http.Redirect(w, r, "/playlist/", http.StatusFound) } @@ -302,6 +299,7 @@ func deletePlaylist(w http.ResponseWriter, r *http.Request) { return } db.DeletePlaylist(id) + playlists.NotifyChanges() } http.Redirect(w, r, "/playlist/", http.StatusFound) } diff --git a/server/files.go b/server/files.go index 4d6e293..b452997 100644 --- a/server/files.go +++ b/server/files.go @@ -7,6 +7,7 @@ import ( "log" "os" "path/filepath" + "sync" ) type FileSpec struct { @@ -18,6 +19,7 @@ type AudioFiles struct { path string list []FileSpec changeWait chan bool + filesMutex sync.Mutex } var files AudioFiles @@ -36,6 +38,8 @@ func (r *AudioFiles) Refresh() { log.Println("couldn't read dir", r.path) return } + r.filesMutex.Lock() + defer r.filesMutex.Unlock() r.list = nil for _, file := range entries { f, err := os.Open(filepath.Join(r.path, file.Name())) @@ -57,6 +61,8 @@ func (r *AudioFiles) Path() string { } func (r *AudioFiles) Files() []FileSpec { + r.filesMutex.Lock() + defer r.filesMutex.Unlock() return r.list } @@ -68,6 +74,8 @@ func (r *AudioFiles) Delete(filename string) { } } -func (r *AudioFiles) ChangeChannel() chan bool { - return r.changeWait +func (r *AudioFiles) WatchForChanges() ([]FileSpec, chan bool) { + r.filesMutex.Lock() + defer r.filesMutex.Unlock() + return r.list, r.changeWait } diff --git a/server/playlist.go b/server/playlist.go new file mode 100644 index 0000000..f0a6c89 --- /dev/null +++ b/server/playlist.go @@ -0,0 +1,35 @@ +package main + +import ( + "sync" +) + +type Playlists struct { + changeWait chan bool + playlistMutex sync.Mutex +} + +var playlists Playlists + +func InitPlaylists() { + playlists.changeWait = make(chan bool) +} + +func (p *Playlists) GetPlaylists() []Playlist { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + return db.GetPlaylists() +} + +func (p *Playlists) WatchForChanges() ([]Playlist, chan bool) { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + return db.GetPlaylists(), p.changeWait +} + +func (p *Playlists) NotifyChanges() { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + close(p.changeWait) + p.changeWait = make(chan bool) +} diff --git a/server/radio_sync.go b/server/radio_sync.go index 3dfd4ed..9fd9a65 100644 --- a/server/radio_sync.go +++ b/server/radio_sync.go @@ -55,20 +55,7 @@ func RadioSync(ws *websocket.Conn) { isAuthenticated = true go KeepFilesUpdated(ws) - - // send initial file message - err = sendFilesMessageToRadio(ws) - if err != nil { - return - } - go KeepPlaylistsUpdated(ws) - - // send initial playlists message - err = sendPlaylistsMessageToRadio(ws) - if err != nil { - return - } } if t == protocol.StatusType { @@ -79,9 +66,9 @@ func RadioSync(ws *websocket.Conn) { } } -func sendPlaylistsMessageToRadio(ws *websocket.Conn) error { +func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error { playlistSpecs := make([]protocol.PlaylistSpec, 0) - for _, v := range db.GetPlaylists() { + for _, v := range p { if v.Enabled { entrySpecs := make([]protocol.EntrySpec, 0) for _, e := range db.GetEntriesForPlaylist(v.Id) { @@ -101,17 +88,18 @@ func sendPlaylistsMessageToRadio(ws *websocket.Conn) error { func KeepPlaylistsUpdated(ws *websocket.Conn) { for { - <-playlistChangeWait - err := sendPlaylistsMessageToRadio(ws) + p, ch := playlists.WatchForChanges() + err := sendPlaylistsMessageToRadio(ws, p) if err != nil { return } + <-ch } } -func sendFilesMessageToRadio(ws *websocket.Conn) error { +func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error { specs := make([]protocol.FileSpec, 0) - for _, v := range files.Files() { + for _, v := range f { specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash}) } files := protocol.FilesMessage{ @@ -125,10 +113,11 @@ func sendFilesMessageToRadio(ws *websocket.Conn) error { func KeepFilesUpdated(ws *websocket.Conn) { for { - <-files.ChangeChannel() - err := sendFilesMessageToRadio(ws) + f, ch := files.WatchForChanges() + err := sendFilesMessageToRadio(ws, f) if err != nil { return } + <-ch } }