code.octet-stream.net Git - broadcaster/commitdiff
Solve race condition when updating files or playlists and notifying radios
author Thomas Karpiniec <tom.karpiniec@outlook.com>
Sat, 19 Oct 2024 02:58:41 +0000 (13:58 +1100)
committer Thomas Karpiniec <tom.karpiniec@outlook.com>
Sat, 19 Oct 2024 02:58:41 +0000 (13:58 +1100)
server/broadcaster.go
server/files.go
server/playlist.go [new file with mode: 0644]
server/radio_sync.go

diff --git a/server/broadcaster.go b/server/broadcaster.go
index 2e09249dfb40b64fe7f666fbe0ee9328bf00302f..d2495fd45e8d8d386197d56b7a21ff2d09769387 100644 (file)
@@ -18,9 +18,6 @@ const formatString = "2006-01-02T15:04"
 
 var config ServerConfig = NewServerConfig()
 
-// Channel that will be closed and recreated every time the playlists change
-var playlistChangeWait = make(chan bool)
-
 func main() {
        configFlag := flag.String("c", "", "path to configuration file")
        // TODO: support this
@@ -36,6 +33,7 @@ func main() {
        InitDatabase()
        defer db.CloseDatabase()
 
+       InitPlaylists()
        InitAudioFiles(config.AudioFilesPath)
        InitServerStatus()
 
@@ -288,8 +286,7 @@ func submitPlaylist(w http.ResponseWriter, r *http.Request) {
                }
                db.SetEntriesForPlaylist(cleanedEntries, id)
                // Notify connected radios
-               close(playlistChangeWait)
-               playlistChangeWait = make(chan bool)
+               playlists.NotifyChanges()
        }
        http.Redirect(w, r, "/playlist/", http.StatusFound)
 }
@@ -302,6 +299,7 @@ func deletePlaylist(w http.ResponseWriter, r *http.Request) {
                        return
                }
                db.DeletePlaylist(id)
+               playlists.NotifyChanges()
        }
        http.Redirect(w, r, "/playlist/", http.StatusFound)
 }
diff --git a/server/files.go b/server/files.go
index 4d6e2939912f76afc8d5760b2c741d788f68fd6e..b452997ef8248312f4ac139942cbddcd9f57354c 100644 (file)
@@ -7,6 +7,7 @@ import (
        "log"
        "os"
        "path/filepath"
+       "sync"
 )
 
 type FileSpec struct {
@@ -18,6 +19,7 @@ type AudioFiles struct {
        path       string
        list       []FileSpec
        changeWait chan bool
+       filesMutex sync.Mutex
 }
 
 var files AudioFiles
@@ -36,6 +38,8 @@ func (r *AudioFiles) Refresh() {
                log.Println("couldn't read dir", r.path)
                return
        }
+       r.filesMutex.Lock()
+       defer r.filesMutex.Unlock()
        r.list = nil
        for _, file := range entries {
                f, err := os.Open(filepath.Join(r.path, file.Name()))
@@ -57,6 +61,8 @@ func (r *AudioFiles) Path() string {
 }
 
 func (r *AudioFiles) Files() []FileSpec {
+       r.filesMutex.Lock()
+       defer r.filesMutex.Unlock()
        return r.list
 }
 
@@ -68,6 +74,8 @@ func (r *AudioFiles) Delete(filename string) {
        }
 }
 
-func (r *AudioFiles) ChangeChannel() chan bool {
-       return r.changeWait
+func (r *AudioFiles) WatchForChanges() ([]FileSpec, chan bool) {
+       r.filesMutex.Lock()
+       defer r.filesMutex.Unlock()
+       return r.list, r.changeWait
 }
diff --git a/server/playlist.go b/server/playlist.go
new file mode 100644 (file)
index 0000000..f0a6c89
--- /dev/null
@@ -0,0 +1,35 @@
+package main
+
+import (
+       "sync"
+)
+
+type Playlists struct {
+       changeWait    chan bool
+       playlistMutex sync.Mutex
+}
+
+var playlists Playlists
+
+func InitPlaylists() {
+       playlists.changeWait = make(chan bool)
+}
+
+func (p *Playlists) GetPlaylists() []Playlist {
+       p.playlistMutex.Lock()
+       defer p.playlistMutex.Unlock()
+       return db.GetPlaylists()
+}
+
+func (p *Playlists) WatchForChanges() ([]Playlist, chan bool) {
+       p.playlistMutex.Lock()
+       defer p.playlistMutex.Unlock()
+       return db.GetPlaylists(), p.changeWait
+}
+
+func (p *Playlists) NotifyChanges() {
+       p.playlistMutex.Lock()
+       defer p.playlistMutex.Unlock()
+       close(p.changeWait)
+       p.changeWait = make(chan bool)
+}
diff --git a/server/radio_sync.go b/server/radio_sync.go
index 3dfd4ed5395c008b6cdfe9889fda4d8e6de240ae..9fd9a658a834622b8b4a02b560e963b01a231445 100644 (file)
@@ -55,20 +55,7 @@ func RadioSync(ws *websocket.Conn) {
                        isAuthenticated = true
 
                        go KeepFilesUpdated(ws)
-
-                       // send initial file message
-                       err = sendFilesMessageToRadio(ws)
-                       if err != nil {
-                               return
-                       }
-
                        go KeepPlaylistsUpdated(ws)
-
-                       // send initial playlists message
-                       err = sendPlaylistsMessageToRadio(ws)
-                       if err != nil {
-                               return
-                       }
                }
 
                if t == protocol.StatusType {
@@ -79,9 +66,9 @@ func RadioSync(ws *websocket.Conn) {
        }
 }
 
-func sendPlaylistsMessageToRadio(ws *websocket.Conn) error {
+func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error {
        playlistSpecs := make([]protocol.PlaylistSpec, 0)
-       for _, v := range db.GetPlaylists() {
+       for _, v := range p {
                if v.Enabled {
                        entrySpecs := make([]protocol.EntrySpec, 0)
                        for _, e := range db.GetEntriesForPlaylist(v.Id) {
@@ -101,17 +88,18 @@ func sendPlaylistsMessageToRadio(ws *websocket.Conn) error {
 
 func KeepPlaylistsUpdated(ws *websocket.Conn) {
        for {
-               <-playlistChangeWait
-               err := sendPlaylistsMessageToRadio(ws)
+               p, ch := playlists.WatchForChanges()
+               err := sendPlaylistsMessageToRadio(ws, p)
                if err != nil {
                        return
                }
+               <-ch
        }
 }
 
-func sendFilesMessageToRadio(ws *websocket.Conn) error {
+func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error {
        specs := make([]protocol.FileSpec, 0)
-       for _, v := range files.Files() {
+       for _, v := range f {
                specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash})
        }
        files := protocol.FilesMessage{
@@ -125,10 +113,11 @@ func sendFilesMessageToRadio(ws *websocket.Conn) error {
 
 func KeepFilesUpdated(ws *websocket.Conn) {
        for {
-               <-files.ChangeChannel()
-               err := sendFilesMessageToRadio(ws)
+               f, ch := files.WatchForChanges()
+               err := sendFilesMessageToRadio(ws, f)
                if err != nil {
                        return
                }
+               <-ch
        }
 }