code.octet-stream.net Git - broadcaster/blob - server/radio_sync.go
Solve race condition when updating files or playlists and notifying radios
[broadcaster] / server / radio_sync.go
1 package main
2
3 import (
4 "code.octet-stream.net/broadcaster/protocol"
5 "encoding/json"
6 "golang.org/x/net/websocket"
7 "log"
8 )
9
10 func RadioSync(ws *websocket.Conn) {
11 log.Println("A websocket connected, I think")
12 buf := make([]byte, 16384)
13
14 badRead := false
15 isAuthenticated := false
16 var radio Radio
17 for {
18 // Ignore any massively oversize messages
19 n, err := ws.Read(buf)
20 if err != nil {
21 if radio.Name != "" {
22 log.Println("Lost websocket to radio:", radio.Name)
23 status.RadioDisconnected(radio.Id)
24 } else {
25 log.Println("Lost unauthenticated websocket")
26 }
27 return
28 }
29 if n == len(buf) {
30 badRead = true
31 continue
32 } else if badRead {
33 badRead = false
34 continue
35 }
36
37 t, msg, err := protocol.ParseMessage(buf[:n])
38 if err != nil {
39 log.Println(err)
40 return
41 }
42
43 if !isAuthenticated && t != protocol.AuthenticateType {
44 continue
45 }
46
47 if t == protocol.AuthenticateType && !isAuthenticated {
48 authMsg := msg.(protocol.AuthenticateMessage)
49 r, err := db.GetRadioByToken(authMsg.Token)
50 if err != nil {
51 log.Println("Could not find radio for offered token", authMsg.Token)
52 }
53 radio = r
54 log.Println("Radio authenticated:", radio.Name)
55 isAuthenticated = true
56
57 go KeepFilesUpdated(ws)
58 go KeepPlaylistsUpdated(ws)
59 }
60
61 if t == protocol.StatusType {
62 statusMsg := msg.(protocol.StatusMessage)
63 log.Println("Received Status from", radio.Name, ":", statusMsg)
64 status.MergeStatus(radio.Id, statusMsg)
65 }
66 }
67 }
68
69 func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error {
70 playlistSpecs := make([]protocol.PlaylistSpec, 0)
71 for _, v := range p {
72 if v.Enabled {
73 entrySpecs := make([]protocol.EntrySpec, 0)
74 for _, e := range db.GetEntriesForPlaylist(v.Id) {
75 entrySpecs = append(entrySpecs, protocol.EntrySpec{Filename: e.Filename, DelaySeconds: e.DelaySeconds, IsRelative: e.IsRelative})
76 }
77 playlistSpecs = append(playlistSpecs, protocol.PlaylistSpec{Id: v.Id, Name: v.Name, StartTime: v.StartTime, Entries: entrySpecs})
78 }
79 }
80 playlists := protocol.PlaylistsMessage{
81 T: protocol.PlaylistsType,
82 Playlists: playlistSpecs,
83 }
84 msg, _ := json.Marshal(playlists)
85 _, err := ws.Write(msg)
86 return err
87 }
88
89 func KeepPlaylistsUpdated(ws *websocket.Conn) {
90 for {
91 p, ch := playlists.WatchForChanges()
92 err := sendPlaylistsMessageToRadio(ws, p)
93 if err != nil {
94 return
95 }
96 <-ch
97 }
98 }
99
100 func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error {
101 specs := make([]protocol.FileSpec, 0)
102 for _, v := range f {
103 specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash})
104 }
105 files := protocol.FilesMessage{
106 T: protocol.FilesType,
107 Files: specs,
108 }
109 msg, _ := json.Marshal(files)
110 _, err := ws.Write(msg)
111 return err
112 }
113
114 func KeepFilesUpdated(ws *websocket.Conn) {
115 for {
116 f, ch := files.WatchForChanges()
117 err := sendFilesMessageToRadio(ws, f)
118 if err != nil {
119 return
120 }
121 <-ch
122 }
123 }