]> code.octet-stream.net Git - broadcaster/blob - server/radio_sync.go
66521f8b5a132b304519cb0cce835e5d8241c53a
[broadcaster] / server / radio_sync.go
1 package main
2
3 import (
4 "code.octet-stream.net/broadcaster/internal/protocol"
5 "encoding/json"
6 "golang.org/x/net/websocket"
7 "log"
8 )
9
10 func RadioSync(ws *websocket.Conn) {
11 log.Println("A websocket connected, I think")
12 buf := make([]byte, 16384)
13
14 badRead := false
15 isAuthenticated := false
16 var radio Radio
17 for {
18 // Ignore any massively oversize messages
19 n, err := ws.Read(buf)
20 if err != nil {
21 if radio.Name != "" {
22 log.Println("Lost websocket to radio:", radio.Name)
23 status.RadioDisconnected(radio.Id)
24 } else {
25 log.Println("Lost unauthenticated websocket")
26 }
27 return
28 }
29 if n == len(buf) {
30 badRead = true
31 continue
32 } else if badRead {
33 badRead = false
34 continue
35 }
36
37 t, msg, err := protocol.ParseMessage(buf[:n])
38 if err != nil {
39 log.Println(err)
40 return
41 }
42
43 if !isAuthenticated && t != protocol.AuthenticateType {
44 continue
45 }
46
47 if t == protocol.AuthenticateType && !isAuthenticated {
48 authMsg := msg.(protocol.AuthenticateMessage)
49 r, err := db.GetRadioByToken(authMsg.Token)
50 if err != nil {
51 log.Println("Could not find radio for offered token", authMsg.Token)
52 }
53 radio = r
54 log.Println("Radio authenticated:", radio.Name)
55 isAuthenticated = true
56 commandRouter.AddWebsocket(r.Id, ws)
57 defer commandRouter.RemoveWebsocket(ws)
58
59 go KeepFilesUpdated(ws)
60 go KeepPlaylistsUpdated(ws)
61 }
62
63 if t == protocol.StatusType {
64 statusMsg := msg.(protocol.StatusMessage)
65 log.Println("Received Status from", radio.Name, ":", statusMsg)
66 status.MergeStatus(radio.Id, statusMsg)
67 }
68 }
69 }
70
71 func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error {
72 playlistSpecs := make([]protocol.PlaylistSpec, 0)
73 for _, v := range p {
74 if v.Enabled {
75 entrySpecs := make([]protocol.EntrySpec, 0)
76 for _, e := range db.GetEntriesForPlaylist(v.Id) {
77 entrySpecs = append(entrySpecs, protocol.EntrySpec{Filename: e.Filename, DelaySeconds: e.DelaySeconds, IsRelative: e.IsRelative})
78 }
79 playlistSpecs = append(playlistSpecs, protocol.PlaylistSpec{Id: v.Id, Name: v.Name, StartTime: v.StartTime, Entries: entrySpecs})
80 }
81 }
82 playlists := protocol.PlaylistsMessage{
83 T: protocol.PlaylistsType,
84 Playlists: playlistSpecs,
85 }
86 msg, _ := json.Marshal(playlists)
87 _, err := ws.Write(msg)
88 return err
89 }
90
91 func KeepPlaylistsUpdated(ws *websocket.Conn) {
92 for {
93 p, ch := playlists.WatchForChanges()
94 err := sendPlaylistsMessageToRadio(ws, p)
95 if err != nil {
96 return
97 }
98 <-ch
99 }
100 }
101
102 func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error {
103 specs := make([]protocol.FileSpec, 0)
104 for _, v := range f {
105 specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash})
106 }
107 files := protocol.FilesMessage{
108 T: protocol.FilesType,
109 Files: specs,
110 }
111 msg, _ := json.Marshal(files)
112 _, err := ws.Write(msg)
113 return err
114 }
115
116 func KeepFilesUpdated(ws *websocket.Conn) {
117 for {
118 f, ch := files.WatchForChanges()
119 err := sendFilesMessageToRadio(ws, f)
120 if err != nil {
121 return
122 }
123 <-ch
124 }
125 }