From: Thomas Karpiniec Date: Tue, 22 Oct 2024 08:36:37 +0000 (+1100) Subject: Add licence, etc. X-Git-Tag: v1.0.0~10 X-Git-Url: https://code.octet-stream.net/broadcaster/commitdiff_plain/33a19d553807d171f6ba9f4dafe30f43bc4bab5e?ds=sidebyside Add licence, etc. --- diff --git a/.gitignore b/.gitignore index c2f7731..b657038 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,2 @@ -audio -broadcaster-server -broadcaster-radio -test-server.conf -test-radio.conf -test.db* +build +*.kate-swp diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5dacd4d --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2024 Thomas Karpiniec + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/broadcaster-radio/config.go b/broadcaster-radio/config.go new file mode 100644 index 0000000..6837b7d --- /dev/null +++ b/broadcaster-radio/config.go @@ -0,0 +1,70 @@ +package main + +import ( + "errors" + "log" + "os" + "strings" + + "github.com/BurntSushi/toml" +) + +type RadioConfig struct { + GpioDevice string + PTTPin int + COSPin int + ServerURL string + Token string + CachePath string + TimeZone string +} + +func NewRadioConfig() RadioConfig { + return RadioConfig{ + GpioDevice: "gpiochip0", + PTTPin: -1, + COSPin: -1, + ServerURL: "", + Token: "", + CachePath: "", + TimeZone: "Australia/Hobart", + } +} + +func (c *RadioConfig) LoadFromFile(path string) { + _, err := toml.DecodeFile(path, &c) + if err != nil { + log.Fatal("could not read config file for reading at path:", path, err) + } + err = c.Validate() + if err != nil { + log.Fatal(err) + } + c.ApplyDefaults() +} + +func (c *RadioConfig) Validate() error { + if c.ServerURL == "" { + return errors.New("ServerURL must be provided in the configuration") + } + if c.Token == "" { + return errors.New("Token must be provided in the configuration") + } + return nil +} + +func (c *RadioConfig) ApplyDefaults() { + if c.CachePath == "" { + dir, err := os.MkdirTemp("", "broadcast") + if err != nil { + log.Fatal(err) + } + c.CachePath = dir + } +} + +func (c *RadioConfig) WebsocketURL() string { + addr := strings.Replace(c.ServerURL, "https://", "wss://", -1) + addr = strings.Replace(addr, "http://", "ws://", -1) + return addr + "/radio-ws" +} diff --git a/broadcaster-radio/files_machine.go b/broadcaster-radio/files_machine.go new file mode 100644 index 0000000..143397a --- /dev/null +++ b/broadcaster-radio/files_machine.go @@ -0,0 +1,117 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "crypto/sha256" + "encoding/hex" + "io" + "log" + "net/http" + "os" + "path/filepath" +) + +type FilesMachine struct { + specs []protocol.FileSpec + cachePath string + missing []string +} + +func NewFilesMachine(cachePath string) FilesMachine { + if err := os.MkdirAll(cachePath, 0750); err != nil { + log.Fatal(err) + } + return FilesMachine{ + cachePath: cachePath, + } +} + +func (m *FilesMachine) UpdateSpecs(specs []protocol.FileSpec) { + m.specs = specs + m.RefreshMissing() +} + +func (m *FilesMachine) RefreshMissing() { + // Delete any files in the cache dir who are not in the spec + entries, err := os.ReadDir(m.cachePath) + if err != nil { + log.Fatal(err) + } + okay := make([]string, 0) + for _, file := range entries { + hash := "" + for _, spec := range m.specs { + if file.Name() == spec.Name { + hash = spec.Hash + break + } + } + // if we have an extraneous file, delete it + if hash == "" { + log.Println("Deleting extraneous cached audio file:", file.Name()) + os.Remove(filepath.Join(m.cachePath, file.Name())) + continue + } + // if the hash isn't right, delete it + f, err := os.Open(filepath.Join(m.cachePath, file.Name())) + if err != nil { + log.Fatal(err) + } + hasher := sha256.New() + io.Copy(hasher, f) + if hex.EncodeToString(hasher.Sum(nil)) != hash { + log.Println("Deleting cached audio file with incorrect hash:", file.Name()) + os.Remove(filepath.Join(m.cachePath, file.Name())) + } else { + okay = append(okay, file.Name()) + } + } + m.missing = nil + for _, spec := range m.specs { + missing := true + for _, file := range okay { + if spec.Name == file { + missing = false + } + } + if missing { + m.missing = append(m.missing, spec.Name) + } + } + if len(m.missing) > 1 { + log.Println(len(m.missing), "missing files") + } else if len(m.missing) == 1 { + log.Println("1 missing file") + } else { + log.Println("All files are in sync with server") + } + statusCollector.FilesInSync <- len(m.missing) == 0 +} + +func (m *FilesMachine) IsCacheComplete() bool { + return len(m.missing) == 0 +} + +func (m *FilesMachine) NextFile() string { + next, remainder := m.missing[0], m.missing[1:] + m.missing = remainder + return next +} + +func (m *FilesMachine) DownloadSingle(filename string, downloadResult chan<- error) { + log.Println("Downloading", filename) + out, err := os.Create(filepath.Join(m.cachePath, filename)) + if err != nil { + downloadResult <- err + return + } + defer out.Close() + resp, err := http.Get(config.ServerURL + "/file-downloads/" + filename) + if err != nil { + downloadResult <- err + return + } + defer resp.Body.Close() + _, err = io.Copy(out, resp.Body) + downloadResult <- err +} diff --git a/broadcaster-radio/gpio.go b/broadcaster-radio/gpio.go new file mode 100644 index 0000000..69807ac --- /dev/null +++ b/broadcaster-radio/gpio.go @@ -0,0 +1,123 @@ +package main + +import ( + gpio "github.com/warthog618/go-gpiocdev" + "github.com/warthog618/go-gpiocdev/device/rpi" + "log" + "strconv" +) + +type PTT interface { + EngagePTT() + DisengagePTT() +} + +type COS interface { + WaitForChannelClear() + COSValue() bool +} + +var ptt PTT = &DefaultPTT{} +var cos COS = &DefaultCOS{} + +type PiPTT struct { + pttLine *gpio.Line +} + +type PiCOS struct { + cosLine *gpio.Line + clearWait chan bool +} + +func InitRaspberryPiPTT(pttNum int, chipName string) { + pttPin, err := rpi.Pin("GPIO" + strconv.Itoa(pttNum)) + if err != nil { + log.Fatal("invalid PTT pin configured", ptt) + } + pttLine, err := gpio.RequestLine(chipName, pttPin, gpio.AsOutput(0)) + if err != nil { + log.Fatal("unable to open requested pin for PTT GPIO:", ptt, ". Are you running as root?") + } + ptt = &PiPTT{ + pttLine: pttLine, + } +} + +func InitRaspberryPiCOS(cosNum int, chipName string) { + var piCOS PiCOS + piCOS.clearWait = make(chan bool) + cosPin, err := rpi.Pin("GPIO" + strconv.Itoa(cosNum)) + if err != nil { + log.Fatal("invalid COS Pin configured", cos) + } + cosHandler := func(event gpio.LineEvent) { + if event.Type == gpio.LineEventFallingEdge { + log.Println("COS: channel clear") + close(piCOS.clearWait) + piCOS.clearWait = make(chan bool) + statusCollector.COS <- false + } + if event.Type == gpio.LineEventRisingEdge { + log.Println("COS: channel in use") + statusCollector.COS <- true + } + } + cosLine, err := gpio.RequestLine(chipName, cosPin, gpio.AsInput, gpio.WithBothEdges, gpio.WithEventHandler(cosHandler)) + if err != nil { + log.Fatal("unable to open requested pin for COS GPIO:", cos, ". Are you running as root?") + } + piCOS.cosLine = cosLine + cos = &piCOS +} + +func (g *PiCOS) COSValue() bool { + val, err := g.cosLine.Value() + if err != nil { + log.Fatal("Unable to read COS value") + } + return val != 0 +} + +func (g *PiCOS) WaitForChannelClear() { + ch := g.clearWait + val, err := g.cosLine.Value() + if err != nil || val == 0 { + return + } + // wait for close + <-ch +} + +func (g *PiPTT) EngagePTT() { + log.Println("PTT: on") + g.pttLine.SetValue(1) + statusCollector.PTT <- true +} + +func (g *PiPTT) DisengagePTT() { + log.Println("PTT: off") + g.pttLine.SetValue(0) + statusCollector.PTT <- false +} + +type DefaultPTT struct { +} + +func (g *DefaultPTT) EngagePTT() { + statusCollector.PTT <- true +} + +func (g *DefaultPTT) DisengagePTT() { + statusCollector.PTT <- false +} + +type DefaultCOS struct { +} + +func (g *DefaultCOS) WaitForChannelClear() { + log.Println("Assuming channel is clear since COS GPIO is not configured") +} + +func (g *DefaultCOS) COSValue() bool { + return false +} diff --git a/broadcaster-radio/main.go b/broadcaster-radio/main.go new file mode 100644 index 0000000..6e31772 --- /dev/null +++ b/broadcaster-radio/main.go @@ -0,0 +1,334 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "encoding/json" + "flag" + "fmt" + "github.com/gopxl/beep/v2" + "github.com/gopxl/beep/v2/mp3" + "github.com/gopxl/beep/v2/speaker" + "github.com/gopxl/beep/v2/wav" + "golang.org/x/net/websocket" + "log" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" +) + +const version = "v1.0.0" +const sampleRate = 44100 + +var config RadioConfig = NewRadioConfig() + +func main() { + configFlag := flag.String("c", "", "path to configuration file") + versionFlag := flag.Bool("v", false, "print version and exit") + flag.Parse() + + if *versionFlag { + fmt.Println("Broadcaster Radio", version) + os.Exit(0) + } + if *configFlag == "" { + log.Fatal("must specify a configuration file with -c") + } + + log.Println("Broadcaster Radio", version, "starting up") + config.LoadFromFile(*configFlag) + statusCollector.Config <- config + + playbackSampleRate := beep.SampleRate(sampleRate) + speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10)) + + if config.PTTPin != -1 { + InitRaspberryPiPTT(config.PTTPin, config.GpioDevice) + } + if config.COSPin != -1 { + InitRaspberryPiCOS(config.COSPin, config.GpioDevice) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sig + log.Println("Radio shutting down due to signal", sig) + // Make sure we always stop PTT when program ends + ptt.DisengagePTT() + os.Exit(0) + }() + + log.Println("Config checks out, radio coming online") + log.Println("Audio file cache:", config.CachePath) + + fileSpecChan := make(chan []protocol.FileSpec) + go filesWorker(config.CachePath, fileSpecChan) + + stop := make(chan bool) + playlistSpecChan := make(chan []protocol.PlaylistSpec) + go playlistWorker(playlistSpecChan, stop) + + for { + runWebsocket(fileSpecChan, playlistSpecChan, stop) + log.Println("Websocket failed, retry in 30 seconds") + time.Sleep(time.Second * time.Duration(30)) + } +} + +func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error { + log.Println("Establishing websocket connection to:", config.WebsocketURL()) + ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL) + if err != nil { + return err + } + + auth := protocol.AuthenticateMessage{ + T: "authenticate", + Token: config.Token, + } + msg, _ := json.Marshal(auth) + + if _, err := ws.Write(msg); err != nil { + log.Fatal(err) + } + statusCollector.Websocket <- ws + + buf := make([]byte, 16384) + badRead := false + for { + n, err := ws.Read(buf) + if err != nil { + log.Println("Lost websocket to server") + return err + } + // Ignore any massively oversize messages + if n == len(buf) { + badRead = true + continue + } else if badRead { + badRead = false + continue + } + + t, msg, err := protocol.ParseMessage(buf[:n]) + if err != nil { + log.Println("Message parse error", err) + return err + } + + if t == protocol.FilesType { + filesMsg := msg.(protocol.FilesMessage) + fileSpecChan <- filesMsg.Files + } + + if t == protocol.PlaylistsType { + playlistsMsg := msg.(protocol.PlaylistsMessage) + playlistSpecChan <- playlistsMsg.Playlists + } + + if t == protocol.StopType { + stop <- true + } + } +} + +func filesWorker(cachePath string, ch chan []protocol.FileSpec) { + machine := NewFilesMachine(cachePath) + isDownloading := false + downloadResult := make(chan error) + var timer *time.Timer + + for { + var timerCh <-chan time.Time = nil + if timer != nil { + timerCh = timer.C + } + doNext := false + select { + case specs := <-ch: + log.Println("Received new file specs", specs) + machine.UpdateSpecs(specs) + doNext = true + timer = nil + case err := <-downloadResult: + isDownloading = false + machine.RefreshMissing() + if err != nil { + log.Println(err) + if !machine.IsCacheComplete() { + timer = time.NewTimer(30 * time.Second) + } + } else { + if !machine.IsCacheComplete() { + timer = time.NewTimer(10 * time.Millisecond) + } + } + case <-timerCh: + doNext = true + timer = nil + } + + if doNext && !isDownloading && !machine.IsCacheComplete() { + next := machine.NextFile() + isDownloading = true + go machine.DownloadSingle(next, downloadResult) + } + } +} + +func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) { + var specs []protocol.PlaylistSpec + isPlaying := false + playbackFinished := make(chan error) + cancel := make(chan bool) + nextId := 0 + var timer *time.Timer + + for { + var timerCh <-chan time.Time = nil + if timer != nil { + timerCh = timer.C + } + doNext := false + select { + case specs = <-ch: + log.Println("Received new playlist specs", specs) + doNext = true + case <-playbackFinished: + isPlaying = false + doNext = true + cancel = make(chan bool) + case <-timerCh: + timer = nil + isPlaying = true + for _, v := range specs { + if v.Id == nextId { + go playPlaylist(v, playbackFinished, cancel) + } + } + case <-stop: + if isPlaying { + log.Println("Cancelling playlist in progress") + cancel <- true + } + } + + if doNext && !isPlaying { + timer = nil + found := false + loc, err := time.LoadLocation(config.TimeZone) + if err != nil { + log.Fatal(err) + } + var soonestTime time.Time + for _, v := range specs { + t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc) + if err != nil { + log.Println("Error parsing start time", err) + continue + } + if t.Before(time.Now()) { + continue + } + if !found || t.Before(soonestTime) { + soonestTime = t + found = true + nextId = v.Id + } + } + if found { + duration := soonestTime.Sub(time.Now()) + log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds") + timer = time.NewTimer(duration) + } else { + log.Println("No future playlists") + } + } + } +} + +func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) { + startTime := time.Now() + log.Println("Beginning playback of playlist", playlist.Name) +entries: + for _, p := range playlist.Entries { + // delay + var duration time.Duration + if p.IsRelative { + duration = time.Second * time.Duration(p.DelaySeconds) + } else { + duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds))) + } + statusCollector.PlaylistBeginDelay <- BeginDelayStatus{ + Playlist: playlist.Name, + Seconds: int(duration.Seconds()), + Filename: p.Filename, + } + select { + case <-time.After(duration): + case <-cancel: + break entries + } + + statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{ + Playlist: playlist.Name, + Filename: p.Filename, + } + cos.WaitForChannelClear() + + // then play + statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{ + Playlist: playlist.Name, + Filename: p.Filename, + } + ptt.EngagePTT() + f, err := os.Open(filepath.Join(config.CachePath, p.Filename)) + if err != nil { + log.Println("Couldn't open file for playlist", p.Filename) + continue + } + log.Println("Playing file", p.Filename) + l := strings.ToLower(p.Filename) + var streamer beep.StreamSeekCloser + var format beep.Format + if strings.HasSuffix(l, ".mp3") { + streamer, format, err = mp3.Decode(f) + } else if strings.HasSuffix(l, ".wav") { + streamer, format, err = wav.Decode(f) + } else { + log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on") + } + if err != nil { + log.Println("Could not decode media file", err) + continue + } + defer streamer.Close() + + done := make(chan bool) + + if format.SampleRate != sampleRate { + resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer) + speaker.Play(beep.Seq(resampled, beep.Callback(func() { + done <- true + }))) + } else { + speaker.Play(beep.Seq(streamer, beep.Callback(func() { + done <- true + }))) + } + + select { + case <-done: + case <-cancel: + ptt.DisengagePTT() + break entries + } + ptt.DisengagePTT() + } + log.Println("Playlist finished", playlist.Name) + statusCollector.PlaylistBeginIdle <- true + playbackFinished <- nil +} diff --git a/broadcaster-radio/status.go b/broadcaster-radio/status.go new file mode 100644 index 0000000..7836815 --- /dev/null +++ b/broadcaster-radio/status.go @@ -0,0 +1,140 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "encoding/json" + "golang.org/x/net/websocket" + "time" +) + +type BeginDelayStatus struct { + Playlist string + Seconds int + Filename string +} + +type BeginWaitForChannelStatus struct { + Playlist string + Filename string +} + +type BeginPlaybackStatus struct { + Playlist string + Filename string +} + +type StatusCollector struct { + Websocket chan *websocket.Conn + PlaylistBeginIdle chan bool + PlaylistBeginDelay chan BeginDelayStatus + PlaylistBeginWaitForChannel chan BeginWaitForChannelStatus + PlaylistBeginPlayback chan BeginPlaybackStatus + PTT chan bool + COS chan bool + Config chan RadioConfig + FilesInSync chan bool +} + +var statusCollector = NewStatusCollector() + +func NewStatusCollector() StatusCollector { + sc := StatusCollector{ + Websocket: make(chan *websocket.Conn), + PlaylistBeginIdle: make(chan bool), + PlaylistBeginDelay: make(chan BeginDelayStatus), + PlaylistBeginWaitForChannel: make(chan BeginWaitForChannelStatus), + PlaylistBeginPlayback: make(chan BeginPlaybackStatus), + PTT: make(chan bool), + COS: make(chan bool), + Config: make(chan RadioConfig), + FilesInSync: make(chan bool), + } + go runStatusCollector(sc) + return sc +} + +func runStatusCollector(sc StatusCollector) { + config := <-sc.Config + var msg protocol.StatusMessage + var lastSent protocol.StatusMessage + msg.T = protocol.StatusType + msg.TimeZone = config.TimeZone + msg.Status = protocol.StatusIdle + var ws *websocket.Conn + // Go 1.23: no need to stop tickers when finished + var ticker = time.NewTicker(time.Second * time.Duration(30)) + + for { + select { + case newWebsocket := <-sc.Websocket: + ws = newWebsocket + case <-ticker.C: + // should always be ticking at 1 second for these + if msg.Status == protocol.StatusDelay { + if msg.DelaySecondsRemaining > 0 { + msg.DelaySecondsRemaining -= 1 + } + } + if msg.Status == protocol.StatusChannelInUse { + msg.WaitingForChannelSeconds += 1 + } + if msg.Status == protocol.StatusPlaying { + msg.PlaybackSecondsElapsed += 1 + } + case <-sc.PlaylistBeginIdle: + msg.Status = protocol.StatusIdle + msg.DelaySecondsRemaining = 0 + msg.WaitingForChannelSeconds = 0 + msg.PlaybackSecondsElapsed = 0 + msg.Playlist = "" + msg.Filename = "" + // Update things more slowly when nothing's playing + ticker = time.NewTicker(time.Second * time.Duration(30)) + case delay := <-sc.PlaylistBeginDelay: + msg.Status = protocol.StatusDelay + msg.DelaySecondsRemaining = delay.Seconds + msg.WaitingForChannelSeconds = 0 + msg.PlaybackSecondsElapsed = 0 + msg.Playlist = delay.Playlist + msg.Filename = delay.Filename + // Align ticker with start of state change, make sure it's faster + ticker = time.NewTicker(time.Second * time.Duration(1)) + case wait := <-sc.PlaylistBeginWaitForChannel: + msg.Status = protocol.StatusChannelInUse + msg.DelaySecondsRemaining = 0 + msg.WaitingForChannelSeconds = 0 + msg.PlaybackSecondsElapsed = 0 + msg.Playlist = wait.Playlist + msg.Filename = wait.Filename + ticker = time.NewTicker(time.Second * time.Duration(1)) + case playback := <-sc.PlaylistBeginPlayback: + msg.Status = protocol.StatusPlaying + msg.DelaySecondsRemaining = 0 + msg.WaitingForChannelSeconds = 0 + msg.PlaybackSecondsElapsed = 0 + msg.Playlist = playback.Playlist + msg.Filename = playback.Filename + ticker = time.NewTicker(time.Second * time.Duration(1)) + case ptt := <-sc.PTT: + msg.PTT = ptt + case cos := <-sc.COS: + msg.COS = cos + case inSync := <-sc.FilesInSync: + msg.FilesInSync = inSync + } + msg.LocalTime = time.Now().Format(protocol.LocalTimeFormat) + msg.COS = cos.COSValue() + + if msg == lastSent { + continue + } + if ws != nil { + msgJson, _ := json.Marshal(msg) + if _, err := ws.Write(msgJson); err != nil { + // If websocket has failed, wait 'til we get a new one + ws = nil + } + lastSent = msg + } + } +} diff --git a/broadcaster-server/command.go b/broadcaster-server/command.go new file mode 100644 index 0000000..346e842 --- /dev/null +++ b/broadcaster-server/command.go @@ -0,0 +1,53 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "encoding/json" + "golang.org/x/net/websocket" + "sync" +) + +type CommandRouter struct { + connsMutex sync.Mutex + conns map[int]*websocket.Conn +} + +var commandRouter CommandRouter + +func InitCommandRouter() { + commandRouter.conns = make(map[int]*websocket.Conn) +} + +func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + c.conns[radioId] = ws +} + +func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + key := -1 + for k, v := range c.conns { + if v == ws { + key = k + } + } + if key != -1 { + delete(c.conns, key) + } + +} + +func (c *CommandRouter) Stop(radioId int) { + c.connsMutex.Lock() + defer c.connsMutex.Unlock() + ws := c.conns[radioId] + if ws != nil { + stop := protocol.StopMessage{ + T: protocol.StopType, + } + msg, _ := json.Marshal(stop) + ws.Write(msg) + } +} diff --git a/broadcaster-server/config.go b/broadcaster-server/config.go new file mode 100644 index 0000000..4a4866c --- /dev/null +++ b/broadcaster-server/config.go @@ -0,0 +1,45 @@ +package main + +import ( + "errors" + "log" + + "github.com/BurntSushi/toml" +) + +type ServerConfig struct { + BindAddress string + Port int + SqliteDB string + AudioFilesPath string +} + +func NewServerConfig() ServerConfig { + return ServerConfig{ + BindAddress: "0.0.0.0", + Port: 55134, + SqliteDB: "", + AudioFilesPath: "", + } +} + +func (c *ServerConfig) LoadFromFile(path string) { + _, err := toml.DecodeFile(path, &c) + if err != nil { + log.Fatal("could not read config file for reading at path:", path, err) + } + err = c.Validate() + if err != nil { + log.Fatal(err) + } +} + +func (c *ServerConfig) Validate() error { + if c.SqliteDB == "" { + return errors.New("Configuration must provide SqliteDB") + } + if c.AudioFilesPath == "" { + return errors.New("Configuration must provide AudioFilesPath") + } + return nil +} diff --git a/broadcaster-server/database.go b/broadcaster-server/database.go new file mode 100644 index 0000000..1312208 --- /dev/null +++ b/broadcaster-server/database.go @@ -0,0 +1,247 @@ +package main + +import ( + "database/sql" + "errors" + "log" + _ "modernc.org/sqlite" + "time" +) + +type Database struct { + sqldb *sql.DB +} + +var db Database + +func InitDatabase() { + sqldb, err := sql.Open("sqlite", config.SqliteDB) + if err != nil { + log.Fatal(err) + } + db.sqldb = sqldb + + _, err = db.sqldb.Exec("PRAGMA journal_mode = WAL") + if err != nil { + log.Fatal(err) + } + + _, err = db.sqldb.Exec("PRAGMA foreign_keys = ON") + if err != nil { + log.Fatal(err) + } + + _, err = db.sqldb.Exec("PRAGMA busy_timeout = 5000") + if err != nil { + log.Fatal(err) + } + + sqlStmt := ` + CREATE TABLE IF NOT EXISTS sessions (id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT, username TEXT, created TIMESTAMP, expiry TIMESTAMP); + CREATE TABLE IF NOT EXISTS playlists (id INTEGER PRIMARY KEY AUTOINCREMENT, enabled INTEGER, name TEXT, start_time TEXT); + CREATE TABLE IF NOT EXISTS playlist_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, playlist_id INTEGER, position INTEGER, filename TEXT, delay_seconds INTEGER, is_relative INTEGER, CONSTRAINT fk_playlists FOREIGN KEY (playlist_id) REFERENCES playlists(id) ON DELETE CASCADE); + CREATE TABLE IF NOT EXISTS radios (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, token TEXT); + CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE, password_hash TEXT, is_admin INTEGER); + ` + _, err = db.sqldb.Exec(sqlStmt) + if err != nil { + log.Printf("%q: %s\n", err, sqlStmt) + return + } +} + +func (d *Database) CloseDatabase() { + d.sqldb.Close() +} + +func (d *Database) InsertSession(user string, token string, expiry time.Time) { + _, err := d.sqldb.Exec("INSERT INTO sessions (token, username, created, expiry) values (?, ?, CURRENT_TIMESTAMP, ?)", token, user, expiry) + if err != nil { + log.Fatal(err) + } +} + +func (d *Database) GetUserNameForSession(token string) (string, error) { + var username string + err := d.sqldb.QueryRow("SELECT username FROM sessions WHERE token = ? AND expiry > CURRENT_TIMESTAMP", token).Scan(&username) + if err != nil { + return "", errors.New("no matching token") + } + return username, nil +} + +func (d *Database) GetUser(username string) (User, error) { + var user User + err := d.sqldb.QueryRow("SELECT id, username, password_hash, is_admin FROM users WHERE username = ?", username).Scan(&user.Id, &user.Username, &user.PasswordHash, &user.IsAdmin) + if err != nil { + return User{}, errors.New("no user with that username") + } + return user, nil +} + +func (d *Database) GetUsers() []User { + ret := make([]User, 0) + rows, err := d.sqldb.Query("SELECT id, username, password_hash, is_admin FROM users ORDER BY username ASC") + if err != nil { + return ret + } + defer rows.Close() + for rows.Next() { + var u User + if err := rows.Scan(&u.Id, &u.Username, &u.PasswordHash, &u.IsAdmin); err != nil { + return ret + } + ret = append(ret, u) + } + return ret +} + +func (d *Database) SetUserPassword(username string, passwordHash string) { + d.sqldb.Exec("UPDATE users SET password_hash = ? WHERE username = ?", passwordHash, username) +} + +func (d *Database) ClearOtherSessions(username string, token string) { + d.sqldb.Exec("DELETE FROM sessions WHERE username = ? AND token != ?", username, token) +} + +func (d *Database) SetUserIsAdmin(username string, isAdmin bool) { + d.sqldb.Exec("UPDATE users SET is_admin = ? WHERE username = ?", isAdmin, username) +} + +func (d *Database) CreateUser(user User) error { + _, err := d.sqldb.Exec("INSERT INTO users (username, password_hash, is_admin) values (?, ?, ?)", user.Username, user.PasswordHash, user.IsAdmin) + return err +} + +func (d *Database) DeleteUser(username string) error { + _, err := d.sqldb.Exec("DELETE FROM users WHERE username = ?", username) + return err +} + +func (d *Database) CreatePlaylist(playlist Playlist) int { + var id int + tx, _ := d.sqldb.Begin() + _, err := tx.Exec("INSERT INTO playlists (enabled, name, start_time) values (?, ?, ?)", playlist.Enabled, playlist.Name, playlist.StartTime) + if err != nil { + log.Fatal(err) + } + err = tx.QueryRow("SELECT last_insert_rowid()").Scan(&id) + if err != nil { + log.Fatal(err) + } + err = tx.Commit() + if err != nil { + log.Fatal(err) + } + return id +} + +func (d *Database) DeletePlaylist(playlistId int) { + d.sqldb.Exec("DELETE FROM playlists WHERE id = ?", playlistId) +} + +func (d *Database) GetPlaylists() []Playlist { + ret := make([]Playlist, 0) + rows, err := d.sqldb.Query("SELECT id, enabled, name, start_time FROM playlists ORDER BY id ASC") + if err != nil { + return ret + } + defer rows.Close() + for rows.Next() { + var p Playlist + if err := rows.Scan(&p.Id, &p.Enabled, &p.Name, &p.StartTime); err != nil { + return ret + } + ret = append(ret, p) + } + return ret +} + +func (d *Database) GetPlaylist(playlistId int) (Playlist, error) { + var p Playlist + err := d.sqldb.QueryRow("SELECT id, enabled, name, start_time FROM playlists WHERE id = ?", playlistId).Scan(&p.Id, &p.Enabled, &p.Name, &p.StartTime) + if err != nil { + return p, err + } + return p, nil +} + +func (d *Database) UpdatePlaylist(playlist Playlist) { + d.sqldb.Exec("UPDATE playlists SET enabled = ?, name = ?, start_time = ? WHERE id = ?", playlist.Enabled, playlist.Name, playlist.StartTime, playlist.Id) +} + +func (d *Database) SetEntriesForPlaylist(entries []PlaylistEntry, playlistId int) { + tx, _ := d.sqldb.Begin() + _, err := tx.Exec("DELETE FROM playlist_entries WHERE playlist_id = ?", playlistId) + for _, e := range entries { + _, err = tx.Exec("INSERT INTO playlist_entries (playlist_id, position, filename, delay_seconds, is_relative) values (?, ?, ?, ?, ?)", playlistId, e.Position, e.Filename, e.DelaySeconds, e.IsRelative) + if err != nil { + log.Fatal(err) + } + } + tx.Commit() // ignore errors +} + +func (d *Database) GetEntriesForPlaylist(playlistId int) []PlaylistEntry { + ret := make([]PlaylistEntry, 0) + rows, err := d.sqldb.Query("SELECT id, position, filename, delay_seconds, is_relative FROM playlist_entries WHERE playlist_id = ? ORDER by position ASC", playlistId) + if err != nil { + return ret + } + defer rows.Close() + for rows.Next() { + var entry PlaylistEntry + if err := rows.Scan(&entry.Id, &entry.Position, &entry.Filename, &entry.DelaySeconds, &entry.IsRelative); err != nil { + return ret + } + ret = append(ret, entry) + } + return ret +} + +func (d *Database) GetRadio(radioId int) (Radio, error) { + var r Radio + err := d.sqldb.QueryRow("SELECT id, name, token FROM radios WHERE id = ?", radioId).Scan(&r.Id, &r.Name, &r.Token) + if err != nil { + return r, err + } + return r, nil +} + +func (d *Database) GetRadioByToken(token string) (Radio, error) { + var r Radio + err := d.sqldb.QueryRow("SELECT id, name, token FROM radios WHERE token = ?", token).Scan(&r.Id, &r.Name, &r.Token) + if err != nil { + return r, err + } + return r, nil +} + +func (d *Database) GetRadios() []Radio { + ret := make([]Radio, 0) + rows, err := d.sqldb.Query("SELECT id, name, token FROM radios ORDER BY id ASC") + if err != nil { + return ret + } + defer rows.Close() + for rows.Next() { + var r Radio + if err := rows.Scan(&r.Id, &r.Name, &r.Token); err != nil { + return ret + } + ret = append(ret, r) + } + return ret +} + +func (d *Database) DeleteRadio(radioId int) { + d.sqldb.Exec("DELETE FROM radios WHERE id = ?", radioId) +} + +func (d *Database) CreateRadio(radio Radio) { + d.sqldb.Exec("INSERT INTO radios (name, token) values (?, ?)", radio.Name, radio.Token) +} + +func (d *Database) UpdateRadio(radio Radio) { + d.sqldb.Exec("UPDATE radios SET name = ?, token = ? WHERE id = ?", radio.Name, radio.Token, radio.Id) +} diff --git a/broadcaster-server/files.go b/broadcaster-server/files.go new file mode 100644 index 0000000..b452997 --- /dev/null +++ b/broadcaster-server/files.go @@ -0,0 +1,81 @@ +package main + +import ( + "crypto/sha256" + "encoding/hex" + "io" + "log" + "os" + "path/filepath" + "sync" +) + +type FileSpec struct { + Name string + Hash string +} + +type AudioFiles struct { + path string + list []FileSpec + changeWait chan bool + filesMutex sync.Mutex +} + +var files AudioFiles + +func InitAudioFiles(path string) { + files.changeWait = make(chan bool) + files.path = path + log.Println("initing audio files") + files.Refresh() + log.Println("done") +} + +func (r *AudioFiles) Refresh() { + entries, err := os.ReadDir(r.path) + if err != nil { + log.Println("couldn't read dir", r.path) + return + } + r.filesMutex.Lock() + defer r.filesMutex.Unlock() + r.list = nil + for _, file := range entries { + f, err := os.Open(filepath.Join(r.path, file.Name())) + if err != nil { + log.Println("couldn't open", file.Name()) + return + } + hash := sha256.New() + io.Copy(hash, f) + r.list = append(r.list, FileSpec{Name: file.Name(), Hash: hex.EncodeToString(hash.Sum(nil))}) + } + log.Println("Files updated", r.list) + close(files.changeWait) + files.changeWait = make(chan bool) +} + +func (r *AudioFiles) Path() string { + return r.path +} + +func (r *AudioFiles) Files() []FileSpec { + r.filesMutex.Lock() + defer r.filesMutex.Unlock() + return r.list +} + +func (r *AudioFiles) Delete(filename string) { + path := filepath.Join(r.path, filepath.Base(filename)) + if filepath.Clean(r.path) != filepath.Clean(path) { + os.Remove(path) + r.Refresh() + } +} + +func (r *AudioFiles) WatchForChanges() ([]FileSpec, chan bool) { + r.filesMutex.Lock() + defer r.filesMutex.Unlock() + return r.list, r.changeWait +} diff --git a/broadcaster-server/main.go b/broadcaster-server/main.go new file mode 100644 index 0000000..079f977 --- /dev/null +++ b/broadcaster-server/main.go @@ -0,0 +1,494 @@ +package main + +import ( + "bufio" + "embed" + "flag" + "fmt" + "golang.org/x/net/websocket" + "html/template" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +const version = "v1.0.0" +const formatString = "2006-01-02T15:04" + +//go:embed templates/* +var content embed.FS + +var config ServerConfig = NewServerConfig() + +func main() { + configFlag := flag.String("c", "", "path to configuration file") + addUserFlag := flag.Bool("a", false, "interactively add an admin user then exit") + versionFlag := flag.Bool("v", false, "print version then exit") + flag.Parse() + + if *versionFlag { + fmt.Println("Broadcaster Server", version) + os.Exit(0) + } + if *configFlag == "" { + log.Fatal("must specify a configuration file with -c") + } + config.LoadFromFile(*configFlag) + + InitDatabase() + defer db.CloseDatabase() + + if *addUserFlag { + scanner := bufio.NewScanner(os.Stdin) + fmt.Println("Enter new admin username:") + if !scanner.Scan() { + os.Exit(1) + } + username := scanner.Text() + fmt.Println("Enter new admin password (will be printed in the clear):") + if !scanner.Scan() { + os.Exit(1) + } + password := scanner.Text() + if username == "" || password == "" { + fmt.Println("Both username and password must be specified") + os.Exit(1) + } + if err := users.CreateUser(username, password, true); err != nil { + log.Fatal(err) + } + os.Exit(0) + } + + log.Println("Broadcaster Server", version, "starting up") + InitCommandRouter() + InitPlaylists() + InitAudioFiles(config.AudioFilesPath) + InitServerStatus() + + // Public routes + + http.HandleFunc("/login", logInPage) + http.Handle("/file-downloads/", http.StripPrefix("/file-downloads/", http.FileServer(http.Dir(config.AudioFilesPath)))) + + // Authenticated routes + + http.HandleFunc("/", homePage) + http.HandleFunc("/logout", logOutPage) + http.HandleFunc("/change-password", changePasswordPage) + + http.HandleFunc("/playlists/", playlistSection) + http.HandleFunc("/files/", fileSection) + http.HandleFunc("/radios/", radioSection) + + http.Handle("/radio-ws", websocket.Handler(RadioSync)) + http.Handle("/web-ws", websocket.Handler(WebSync)) + http.HandleFunc("/stop", stopPage) + + // Admin routes + + err := http.ListenAndServe(config.BindAddress+":"+strconv.Itoa(config.Port), nil) + if err != nil { + log.Fatal(err) + } +} + +type HomeData struct { + LoggedIn bool + Username string +} + +func homePage(w http.ResponseWriter, r *http.Request) { + tmpl := template.Must(template.ParseFS(content, "templates/index.html")) + data := HomeData{ + LoggedIn: true, + Username: "Bob", + } + tmpl.Execute(w, data) +} + +type LogInData struct { + Error string +} + +func logInPage(w http.ResponseWriter, r *http.Request) { + log.Println("Log in page!") + r.ParseForm() + username := r.Form["username"] + password := r.Form["password"] + errText := "" + if username != nil { + user, err := users.Authenticate(username[0], password[0]) + if err != nil { + errText = "Incorrect login" + } else { + createSessionCookie(w, user.Username) + http.Redirect(w, r, "/", http.StatusFound) + return + } + } + + data := LogInData{ + Error: errText, + } + + tmpl := template.Must(template.ParseFS(content, "templates/login.html")) + tmpl.Execute(w, data) +} + +func playlistSection(w http.ResponseWriter, r *http.Request) { + path := strings.Split(r.URL.Path, "/") + if len(path) != 3 { + http.NotFound(w, r) + return + } + if path[2] == "new" { + editPlaylistPage(w, r, 0) + } else if path[2] == "submit" && r.Method == "POST" { + submitPlaylist(w, r) + } else if path[2] == "delete" && r.Method == "POST" { + deletePlaylist(w, r) + } else if path[2] == "" { + playlistsPage(w, r) + } else { + id, err := strconv.Atoi(path[2]) + if err != nil { + http.NotFound(w, r) + return + } + editPlaylistPage(w, r, id) + } +} + +func fileSection(w http.ResponseWriter, r *http.Request) { + path := strings.Split(r.URL.Path, "/") + if len(path) != 3 { + http.NotFound(w, r) + return + } + if path[2] == "upload" { + uploadFile(w, r) + } else if path[2] == "delete" && r.Method == "POST" { + deleteFile(w, r) + } else if path[2] == "" { + filesPage(w, r) + } else { + http.NotFound(w, r) + return + } +} + +func radioSection(w http.ResponseWriter, r *http.Request) { + path := strings.Split(r.URL.Path, "/") + if len(path) != 3 { + http.NotFound(w, r) + return + } + if path[2] == "new" { + editRadioPage(w, r, 0) + } else if path[2] == "submit" && r.Method == "POST" { + submitRadio(w, r) + } else if path[2] == "delete" && r.Method == "POST" { + deleteRadio(w, r) + } else if path[2] == "" { + radiosPage(w, r) + } else { + id, err := strconv.Atoi(path[2]) + if err != nil { + http.NotFound(w, r) + return + } + editRadioPage(w, r, id) + } +} + +type ChangePasswordPageData struct { + Message string + ShowForm bool +} + +func changePasswordPage(w http.ResponseWriter, r *http.Request) { + user, err := currentUser(w, r) + if err != nil { + http.Redirect(w, r, "/login", http.StatusFound) + return + } + var data ChangePasswordPageData + if r.Method == "POST" { + err := r.ParseForm() + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + oldPassword := r.Form.Get("oldPassword") + newPassword := r.Form.Get("newPassword") + err = users.UpdatePassword(user.Username, oldPassword, newPassword) + if err != nil { + data.Message = "Failed to change password: " + err.Error() + data.ShowForm = true + } else { + data.Message = "Successfully changed password" + data.ShowForm = false + cookie, err := r.Cookie("broadcast_session") + if err == nil { + log.Println("clearing other sessions for username", user.Username, "token", cookie.Value) + db.ClearOtherSessions(user.Username, cookie.Value) + } + } + } else { + data.Message = "" + data.ShowForm = true + } + tmpl := template.Must(template.ParseFS(content, "templates/change_password.html")) + err = tmpl.Execute(w, data) + if err != nil { + log.Fatal(err) + } +} + +type PlaylistsPageData struct { + Playlists []Playlist +} + +func playlistsPage(w http.ResponseWriter, _ *http.Request) { + data := PlaylistsPageData{ + Playlists: db.GetPlaylists(), + } + tmpl := template.Must(template.ParseFS(content, "templates/playlists.html")) + err := tmpl.Execute(w, data) + if err != nil { + log.Fatal(err) + } +} + +type RadiosPageData struct { + Radios []Radio +} + +func radiosPage(w http.ResponseWriter, _ *http.Request) { + data := RadiosPageData{ + Radios: db.GetRadios(), + } + tmpl := template.Must(template.ParseFS(content, "templates/radios.html")) + err := tmpl.Execute(w, data) + if err != nil { + log.Fatal(err) + } +} + +type EditPlaylistPageData struct { + Playlist Playlist + Entries []PlaylistEntry + Files []string +} + +func editPlaylistPage(w http.ResponseWriter, r *http.Request, id int) { + var data EditPlaylistPageData + for _, f := range files.Files() { + data.Files = append(data.Files, f.Name) + } + if id == 0 { + data.Playlist.Enabled = true + data.Playlist.Name = "New Playlist" + data.Playlist.StartTime = time.Now().Format(formatString) + data.Entries = append(data.Entries, PlaylistEntry{}) + } else { + playlist, err := db.GetPlaylist(id) + if err != nil { + http.NotFound(w, r) + return + } + data.Playlist = playlist + data.Entries = db.GetEntriesForPlaylist(id) + } + tmpl := template.Must(template.ParseFS(content, "templates/playlist.html")) + tmpl.Execute(w, data) +} + +func submitPlaylist(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err == nil { + var p Playlist + id, err := strconv.Atoi(r.Form.Get("playlistId")) + if err != nil { + return + } + _, err = time.Parse(formatString, r.Form.Get("playlistStartTime")) + if err != nil { + return + } + p.Id = id + p.Enabled = r.Form.Get("playlistEnabled") == "1" + p.Name = r.Form.Get("playlistName") + p.StartTime = r.Form.Get("playlistStartTime") + + delays := r.Form["delaySeconds"] + filenames := r.Form["filename"] + isRelatives := r.Form["isRelative"] + + entries := make([]PlaylistEntry, 0) + for i := range delays { + var e PlaylistEntry + delay, err := strconv.Atoi(delays[i]) + if err != nil { + return + } + e.DelaySeconds = delay + e.Position = i + e.IsRelative = isRelatives[i] == "1" + e.Filename = filenames[i] + entries = append(entries, e) + } + cleanedEntries := make([]PlaylistEntry, 0) + for _, e := range entries { + if e.DelaySeconds != 0 || e.Filename != "" { + cleanedEntries = append(cleanedEntries, e) + } + } + + if id != 0 { + db.UpdatePlaylist(p) + } else { + id = db.CreatePlaylist(p) + } + db.SetEntriesForPlaylist(cleanedEntries, id) + // Notify connected radios + playlists.NotifyChanges() + } + http.Redirect(w, r, "/playlists/", http.StatusFound) +} + +func deletePlaylist(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err == nil { + id, err := strconv.Atoi(r.Form.Get("playlistId")) + if err != nil { + return + } + db.DeletePlaylist(id) + playlists.NotifyChanges() + } + http.Redirect(w, r, "/playlists/", http.StatusFound) +} + +type EditRadioPageData struct { + Radio Radio +} + +func editRadioPage(w http.ResponseWriter, r *http.Request, id int) { + var data EditRadioPageData + if id == 0 { + data.Radio.Name = "New Radio" + data.Radio.Token = generateSession() + } else { + radio, err := db.GetRadio(id) + if err != nil { + http.NotFound(w, r) + return + } + data.Radio = radio + } + tmpl := template.Must(template.ParseFS(content, "templates/radio.html")) + tmpl.Execute(w, data) +} + +func submitRadio(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err == nil { + var radio Radio + id, err := strconv.Atoi(r.Form.Get("radioId")) + if err != nil { + return + } + radio.Id = id + radio.Name = r.Form.Get("radioName") + radio.Token = r.Form.Get("radioToken") + if id != 0 { + db.UpdateRadio(radio) + } else { + db.CreateRadio(radio) + } + } + http.Redirect(w, r, "/radios/", http.StatusFound) +} + +func deleteRadio(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err == nil { + id, err := strconv.Atoi(r.Form.Get("radioId")) + if err != nil { + return + } + db.DeleteRadio(id) + } + http.Redirect(w, r, "/radios/", http.StatusFound) +} + +type FilesPageData struct { + Files []FileSpec +} + +func filesPage(w http.ResponseWriter, _ *http.Request) { + data := FilesPageData{ + Files: files.Files(), + } + log.Println("file page data", data) + tmpl := template.Must(template.ParseFS(content, "templates/files.html")) + err := tmpl.Execute(w, data) + if err != nil { + log.Fatal(err) + } +} + +func deleteFile(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err == nil { + filename := r.Form.Get("filename") + if filename == "" { + return + } + files.Delete(filename) + } + http.Redirect(w, r, "/files/", http.StatusFound) +} + +func uploadFile(w http.ResponseWriter, r *http.Request) { + err := r.ParseMultipartForm(100 << 20) + file, handler, err := r.FormFile("file") + if err == nil { + path := filepath.Join(files.Path(), filepath.Base(handler.Filename)) + f, _ := os.Create(path) + defer f.Close() + io.Copy(f, file) + log.Println("uploaded file to", path) + files.Refresh() + } + http.Redirect(w, r, "/files/", http.StatusFound) +} + +func logOutPage(w http.ResponseWriter, r *http.Request) { + clearSessionCookie(w) + tmpl := template.Must(template.ParseFS(content, "templates/logout.html")) + tmpl.Execute(w, nil) +} + +func stopPage(w http.ResponseWriter, r *http.Request) { + _, err := currentUser(w, r) + if err != nil { + http.Redirect(w, r, "/login", http.StatusFound) + return + } + r.ParseForm() + radioId, err := strconv.Atoi(r.Form.Get("radioId")) + if err != nil { + http.NotFound(w, r) + return + } + commandRouter.Stop(radioId) + http.Redirect(w, r, "/", http.StatusFound) +} diff --git a/broadcaster-server/model.go b/broadcaster-server/model.go new file mode 100644 index 0000000..11f94e7 --- /dev/null +++ b/broadcaster-server/model.go @@ -0,0 +1,29 @@ +package main + +type PlaylistEntry struct { + Id int + Position int + Filename string + DelaySeconds int + IsRelative bool +} + +type User struct { + Id int + Username string + PasswordHash string + IsAdmin bool +} + +type Playlist struct { + Id int + Enabled bool + Name string + StartTime string +} + +type Radio struct { + Id int + Name string + Token string +} diff --git a/broadcaster-server/playlist.go b/broadcaster-server/playlist.go new file mode 100644 index 0000000..f0a6c89 --- /dev/null +++ b/broadcaster-server/playlist.go @@ -0,0 +1,35 @@ +package main + +import ( + "sync" +) + +type Playlists struct { + changeWait chan bool + playlistMutex sync.Mutex +} + +var playlists Playlists + +func InitPlaylists() { + playlists.changeWait = make(chan bool) +} + +func (p *Playlists) GetPlaylists() []Playlist { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + return db.GetPlaylists() +} + +func (p *Playlists) WatchForChanges() ([]Playlist, chan bool) { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + return db.GetPlaylists(), p.changeWait +} + +func (p *Playlists) NotifyChanges() { + p.playlistMutex.Lock() + defer p.playlistMutex.Unlock() + close(p.changeWait) + p.changeWait = make(chan bool) +} diff --git a/broadcaster-server/radio_sync.go b/broadcaster-server/radio_sync.go new file mode 100644 index 0000000..66521f8 --- /dev/null +++ b/broadcaster-server/radio_sync.go @@ -0,0 +1,125 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "encoding/json" + "golang.org/x/net/websocket" + "log" +) + +func RadioSync(ws *websocket.Conn) { + log.Println("A websocket connected, I think") + buf := make([]byte, 16384) + + badRead := false + isAuthenticated := false + var radio Radio + for { + // Ignore any massively oversize messages + n, err := ws.Read(buf) + if err != nil { + if radio.Name != "" { + log.Println("Lost websocket to radio:", radio.Name) + status.RadioDisconnected(radio.Id) + } else { + log.Println("Lost unauthenticated websocket") + } + return + } + if n == len(buf) { + badRead = true + continue + } else if badRead { + badRead = false + continue + } + + t, msg, err := protocol.ParseMessage(buf[:n]) + if err != nil { + log.Println(err) + return + } + + if !isAuthenticated && t != protocol.AuthenticateType { + continue + } + + if t == protocol.AuthenticateType && !isAuthenticated { + authMsg := msg.(protocol.AuthenticateMessage) + r, err := db.GetRadioByToken(authMsg.Token) + if err != nil { + log.Println("Could not find radio for offered token", authMsg.Token) + } + radio = r + log.Println("Radio authenticated:", radio.Name) + isAuthenticated = true + commandRouter.AddWebsocket(r.Id, ws) + defer commandRouter.RemoveWebsocket(ws) + + go KeepFilesUpdated(ws) + go KeepPlaylistsUpdated(ws) + } + + if t == protocol.StatusType { + statusMsg := msg.(protocol.StatusMessage) + log.Println("Received Status from", radio.Name, ":", statusMsg) + status.MergeStatus(radio.Id, statusMsg) + } + } +} + +func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error { + playlistSpecs := make([]protocol.PlaylistSpec, 0) + for _, v := range p { + if v.Enabled { + entrySpecs := make([]protocol.EntrySpec, 0) + for _, e := range db.GetEntriesForPlaylist(v.Id) { + entrySpecs = append(entrySpecs, protocol.EntrySpec{Filename: e.Filename, DelaySeconds: e.DelaySeconds, IsRelative: e.IsRelative}) + } + playlistSpecs = append(playlistSpecs, protocol.PlaylistSpec{Id: v.Id, Name: v.Name, StartTime: v.StartTime, Entries: entrySpecs}) + } + } + playlists := protocol.PlaylistsMessage{ + T: protocol.PlaylistsType, + Playlists: playlistSpecs, + } + msg, _ := json.Marshal(playlists) + _, err := ws.Write(msg) + return err +} + +func KeepPlaylistsUpdated(ws *websocket.Conn) { + for { + p, ch := playlists.WatchForChanges() + err := sendPlaylistsMessageToRadio(ws, p) + if err != nil { + return + } + <-ch + } +} + +func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error { + specs := make([]protocol.FileSpec, 0) + for _, v := range f { + specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash}) + } + files := protocol.FilesMessage{ + T: protocol.FilesType, + Files: specs, + } + msg, _ := json.Marshal(files) + _, err := ws.Write(msg) + return err +} + +func KeepFilesUpdated(ws *websocket.Conn) { + for { + f, ch := files.WatchForChanges() + err := sendFilesMessageToRadio(ws, f) + if err != nil { + return + } + <-ch + } +} diff --git a/broadcaster-server/session.go b/broadcaster-server/session.go new file mode 100644 index 0000000..a097989 --- /dev/null +++ b/broadcaster-server/session.go @@ -0,0 +1,46 @@ +package main + +import ( + "crypto/rand" + "encoding/hex" + "log" + "net/http" + "time" +) + +func generateSession() string { + b := make([]byte, 32) + _, err := rand.Read(b) + if err != nil { + log.Fatal(err) + } + return hex.EncodeToString(b) +} + +func currentUser(_ http.ResponseWriter, r *http.Request) (User, error) { + cookie, e := r.Cookie("broadcast_session") + if e != nil { + return User{}, e + } + + return users.GetUserForSession(cookie.Value) +} + +func createSessionCookie(w http.ResponseWriter, username string) { + sess := generateSession() + log.Println("Generated a random session", sess) + expiration := time.Now().Add(365 * 24 * time.Hour) + cookie := http.Cookie{Name: "broadcast_session", Value: sess, Expires: expiration, SameSite: http.SameSiteLaxMode} + db.InsertSession(username, sess, expiration) + http.SetCookie(w, &cookie) +} + +func clearSessionCookie(w http.ResponseWriter) { + c := &http.Cookie{ + Name: "broadcast_session", + Value: "", + MaxAge: -1, + HttpOnly: true, + } + http.SetCookie(w, c) +} diff --git a/broadcaster-server/status.go b/broadcaster-server/status.go new file mode 100644 index 0000000..b2edd49 --- /dev/null +++ b/broadcaster-server/status.go @@ -0,0 +1,54 @@ +package main + +import ( + "code.octet-stream.net/broadcaster/internal/protocol" + "sync" +) + +type ServerStatus struct { + statuses map[int]protocol.StatusMessage + statusesMutex sync.Mutex + changeWait chan bool +} + +var status ServerStatus + +func InitServerStatus() { + status = ServerStatus{ + statuses: make(map[int]protocol.StatusMessage), + changeWait: make(chan bool), + } +} + +func (s *ServerStatus) MergeStatus(radioId int, status protocol.StatusMessage) { + s.statusesMutex.Lock() + defer s.statusesMutex.Unlock() + s.statuses[radioId] = status + s.TriggerChange() +} + +func (s *ServerStatus) RadioDisconnected(radioId int) { + s.statusesMutex.Lock() + defer s.statusesMutex.Unlock() + delete(s.statuses, radioId) + s.TriggerChange() +} + +func (s *ServerStatus) TriggerChange() { + close(s.changeWait) + s.changeWait = make(chan bool) +} + +func (s *ServerStatus) Statuses() map[int]protocol.StatusMessage { + s.statusesMutex.Lock() + defer s.statusesMutex.Unlock() + c := make(map[int]protocol.StatusMessage) + for k, v := range s.statuses { + c[k] = v + } + return c +} + +func (s *ServerStatus) ChangeChannel() chan bool { + return s.changeWait +} diff --git a/broadcaster-server/templates/change_password.html b/broadcaster-server/templates/change_password.html new file mode 100644 index 0000000..431e2a4 --- /dev/null +++ b/broadcaster-server/templates/change_password.html @@ -0,0 +1,25 @@ + + + + + + Broadcaster + + +
+

Change Password

+ {{if ne .Message ""}} +

{{.Message}}

+ {{end}} + {{if .ShowForm}} +
+
+
+
+
+ +
+ {{end}} +
+ + diff --git a/broadcaster-server/templates/files.html b/broadcaster-server/templates/files.html new file mode 100644 index 0000000..fa2614f --- /dev/null +++ b/broadcaster-server/templates/files.html @@ -0,0 +1,26 @@ + + + + + + Broadcaster + + +
+

Files! List

+

All files can be downloaded from the public file listing.

+ +

Upload New File

+

+

+ + +
+

+
+ + diff --git a/broadcaster-server/templates/index.html b/broadcaster-server/templates/index.html new file mode 100644 index 0000000..367d48c --- /dev/null +++ b/broadcaster-server/templates/index.html @@ -0,0 +1,81 @@ + + + + + + Broadcaster + + + + +
+

Welcome!

+ {{if .LoggedIn}} +

Your username is: {{.Username}}.

+

Log Out

+ {{else}} +

Log In

+ {{end}} +

File Management

+

Playlist Management

+

Radio Management

+

Connected Radios

+
+ Loading... +
+
+ + diff --git a/broadcaster-server/templates/login.html b/broadcaster-server/templates/login.html new file mode 100644 index 0000000..56772b9 --- /dev/null +++ b/broadcaster-server/templates/login.html @@ -0,0 +1,23 @@ + + + + + + Broadcaster Log In + + +
+

Log In

+
+ {{if ne .Error ""}} +

{{.Error}}

+ {{end}} +
+
+
+
+ +
+
+ + diff --git a/broadcaster-server/templates/logout.html b/broadcaster-server/templates/logout.html new file mode 100644 index 0000000..b151265 --- /dev/null +++ b/broadcaster-server/templates/logout.html @@ -0,0 +1,14 @@ + + + + + + Broadcaster Log Out + + +
+

Logged Out

+

Log In again

+
+ + diff --git a/broadcaster-server/templates/playlist.html b/broadcaster-server/templates/playlist.html new file mode 100644 index 0000000..e609712 --- /dev/null +++ b/broadcaster-server/templates/playlist.html @@ -0,0 +1,100 @@ + + + + + + Broadcaster + + + +
+

A specific playlist

+

+ {{if .Playlist.Id}} + Edit Playlist + {{else}} + Create New Playlist + {{end}} +

+
+ +

+ +
+

+

+ + +

+

+ + +

+

Playlist Items

+ {{range .Entries}} +

+ Wait until + + seconds from + + then play + + (Delete Item) +

+ {{end}} +

+ Add Item +

+

+ +

+
+ {{if .Playlist.Id}} +

Delete

+
+ +

+ +

+
+ {{end}} + +
+ + diff --git a/broadcaster-server/templates/playlists.html b/broadcaster-server/templates/playlists.html new file mode 100644 index 0000000..8b83d3d --- /dev/null +++ b/broadcaster-server/templates/playlists.html @@ -0,0 +1,19 @@ + + + + + + Broadcaster + + +
+

Playlists!

+ +

Add New Playlist

+
+ + diff --git a/broadcaster-server/templates/radio.html b/broadcaster-server/templates/radio.html new file mode 100644 index 0000000..01e4f0c --- /dev/null +++ b/broadcaster-server/templates/radio.html @@ -0,0 +1,43 @@ + + + + + + Broadcaster + + +
+

A specific radio

+

+ {{if .Radio.Id}} + Edit Radio + {{else}} + Register New Radio + {{end}} +

+
+ +

+ + +

+

+ Authentication token: {{.Radio.Token}} + +

+

+ +

+
+ {{if .Radio.Id}} +

Delete

+
+ +

+ +

+
+ {{end}} +
+ + diff --git a/broadcaster-server/templates/radios.html b/broadcaster-server/templates/radios.html new file mode 100644 index 0000000..d55ea4a --- /dev/null +++ b/broadcaster-server/templates/radios.html @@ -0,0 +1,19 @@ + + + + + + Broadcaster + + +
+

Radios

+ +

Register New Radio

+
+ + diff --git a/broadcaster-server/templates/radios.partial.html b/broadcaster-server/templates/radios.partial.html new file mode 100644 index 0000000..fb36ca8 --- /dev/null +++ b/broadcaster-server/templates/radios.partial.html @@ -0,0 +1,82 @@ +{{if .Radios}} +{{range .Radios}} + + + + + + + + + + + + +
+ {{.Name}} +
+ + + + + + + + + + + + + +
+ Local Time + + {{.LocalTime}} +
+ Time Zone + + {{.TimeZone}} +
+ Files In Sync + + {{if .FilesInSync}} Yes {{else}} No {{end}} +
+
+ {{.ChannelState}} + + + + + + + + + + + + + + +
+ Playlist: + + {{.Playlist}} +
+ File: + + {{.File}} +
+ Status: + + {{.Status}} +
+
+
+ + +
+
+{{end}} +{{else}} +

There are no radios online.

+{{end}} diff --git a/broadcaster-server/user.go b/broadcaster-server/user.go new file mode 100644 index 0000000..275f0ec --- /dev/null +++ b/broadcaster-server/user.go @@ -0,0 +1,82 @@ +package main + +import ( + "errors" + "golang.org/x/crypto/bcrypt" +) + +var users Users + +type Users struct{} + +func (u *Users) GetUserForSession(token string) (User, error) { + username, err := db.GetUserNameForSession(token) + if err != nil { + return User{}, err + } + user, err := db.GetUser(username) + if err != nil { + return User{}, err + } + return user, nil +} + +func (u *Users) Authenticate(username string, clearPassword string) (User, error) { + user, err := db.GetUser(username) + if err != nil { + return User{}, err + } + err = bcrypt.CompareHashAndPassword([]byte(user.PasswordHash), []byte(clearPassword)) + if err != nil { + return User{}, err + } + return user, nil +} + +func (u *Users) CreateUser(username string, clearPassword string, isAdmin bool) error { + if clearPassword == "" { + return errors.New("password cannot be empty") + } + hashed, err := bcrypt.GenerateFromPassword([]byte(clearPassword), bcrypt.DefaultCost) + if err != nil { + return err + } + return db.CreateUser(User{ + Id: 0, + Username: username, + PasswordHash: string(hashed), + IsAdmin: isAdmin, + }) +} + +func (u *Users) DeleteUser(username string) { + db.DeleteUser(username) +} + +func (u *Users) UpdatePassword(username string, oldClearPassword string, newClearPassword string) error { + user, err := db.GetUser(username) + if err != nil { + return err + } + err = bcrypt.CompareHashAndPassword([]byte(user.PasswordHash), []byte(oldClearPassword)) + if err != nil { + return errors.New("old password is incorrect") + } + if newClearPassword == "" { + return errors.New("password cannot be empty") + } + hashed, err := bcrypt.GenerateFromPassword([]byte(newClearPassword), bcrypt.DefaultCost) + if err != nil { + return err + } + db.SetUserPassword(username, string(hashed)) + return nil +} + +func (u *Users) UpdateIsAdmin(username string, isAdmin bool) { + db.SetUserIsAdmin(username, isAdmin) +} + +func (u *Users) Users() []User { + return db.GetUsers() +} diff --git a/broadcaster-server/web_sync.go b/broadcaster-server/web_sync.go new file mode 100644 index 0000000..a6b1a6e --- /dev/null +++ b/broadcaster-server/web_sync.go @@ -0,0 +1,162 @@ +package main + +import ( + "fmt" + "html/template" + "log" + "sort" + "strconv" + "strings" + + "code.octet-stream.net/broadcaster/internal/protocol" + "golang.org/x/net/websocket" +) + +func WebSync(ws *websocket.Conn) { + log.Println("A web user connected with WebSocket") + buf := make([]byte, 16384) + + badRead := false + isAuthenticated := false + var user User + for { + // Ignore any massively oversize messages + n, err := ws.Read(buf) + if err != nil { + if user.Username != "" { + log.Println("Lost websocket to user:", user) + } else { + log.Println("Lost unauthenticated website websocket") + } + return + } + if n == len(buf) { + badRead = true + continue + } else if badRead { + badRead = false + continue + } + + if !isAuthenticated { + token := string(buf[:n]) + u, err := users.GetUserForSession(token) + if err != nil { + log.Println("Could not find user for offered token", token, err) + ws.Close() + return + } + user = u + log.Println("User authenticated:", user) + isAuthenticated = true + + go KeepWebUpdated(ws) + + // send initial playlists message + err = sendRadioStatusToWeb(ws) + if err != nil { + return + } + } + } +} + +type WebStatusData struct { + Radios []WebRadioStatus +} + +type WebRadioStatus struct { + Name string + LocalTime string + TimeZone string + ChannelClass string + ChannelState string + Playlist string + File string + Status string + Id string + DisableCancel bool + FilesInSync bool +} + +func sendRadioStatusToWeb(ws *websocket.Conn) error { + webStatuses := make([]WebRadioStatus, 0) + radioStatuses := status.Statuses() + keys := make([]int, 0) + for i := range radioStatuses { + keys = append(keys, i) + } + sort.Ints(keys) + for _, i := range keys { + v := radioStatuses[i] + radio, err := db.GetRadio(i) + if err != nil { + continue + } + var channelClass, channelState string + if v.PTT { + channelClass = "ptt" + channelState = "PTT" + } else if v.COS { + channelClass = "cos" + channelState = "RX" + } else { + channelClass = "clear" + channelState = "CLEAR" + } + var statusText string + var disableCancel bool + if v.Status == protocol.StatusIdle { + statusText = "Idle" + disableCancel = true + } else if v.Status == protocol.StatusDelay { + statusText = fmt.Sprintf("Performing delay before transmit: %ds remain", v.DelaySecondsRemaining) + disableCancel = false + } else if v.Status == protocol.StatusChannelInUse { + statusText = fmt.Sprintf("Waiting for channel to clear: %ds", v.WaitingForChannelSeconds) + disableCancel = false + } else if v.Status == protocol.StatusPlaying { + statusText = fmt.Sprintf("Playing: %d:%02d", v.PlaybackSecondsElapsed/60, v.PlaybackSecondsElapsed%60) + disableCancel = false + } + playlist := v.Playlist + if playlist == "" { + playlist = "-" + } + filename := v.Filename + if filename == "" { + filename = "-" + } + webStatuses = append(webStatuses, WebRadioStatus{ + Name: radio.Name, + LocalTime: v.LocalTime, + TimeZone: v.TimeZone, + ChannelClass: channelClass, + ChannelState: channelState, + Playlist: playlist, + File: filename, + Status: statusText, + Id: strconv.Itoa(i), + DisableCancel: disableCancel, + FilesInSync: v.FilesInSync, + }) + } + data := WebStatusData{ + Radios: webStatuses, + } + buf := new(strings.Builder) + tmpl := template.Must(template.ParseFS(content, "templates/radios.partial.html")) + tmpl.Execute(buf, data) + _, err := ws.Write([]byte(buf.String())) + return err +} + +func KeepWebUpdated(ws *websocket.Conn) { + for { + <-status.ChangeChannel() + err := sendRadioStatusToWeb(ws) + if err != nil { + return + } + } +} diff --git a/go.mod b/go.mod index c8401c7..7012bcb 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/BurntSushi/toml v1.4.0 github.com/gopxl/beep/v2 v2.1.0 github.com/warthog618/go-gpiocdev v0.9.0 + golang.org/x/crypto v0.28.0 golang.org/x/net v0.30.0 modernc.org/sqlite v1.33.1 ) diff --git a/go.sum b/go.sum index 3184d32..ab7d07f 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/warthog618/go-gpiocdev v0.9.0 h1:AZWUq1WObgKCO9cJCACFpwWQw6yu8vJbIE6f github.com/warthog618/go-gpiocdev v0.9.0/go.mod h1:GV4NZC82fWJERqk7Gu0+KfLSDIBEDNm6aPGiHlmT5fY= github.com/warthog618/go-gpiosim v0.1.0 h1:2rTMTcKUVZxpUuvRKsagnKAbKpd3Bwffp87xywEDVGI= github.com/warthog618/go-gpiosim v0.1.0/go.mod h1:Ngx/LYI5toxHr4E+Vm6vTgCnt0of0tktsSuMUEJ2wCI= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go new file mode 100644 index 0000000..e23da3a --- /dev/null +++ b/internal/protocol/protocol.go @@ -0,0 +1,167 @@ +package protocol + +import ( + "encoding/json" + "errors" + "fmt" +) + +const ( + StartTimeFormat = "2006-01-02T15:04" + LocalTimeFormat = "Mon _2 Jan 2006 15:04:05" + + // Radio to server + + AuthenticateType = "authenticate" + StatusType = "status" + + // Server to radio + + FilesType = "files" + PlaylistsType = "playlists" + StopType = "stop" + + // Status values + + StatusIdle = "idle" + StatusDelay = "delay" + StatusChannelInUse = "channel_in_use" + StatusPlaying = "playing" +) + +// Base message type to determine what type of payload is expected. +type Message struct { + T string +} + +// Initial message from Radio to authenticate itself with a token string. +type AuthenticateMessage struct { + T string + Token string +} + +// Server updates the radio with the list of files that currently exist. +// This will be provided on connect and when there are any changes. +// The radio is expected to obtain all these files and cache them locally. +type FilesMessage struct { + T string + Files []FileSpec +} + +type PlaylistsMessage struct { + T string + Playlists []PlaylistSpec +} + +// Any playlist currently being played should be stopped and PTT disengaged. +type StopMessage struct { + T string +} + +type StatusMessage struct { + T string + + // Status w.r.t. playing a playlist + Status string + + // File being played or about to be played - empty string in idle status + Filename string + + // Name of playlist being played - empty string in idle status + Playlist string + + // Seconds until playback begins - never mind latency + DelaySecondsRemaining int + + // Number of seconds file has been actually playing + PlaybackSecondsElapsed int + + // Number of seconds waiting for channel to clear + WaitingForChannelSeconds int + + PTT bool + COS bool + FilesInSync bool + + // Timestamp of the current time on this radio, using LocalTimeFormat + LocalTime string + + // Time zone in use, e.g. "Australia/Hobart" + TimeZone string +} + +// Description of an individual file available in the broadcasting system. +type FileSpec struct { + // Filename, e.g. "broadcast.wav" + Name string + // SHA-256 hash of the file's contents + Hash string +} + +type PlaylistSpec struct { + Id int + Name string + StartTime string + Entries []EntrySpec +} + +type EntrySpec struct { + Filename string + DelaySeconds int + IsRelative bool +} + +func ParseMessage(data []byte) (string, interface{}, error) { + var t Message + err := json.Unmarshal(data, &t) + if err != nil { + return "", nil, err + } + + if t.T == AuthenticateType { + var auth AuthenticateMessage + err = json.Unmarshal(data, &auth) + if err != nil { + return "", nil, err + } + return t.T, auth, nil + } + + if t.T == FilesType { + var files FilesMessage + err = json.Unmarshal(data, &files) + if err != nil { + return "", nil, err + } + return t.T, files, nil + } + + if t.T == PlaylistsType { + var playlists PlaylistsMessage + err = json.Unmarshal(data, &playlists) + if err != nil { + return "", nil, err + } + return t.T, playlists, nil + } + + if t.T == StatusType { + var status StatusMessage + err = json.Unmarshal(data, &status) + if err != nil { + return "", nil, err + } + return t.T, status, nil + } + + if t.T == StopType { + var stop StopMessage + err = json.Unmarshal(data, &stop) + if err != nil { + return "", nil, err + } + return t.T, stop, nil + } + + return "", nil, errors.New(fmt.Sprintf("unknown message type %v", t.T)) +} diff --git a/protocol/protocol.go b/protocol/protocol.go deleted file mode 100644 index e23da3a..0000000 --- a/protocol/protocol.go +++ /dev/null @@ -1,167 +0,0 @@ -package protocol - -import ( - "encoding/json" - "errors" - "fmt" -) - -const ( - StartTimeFormat = "2006-01-02T15:04" - LocalTimeFormat = "Mon _2 Jan 2006 15:04:05" - - // Radio to server - - AuthenticateType = "authenticate" - StatusType = "status" - - // Server to radio - - FilesType = "files" - PlaylistsType = "playlists" - StopType = "stop" - - // Status values - - StatusIdle = "idle" - StatusDelay = "delay" - StatusChannelInUse = "channel_in_use" - StatusPlaying = "playing" -) - -// Base message type to determine what type of payload is expected. -type Message struct { - T string -} - -// Initial message from Radio to authenticate itself with a token string. -type AuthenticateMessage struct { - T string - Token string -} - -// Server updates the radio with the list of files that currently exist. -// This will be provided on connect and when there are any changes. -// The radio is expected to obtain all these files and cache them locally. -type FilesMessage struct { - T string - Files []FileSpec -} - -type PlaylistsMessage struct { - T string - Playlists []PlaylistSpec -} - -// Any playlist currently being played should be stopped and PTT disengaged. -type StopMessage struct { - T string -} - -type StatusMessage struct { - T string - - // Status w.r.t. playing a playlist - Status string - - // File being played or about to be played - empty string in idle status - Filename string - - // Name of playlist being played - empty string in idle status - Playlist string - - // Seconds until playback begins - never mind latency - DelaySecondsRemaining int - - // Number of seconds file has been actually playing - PlaybackSecondsElapsed int - - // Number of seconds waiting for channel to clear - WaitingForChannelSeconds int - - PTT bool - COS bool - FilesInSync bool - - // Timestamp of the current time on this radio, using LocalTimeFormat - LocalTime string - - // Time zone in use, e.g. "Australia/Hobart" - TimeZone string -} - -// Description of an individual file available in the broadcasting system. -type FileSpec struct { - // Filename, e.g. "broadcast.wav" - Name string - // SHA-256 hash of the file's contents - Hash string -} - -type PlaylistSpec struct { - Id int - Name string - StartTime string - Entries []EntrySpec -} - -type EntrySpec struct { - Filename string - DelaySeconds int - IsRelative bool -} - -func ParseMessage(data []byte) (string, interface{}, error) { - var t Message - err := json.Unmarshal(data, &t) - if err != nil { - return "", nil, err - } - - if t.T == AuthenticateType { - var auth AuthenticateMessage - err = json.Unmarshal(data, &auth) - if err != nil { - return "", nil, err - } - return t.T, auth, nil - } - - if t.T == FilesType { - var files FilesMessage - err = json.Unmarshal(data, &files) - if err != nil { - return "", nil, err - } - return t.T, files, nil - } - - if t.T == PlaylistsType { - var playlists PlaylistsMessage - err = json.Unmarshal(data, &playlists) - if err != nil { - return "", nil, err - } - return t.T, playlists, nil - } - - if t.T == StatusType { - var status StatusMessage - err = json.Unmarshal(data, &status) - if err != nil { - return "", nil, err - } - return t.T, status, nil - } - - if t.T == StopType { - var stop StopMessage - err = json.Unmarshal(data, &stop) - if err != nil { - return "", nil, err - } - return t.T, stop, nil - } - - return "", nil, errors.New(fmt.Sprintf("unknown message type %v", t.T)) -} diff --git a/radio/config.go b/radio/config.go deleted file mode 100644 index 3152653..0000000 --- a/radio/config.go +++ /dev/null @@ -1,70 +0,0 @@ -package main - -import ( - "errors" - "log" - "os" - "strings" - - "github.com/BurntSushi/toml" -) - -type RadioConfig struct { - GpioDevice string - PTTPin int - COSPin int - ServerURL string - Token string - CachePath string - TimeZone string -} - -func NewRadioConfig() RadioConfig { - return RadioConfig{ - GpioDevice: "gpiochip0", - PTTPin: -1, - COSPin: -1, - ServerURL: "", - Token: "", - CachePath: "", - TimeZone: "Australia/Hobart", - } -} - -func (c *RadioConfig) LoadFromFile(path string) { - _, err := toml.DecodeFile(path, &c) - if err != nil { - log.Fatal("could not read config file for reading at path:", path, err) - } - err = c.Validate() - if err != nil { - log.Fatal(err) - } - c.ApplyDefaults() -} - -func (c *RadioConfig) Validate() error { - if c.ServerURL == "" { - return errors.New("ServerURL must be provided in the configuration") - } - if c.Token == "" { - return errors.New("Token must be provided in the configuration") - } - return nil -} - -func (c *RadioConfig) ApplyDefaults() { - if c.CachePath == "" { - dir, err := os.MkdirTemp("", "broadcast") - if err != nil { - log.Fatal(err) - } - c.CachePath = dir - } -} - -func (c *RadioConfig) WebsocketURL() string { - addr := strings.Replace(c.ServerURL, "https://", "wss://", -1) - addr = strings.Replace(addr, "http://", "ws://", -1) - return addr + "/radiosync" -} diff --git a/radio/files_machine.go b/radio/files_machine.go deleted file mode 100644 index ea2f775..0000000 --- a/radio/files_machine.go +++ /dev/null @@ -1,117 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "crypto/sha256" - "encoding/hex" - "io" - "log" - "net/http" - "os" - "path/filepath" -) - -type FilesMachine struct { - specs []protocol.FileSpec - cachePath string - missing []string -} - -func NewFilesMachine(cachePath string) FilesMachine { - if err := os.MkdirAll(cachePath, 0750); err != nil { - log.Fatal(err) - } - return FilesMachine{ - cachePath: cachePath, - } -} - -func (m *FilesMachine) UpdateSpecs(specs []protocol.FileSpec) { - m.specs = specs - m.RefreshMissing() -} - -func (m *FilesMachine) RefreshMissing() { - // Delete any files in the cache dir who are not in the spec - entries, err := os.ReadDir(m.cachePath) - if err != nil { - log.Fatal(err) - } - okay := make([]string, 0) - for _, file := range entries { - hash := "" - for _, spec := range m.specs { - if file.Name() == spec.Name { - hash = spec.Hash - break - } - } - // if we have an extraneous file, delete it - if hash == "" { - log.Println("Deleting extraneous cached audio file:", file.Name()) - os.Remove(filepath.Join(m.cachePath, file.Name())) - continue - } - // if the hash isn't right, delete it - f, err := os.Open(filepath.Join(m.cachePath, file.Name())) - if err != nil { - log.Fatal(err) - } - hasher := sha256.New() - io.Copy(hasher, f) - if hex.EncodeToString(hasher.Sum(nil)) != hash { - log.Println("Deleting cached audio file with incorrect hash:", file.Name()) - os.Remove(filepath.Join(m.cachePath, file.Name())) - } else { - okay = append(okay, file.Name()) - } - } - m.missing = nil - for _, spec := range m.specs { - missing := true - for _, file := range okay { - if spec.Name == file { - missing = false - } - } - if missing { - m.missing = append(m.missing, spec.Name) - } - } - if len(m.missing) > 1 { - log.Println(len(m.missing), "missing files") - } else if len(m.missing) == 1 { - log.Println("1 missing file") - } else { - log.Println("All files are in sync with server") - } - statusCollector.FilesInSync <- len(m.missing) == 0 -} - -func (m *FilesMachine) IsCacheComplete() bool { - return len(m.missing) == 0 -} - -func (m *FilesMachine) NextFile() string { - next, remainder := m.missing[0], m.missing[1:] - m.missing = remainder - return next -} - -func (m *FilesMachine) DownloadSingle(filename string, downloadResult chan<- error) { - log.Println("Downloading", filename) - out, err := os.Create(filepath.Join(m.cachePath, filename)) - if err != nil { - downloadResult <- err - return - } - defer out.Close() - resp, err := http.Get(config.ServerURL + "/audio-files/" + filename) - if err != nil { - downloadResult <- err - return - } - defer resp.Body.Close() - _, err = io.Copy(out, resp.Body) - downloadResult <- err -} diff --git a/radio/gpio.go b/radio/gpio.go deleted file mode 100644 index 69807ac..0000000 --- a/radio/gpio.go +++ /dev/null @@ -1,123 +0,0 @@ -package main - -import ( - gpio "github.com/warthog618/go-gpiocdev" - "github.com/warthog618/go-gpiocdev/device/rpi" - "log" - "strconv" -) - -type PTT interface { - EngagePTT() - DisengagePTT() -} - -type COS interface { - WaitForChannelClear() - COSValue() bool -} - -var ptt PTT = &DefaultPTT{} -var cos COS = &DefaultCOS{} - -type PiPTT struct { - pttLine *gpio.Line -} - -type PiCOS struct { - cosLine *gpio.Line - clearWait chan bool -} - -func InitRaspberryPiPTT(pttNum int, chipName string) { - pttPin, err := rpi.Pin("GPIO" + strconv.Itoa(pttNum)) - if err != nil { - log.Fatal("invalid PTT pin configured", ptt) - } - pttLine, err := gpio.RequestLine(chipName, pttPin, gpio.AsOutput(0)) - if err != nil { - log.Fatal("unable to open requested pin for PTT GPIO:", ptt, ". Are you running as root?") - } - ptt = &PiPTT{ - pttLine: pttLine, - } -} - -func InitRaspberryPiCOS(cosNum int, chipName string) { - var piCOS PiCOS - piCOS.clearWait = make(chan bool) - cosPin, err := rpi.Pin("GPIO" + strconv.Itoa(cosNum)) - if err != nil { - log.Fatal("invalid COS Pin configured", cos) - } - cosHandler := func(event gpio.LineEvent) { - if event.Type == gpio.LineEventFallingEdge { - log.Println("COS: channel clear") - close(piCOS.clearWait) - piCOS.clearWait = make(chan bool) - statusCollector.COS <- false - } - if event.Type == gpio.LineEventRisingEdge { - log.Println("COS: channel in use") - statusCollector.COS <- true - } - } - cosLine, err := gpio.RequestLine(chipName, cosPin, gpio.AsInput, gpio.WithBothEdges, gpio.WithEventHandler(cosHandler)) - if err != nil { - log.Fatal("unable to open requested pin for COS GPIO:", cos, ". Are you running as root?") - } - piCOS.cosLine = cosLine - cos = &piCOS -} - -func (g *PiCOS) COSValue() bool { - val, err := g.cosLine.Value() - if err != nil { - log.Fatal("Unable to read COS value") - } - return val != 0 -} - -func (g *PiCOS) WaitForChannelClear() { - ch := g.clearWait - val, err := g.cosLine.Value() - if err != nil || val == 0 { - return - } - // wait for close - <-ch -} - -func (g *PiPTT) EngagePTT() { - log.Println("PTT: on") - g.pttLine.SetValue(1) - statusCollector.PTT <- true -} - -func (g *PiPTT) DisengagePTT() { - log.Println("PTT: off") - g.pttLine.SetValue(0) - statusCollector.PTT <- false -} - -type DefaultPTT struct { -} - -func (g *DefaultPTT) EngagePTT() { - statusCollector.PTT <- true -} - -func (g *DefaultPTT) DisengagePTT() { - statusCollector.PTT <- false -} - -type DefaultCOS struct { -} - -func (g *DefaultCOS) WaitForChannelClear() { - log.Println("Assuming channel is clear since COS GPIO is not configured") -} - -func (g *DefaultCOS) COSValue() bool { - return false -} diff --git a/radio/radio.go b/radio/radio.go deleted file mode 100644 index bcf5e38..0000000 --- a/radio/radio.go +++ /dev/null @@ -1,327 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "encoding/json" - "flag" - "github.com/gopxl/beep/v2" - "github.com/gopxl/beep/v2/mp3" - "github.com/gopxl/beep/v2/speaker" - "github.com/gopxl/beep/v2/wav" - "golang.org/x/net/websocket" - "log" - "os" - "os/signal" - "path/filepath" - "strings" - "syscall" - "time" -) - -const sampleRate = 44100 - -var config RadioConfig = NewRadioConfig() - -func main() { - configFlag := flag.String("c", "", "path to configuration file") - // TODO: support this - //generateFlag := flag.String("g", "", "create a template config file with specified name then exit") - flag.Parse() - - if *configFlag == "" { - log.Fatal("must specify a configuration file with -c") - } - config.LoadFromFile(*configFlag) - statusCollector.Config <- config - - playbackSampleRate := beep.SampleRate(sampleRate) - speaker.Init(playbackSampleRate, playbackSampleRate.N(time.Second/10)) - - if config.PTTPin != -1 { - InitRaspberryPiPTT(config.PTTPin, config.GpioDevice) - } - if config.COSPin != -1 { - InitRaspberryPiCOS(config.COSPin, config.GpioDevice) - } - - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sig - log.Println("Radio shutting down due to signal", sig) - // Make sure we always stop PTT when program ends - ptt.DisengagePTT() - os.Exit(0) - }() - - log.Println("Config checks out, radio coming online") - log.Println("Audio file cache:", config.CachePath) - - fileSpecChan := make(chan []protocol.FileSpec) - go filesWorker(config.CachePath, fileSpecChan) - - stop := make(chan bool) - playlistSpecChan := make(chan []protocol.PlaylistSpec) - go playlistWorker(playlistSpecChan, stop) - - for { - runWebsocket(fileSpecChan, playlistSpecChan, stop) - log.Println("Websocket failed, retry in 30 seconds") - time.Sleep(time.Second * time.Duration(30)) - } -} - -func runWebsocket(fileSpecChan chan []protocol.FileSpec, playlistSpecChan chan []protocol.PlaylistSpec, stop chan bool) error { - log.Println("Establishing websocket connection to:", config.WebsocketURL()) - ws, err := websocket.Dial(config.WebsocketURL(), "", config.ServerURL) - if err != nil { - return err - } - - auth := protocol.AuthenticateMessage{ - T: "authenticate", - Token: config.Token, - } - msg, _ := json.Marshal(auth) - - if _, err := ws.Write(msg); err != nil { - log.Fatal(err) - } - statusCollector.Websocket <- ws - - buf := make([]byte, 16384) - badRead := false - for { - n, err := ws.Read(buf) - if err != nil { - log.Println("Lost websocket to server") - return err - } - // Ignore any massively oversize messages - if n == len(buf) { - badRead = true - continue - } else if badRead { - badRead = false - continue - } - - t, msg, err := protocol.ParseMessage(buf[:n]) - if err != nil { - log.Println("Message parse error", err) - return err - } - - if t == protocol.FilesType { - filesMsg := msg.(protocol.FilesMessage) - fileSpecChan <- filesMsg.Files - } - - if t == protocol.PlaylistsType { - playlistsMsg := msg.(protocol.PlaylistsMessage) - playlistSpecChan <- playlistsMsg.Playlists - } - - if t == protocol.StopType { - stop <- true - } - } -} - -func filesWorker(cachePath string, ch chan []protocol.FileSpec) { - machine := NewFilesMachine(cachePath) - isDownloading := false - downloadResult := make(chan error) - var timer *time.Timer - - for { - var timerCh <-chan time.Time = nil - if timer != nil { - timerCh = timer.C - } - doNext := false - select { - case specs := <-ch: - log.Println("Received new file specs", specs) - machine.UpdateSpecs(specs) - doNext = true - timer = nil - case err := <-downloadResult: - isDownloading = false - machine.RefreshMissing() - if err != nil { - log.Println(err) - if !machine.IsCacheComplete() { - timer = time.NewTimer(30 * time.Second) - } - } else { - if !machine.IsCacheComplete() { - timer = time.NewTimer(10 * time.Millisecond) - } - } - case <-timerCh: - doNext = true - timer = nil - } - - if doNext && !isDownloading && !machine.IsCacheComplete() { - next := machine.NextFile() - isDownloading = true - go machine.DownloadSingle(next, downloadResult) - } - } -} - -func playlistWorker(ch <-chan []protocol.PlaylistSpec, stop <-chan bool) { - var specs []protocol.PlaylistSpec - isPlaying := false - playbackFinished := make(chan error) - cancel := make(chan bool) - nextId := 0 - var timer *time.Timer - - for { - var timerCh <-chan time.Time = nil - if timer != nil { - timerCh = timer.C - } - doNext := false - select { - case specs = <-ch: - log.Println("Received new playlist specs", specs) - doNext = true - case <-playbackFinished: - isPlaying = false - doNext = true - cancel = make(chan bool) - case <-timerCh: - timer = nil - isPlaying = true - for _, v := range specs { - if v.Id == nextId { - go playPlaylist(v, playbackFinished, cancel) - } - } - case <-stop: - if isPlaying { - log.Println("Cancelling playlist in progress") - cancel <- true - } - } - - if doNext && !isPlaying { - timer = nil - found := false - loc, err := time.LoadLocation(config.TimeZone) - if err != nil { - log.Fatal(err) - } - var soonestTime time.Time - for _, v := range specs { - t, err := time.ParseInLocation(protocol.StartTimeFormat, v.StartTime, loc) - if err != nil { - log.Println("Error parsing start time", err) - continue - } - if t.Before(time.Now()) { - continue - } - if !found || t.Before(soonestTime) { - soonestTime = t - found = true - nextId = v.Id - } - } - if found { - duration := soonestTime.Sub(time.Now()) - log.Println("Next playlist will be id", nextId, "in", duration.Seconds(), "seconds") - timer = time.NewTimer(duration) - } else { - log.Println("No future playlists") - } - } - } -} - -func playPlaylist(playlist protocol.PlaylistSpec, playbackFinished chan<- error, cancel <-chan bool) { - startTime := time.Now() - log.Println("Beginning playback of playlist", playlist.Name) -entries: - for _, p := range playlist.Entries { - // delay - var duration time.Duration - if p.IsRelative { - duration = time.Second * time.Duration(p.DelaySeconds) - } else { - duration = time.Until(startTime.Add(time.Second * time.Duration(p.DelaySeconds))) - } - statusCollector.PlaylistBeginDelay <- BeginDelayStatus{ - Playlist: playlist.Name, - Seconds: int(duration.Seconds()), - Filename: p.Filename, - } - select { - case <-time.After(duration): - case <-cancel: - break entries - } - - statusCollector.PlaylistBeginWaitForChannel <- BeginWaitForChannelStatus{ - Playlist: playlist.Name, - Filename: p.Filename, - } - cos.WaitForChannelClear() - - // then play - statusCollector.PlaylistBeginPlayback <- BeginPlaybackStatus{ - Playlist: playlist.Name, - Filename: p.Filename, - } - ptt.EngagePTT() - f, err := os.Open(filepath.Join(config.CachePath, p.Filename)) - if err != nil { - log.Println("Couldn't open file for playlist", p.Filename) - continue - } - log.Println("Playing file", p.Filename) - l := strings.ToLower(p.Filename) - var streamer beep.StreamSeekCloser - var format beep.Format - if strings.HasSuffix(l, ".mp3") { - streamer, format, err = mp3.Decode(f) - } else if strings.HasSuffix(l, ".wav") { - streamer, format, err = wav.Decode(f) - } else { - log.Println("Unrecognised file extension (.wav and .mp3 supported), moving on") - } - if err != nil { - log.Println("Could not decode media file", err) - continue - } - defer streamer.Close() - - done := make(chan bool) - - if format.SampleRate != sampleRate { - resampled := beep.Resample(4, format.SampleRate, sampleRate, streamer) - speaker.Play(beep.Seq(resampled, beep.Callback(func() { - done <- true - }))) - } else { - speaker.Play(beep.Seq(streamer, beep.Callback(func() { - done <- true - }))) - } - - select { - case <-done: - case <-cancel: - ptt.DisengagePTT() - break entries - } - ptt.DisengagePTT() - } - log.Println("Playlist finished", playlist.Name) - statusCollector.PlaylistBeginIdle <- true - playbackFinished <- nil -} diff --git a/radio/status.go b/radio/status.go deleted file mode 100644 index 797a28f..0000000 --- a/radio/status.go +++ /dev/null @@ -1,140 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "encoding/json" - "golang.org/x/net/websocket" - "time" -) - -type BeginDelayStatus struct { - Playlist string - Seconds int - Filename string -} - -type BeginWaitForChannelStatus struct { - Playlist string - Filename string -} - -type BeginPlaybackStatus struct { - Playlist string - Filename string -} - -type StatusCollector struct { - Websocket chan *websocket.Conn - PlaylistBeginIdle chan bool - PlaylistBeginDelay chan BeginDelayStatus - PlaylistBeginWaitForChannel chan BeginWaitForChannelStatus - PlaylistBeginPlayback chan BeginPlaybackStatus - PTT chan bool - COS chan bool - Config chan RadioConfig - FilesInSync chan bool -} - -var statusCollector = NewStatusCollector() - -func NewStatusCollector() StatusCollector { - sc := StatusCollector{ - Websocket: make(chan *websocket.Conn), - PlaylistBeginIdle: make(chan bool), - PlaylistBeginDelay: make(chan BeginDelayStatus), - PlaylistBeginWaitForChannel: make(chan BeginWaitForChannelStatus), - PlaylistBeginPlayback: make(chan BeginPlaybackStatus), - PTT: make(chan bool), - COS: make(chan bool), - Config: make(chan RadioConfig), - FilesInSync: make(chan bool), - } - go runStatusCollector(sc) - return sc -} - -func runStatusCollector(sc StatusCollector) { - config := <-sc.Config - var msg protocol.StatusMessage - var lastSent protocol.StatusMessage - msg.T = protocol.StatusType - msg.TimeZone = config.TimeZone - msg.Status = protocol.StatusIdle - var ws *websocket.Conn - // Go 1.23: no need to stop tickers when finished - var ticker = time.NewTicker(time.Second * time.Duration(30)) - - for { - select { - case newWebsocket := <-sc.Websocket: - ws = newWebsocket - case <-ticker.C: - // should always be ticking at 1 second for these - if msg.Status == protocol.StatusDelay { - if msg.DelaySecondsRemaining > 0 { - msg.DelaySecondsRemaining -= 1 - } - } - if msg.Status == protocol.StatusChannelInUse { - msg.WaitingForChannelSeconds += 1 - } - if msg.Status == protocol.StatusPlaying { - msg.PlaybackSecondsElapsed += 1 - } - case <-sc.PlaylistBeginIdle: - msg.Status = protocol.StatusIdle - msg.DelaySecondsRemaining = 0 - msg.WaitingForChannelSeconds = 0 - msg.PlaybackSecondsElapsed = 0 - msg.Playlist = "" - msg.Filename = "" - // Update things more slowly when nothing's playing - ticker = time.NewTicker(time.Second * time.Duration(30)) - case delay := <-sc.PlaylistBeginDelay: - msg.Status = protocol.StatusDelay - msg.DelaySecondsRemaining = delay.Seconds - msg.WaitingForChannelSeconds = 0 - msg.PlaybackSecondsElapsed = 0 - msg.Playlist = delay.Playlist - msg.Filename = delay.Filename - // Align ticker with start of state change, make sure it's faster - ticker = time.NewTicker(time.Second * time.Duration(1)) - case wait := <-sc.PlaylistBeginWaitForChannel: - msg.Status = protocol.StatusChannelInUse - msg.DelaySecondsRemaining = 0 - msg.WaitingForChannelSeconds = 0 - msg.PlaybackSecondsElapsed = 0 - msg.Playlist = wait.Playlist - msg.Filename = wait.Filename - ticker = time.NewTicker(time.Second * time.Duration(1)) - case playback := <-sc.PlaylistBeginPlayback: - msg.Status = protocol.StatusPlaying - msg.DelaySecondsRemaining = 0 - msg.WaitingForChannelSeconds = 0 - msg.PlaybackSecondsElapsed = 0 - msg.Playlist = playback.Playlist - msg.Filename = playback.Filename - ticker = time.NewTicker(time.Second * time.Duration(1)) - case ptt := <-sc.PTT: - msg.PTT = ptt - case cos := <-sc.COS: - msg.COS = cos - case inSync := <-sc.FilesInSync: - msg.FilesInSync = inSync - } - msg.LocalTime = time.Now().Format(protocol.LocalTimeFormat) - msg.COS = cos.COSValue() - - if msg == lastSent { - continue - } - if ws != nil { - msgJson, _ := json.Marshal(msg) - if _, err := ws.Write(msgJson); err != nil { - // If websocket has failed, wait 'til we get a new one - ws = nil - } - lastSent = msg - } - } -} diff --git a/server/broadcaster.go b/server/broadcaster.go deleted file mode 100644 index e5f9922..0000000 --- a/server/broadcaster.go +++ /dev/null @@ -1,424 +0,0 @@ -package main - -import ( - "flag" - "golang.org/x/net/websocket" - "html/template" - "io" - "log" - "net/http" - "os" - "path/filepath" - "strconv" - "strings" - "time" -) - -const formatString = "2006-01-02T15:04" - -var config ServerConfig = NewServerConfig() - -func main() { - configFlag := flag.String("c", "", "path to configuration file") - // TODO: support this - //generateFlag := flag.String("g", "", "create a template config file with specified name then exit") - flag.Parse() - - if *configFlag == "" { - log.Fatal("must specify a configuration file with -c") - } - config.LoadFromFile(*configFlag) - - log.Println("Hello, World! Woo broadcast time") - InitDatabase() - defer db.CloseDatabase() - - InitCommandRouter() - InitPlaylists() - InitAudioFiles(config.AudioFilesPath) - InitServerStatus() - - http.HandleFunc("/", homePage) - http.HandleFunc("/login", logInPage) - http.HandleFunc("/logout", logOutPage) - http.HandleFunc("/secret", secretPage) - http.HandleFunc("/stop", stopPage) - - http.HandleFunc("/playlist/", playlistSection) - http.HandleFunc("/file/", fileSection) - http.HandleFunc("/radio/", radioSection) - - http.Handle("/radiosync", websocket.Handler(RadioSync)) - http.Handle("/websync", websocket.Handler(WebSync)) - http.Handle("/audio-files/", http.StripPrefix("/audio-files/", http.FileServer(http.Dir(config.AudioFilesPath)))) - - err := http.ListenAndServe(config.BindAddress+":"+strconv.Itoa(config.Port), nil) - if err != nil { - log.Fatal(err) - } -} - -type HomeData struct { - LoggedIn bool - Username string -} - -func homePage(w http.ResponseWriter, r *http.Request) { - tmpl := template.Must(template.ParseFiles("templates/index.html")) - data := HomeData{ - LoggedIn: true, - Username: "Bob", - } - tmpl.Execute(w, data) -} - -func secretPage(w http.ResponseWriter, r *http.Request) { - user, err := currentUser(w, r) - if err != nil { - http.Redirect(w, r, "/login", http.StatusFound) - return - } - tmpl := template.Must(template.ParseFiles("templates/index.html")) - data := HomeData{ - LoggedIn: true, - Username: user.username + ", you are special", - } - tmpl.Execute(w, data) -} - -type LogInData struct { - Error string -} - -func logInPage(w http.ResponseWriter, r *http.Request) { - log.Println("Log in page!") - r.ParseForm() - username := r.Form["username"] - password := r.Form["password"] - err := "" - if username != nil { - log.Println("Looks like we have a username", username[0]) - if username[0] == "admin" && password[0] == "test" { - createSessionCookie(w) - http.Redirect(w, r, "/", http.StatusFound) - return - } else { - err = "Incorrect login" - } - } - - data := LogInData{ - Error: err, - } - - tmpl := template.Must(template.ParseFiles("templates/login.html")) - tmpl.Execute(w, data) -} - -func playlistSection(w http.ResponseWriter, r *http.Request) { - path := strings.Split(r.URL.Path, "/") - if len(path) != 3 { - http.NotFound(w, r) - return - } - if path[2] == "new" { - editPlaylistPage(w, r, 0) - } else if path[2] == "submit" && r.Method == "POST" { - submitPlaylist(w, r) - } else if path[2] == "delete" && r.Method == "POST" { - deletePlaylist(w, r) - } else if path[2] == "" { - playlistsPage(w, r) - } else { - id, err := strconv.Atoi(path[2]) - if err != nil { - http.NotFound(w, r) - return - } - editPlaylistPage(w, r, id) - } -} - -func fileSection(w http.ResponseWriter, r *http.Request) { - path := strings.Split(r.URL.Path, "/") - if len(path) != 3 { - http.NotFound(w, r) - return - } - if path[2] == "upload" { - uploadFile(w, r) - } else if path[2] == "delete" && r.Method == "POST" { - deleteFile(w, r) - } else if path[2] == "" { - filesPage(w, r) - } else { - http.NotFound(w, r) - return - } -} - -func radioSection(w http.ResponseWriter, r *http.Request) { - path := strings.Split(r.URL.Path, "/") - if len(path) != 3 { - http.NotFound(w, r) - return - } - if path[2] == "new" { - editRadioPage(w, r, 0) - } else if path[2] == "submit" && r.Method == "POST" { - submitRadio(w, r) - } else if path[2] == "delete" && r.Method == "POST" { - deleteRadio(w, r) - } else if path[2] == "" { - radiosPage(w, r) - } else { - id, err := strconv.Atoi(path[2]) - if err != nil { - http.NotFound(w, r) - return - } - editRadioPage(w, r, id) - } -} - -type PlaylistsPageData struct { - Playlists []Playlist -} - -func playlistsPage(w http.ResponseWriter, _ *http.Request) { - data := PlaylistsPageData{ - Playlists: db.GetPlaylists(), - } - tmpl := template.Must(template.ParseFiles("templates/playlists.html")) - err := tmpl.Execute(w, data) - if err != nil { - log.Fatal(err) - } -} - -type RadiosPageData struct { - Radios []Radio -} - -func radiosPage(w http.ResponseWriter, _ *http.Request) { - data := RadiosPageData{ - Radios: db.GetRadios(), - } - tmpl := template.Must(template.ParseFiles("templates/radios.html")) - err := tmpl.Execute(w, data) - if err != nil { - log.Fatal(err) - } -} - -type EditPlaylistPageData struct { - Playlist Playlist - Entries []PlaylistEntry - Files []string -} - -func editPlaylistPage(w http.ResponseWriter, r *http.Request, id int) { - var data EditPlaylistPageData - for _, f := range files.Files() { - data.Files = append(data.Files, f.Name) - } - if id == 0 { - data.Playlist.Enabled = true - data.Playlist.Name = "New Playlist" - data.Playlist.StartTime = time.Now().Format(formatString) - data.Entries = append(data.Entries, PlaylistEntry{}) - } else { - playlist, err := db.GetPlaylist(id) - if err != nil { - http.NotFound(w, r) - return - } - data.Playlist = playlist - data.Entries = db.GetEntriesForPlaylist(id) - } - tmpl := template.Must(template.ParseFiles("templates/playlist.html")) - tmpl.Execute(w, data) -} - -func submitPlaylist(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err == nil { - var p Playlist - id, err := strconv.Atoi(r.Form.Get("playlistId")) - if err != nil { - return - } - _, err = time.Parse(formatString, r.Form.Get("playlistStartTime")) - if err != nil { - return - } - p.Id = id - p.Enabled = r.Form.Get("playlistEnabled") == "1" - p.Name = r.Form.Get("playlistName") - p.StartTime = r.Form.Get("playlistStartTime") - - delays := r.Form["delaySeconds"] - filenames := r.Form["filename"] - isRelatives := r.Form["isRelative"] - - entries := make([]PlaylistEntry, 0) - for i := range delays { - var e PlaylistEntry - delay, err := strconv.Atoi(delays[i]) - if err != nil { - return - } - e.DelaySeconds = delay - e.Position = i - e.IsRelative = isRelatives[i] == "1" - e.Filename = filenames[i] - entries = append(entries, e) - } - cleanedEntries := make([]PlaylistEntry, 0) - for _, e := range entries { - if e.DelaySeconds != 0 || e.Filename != "" { - cleanedEntries = append(cleanedEntries, e) - } - } - - if id != 0 { - db.UpdatePlaylist(p) - } else { - id = db.CreatePlaylist(p) - } - db.SetEntriesForPlaylist(cleanedEntries, id) - // Notify connected radios - playlists.NotifyChanges() - } - http.Redirect(w, r, "/playlist/", http.StatusFound) -} - -func deletePlaylist(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err == nil { - id, err := strconv.Atoi(r.Form.Get("playlistId")) - if err != nil { - return - } - db.DeletePlaylist(id) - playlists.NotifyChanges() - } - http.Redirect(w, r, "/playlist/", http.StatusFound) -} - -type EditRadioPageData struct { - Radio Radio -} - -func editRadioPage(w http.ResponseWriter, r *http.Request, id int) { - var data EditRadioPageData - if id == 0 { - data.Radio.Name = "New Radio" - data.Radio.Token = generateSession() - } else { - radio, err := db.GetRadio(id) - if err != nil { - http.NotFound(w, r) - return - } - data.Radio = radio - } - tmpl := template.Must(template.ParseFiles("templates/radio.html")) - tmpl.Execute(w, data) -} - -func submitRadio(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err == nil { - var radio Radio - id, err := strconv.Atoi(r.Form.Get("radioId")) - if err != nil { - return - } - radio.Id = id - radio.Name = r.Form.Get("radioName") - radio.Token = r.Form.Get("radioToken") - if id != 0 { - db.UpdateRadio(radio) - } else { - db.CreateRadio(radio) - } - } - http.Redirect(w, r, "/radio/", http.StatusFound) -} - -func deleteRadio(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err == nil { - id, err := strconv.Atoi(r.Form.Get("radioId")) - if err != nil { - return - } - db.DeleteRadio(id) - } - http.Redirect(w, r, "/radio/", http.StatusFound) -} - -type FilesPageData struct { - Files []FileSpec -} - -func filesPage(w http.ResponseWriter, _ *http.Request) { - data := FilesPageData{ - Files: files.Files(), - } - log.Println("file page data", data) - tmpl := template.Must(template.ParseFiles("templates/files.html")) - err := tmpl.Execute(w, data) - if err != nil { - log.Fatal(err) - } -} - -func deleteFile(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err == nil { - filename := r.Form.Get("filename") - if filename == "" { - return - } - files.Delete(filename) - } - http.Redirect(w, r, "/file/", http.StatusFound) -} - -func uploadFile(w http.ResponseWriter, r *http.Request) { - err := r.ParseMultipartForm(100 << 20) - file, handler, err := r.FormFile("file") - if err == nil { - path := filepath.Join(files.Path(), filepath.Base(handler.Filename)) - f, _ := os.Create(path) - defer f.Close() - io.Copy(f, file) - log.Println("uploaded file to", path) - files.Refresh() - } - http.Redirect(w, r, "/file/", http.StatusFound) -} - -func logOutPage(w http.ResponseWriter, r *http.Request) { - clearSessionCookie(w) - tmpl := template.Must(template.ParseFiles("templates/logout.html")) - tmpl.Execute(w, nil) -} - -func stopPage(w http.ResponseWriter, r *http.Request) { - _, err := currentUser(w, r) - if err != nil { - http.Redirect(w, r, "/login", http.StatusFound) - return - } - r.ParseForm() - radioId, err := strconv.Atoi(r.Form.Get("radioId")) - if err != nil { - http.NotFound(w, r) - return - } - commandRouter.Stop(radioId) - http.Redirect(w, r, "/", http.StatusFound) -} diff --git a/server/command.go b/server/command.go deleted file mode 100644 index cd7410b..0000000 --- a/server/command.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "encoding/json" - "golang.org/x/net/websocket" - "sync" -) - -type CommandRouter struct { - connsMutex sync.Mutex - conns map[int]*websocket.Conn -} - -var commandRouter CommandRouter - -func InitCommandRouter() { - commandRouter.conns = make(map[int]*websocket.Conn) -} - -func (c *CommandRouter) AddWebsocket(radioId int, ws *websocket.Conn) { - c.connsMutex.Lock() - defer c.connsMutex.Unlock() - c.conns[radioId] = ws -} - -func (c *CommandRouter) RemoveWebsocket(ws *websocket.Conn) { - c.connsMutex.Lock() - defer c.connsMutex.Unlock() - key := -1 - for k, v := range c.conns { - if v == ws { - key = k - } - } - if key != -1 { - delete(c.conns, key) - } - -} - -func (c *CommandRouter) Stop(radioId int) { - c.connsMutex.Lock() - defer c.connsMutex.Unlock() - ws := c.conns[radioId] - if ws != nil { - stop := protocol.StopMessage{ - T: protocol.StopType, - } - msg, _ := json.Marshal(stop) - ws.Write(msg) - } -} diff --git a/server/config.go b/server/config.go deleted file mode 100644 index 4a4866c..0000000 --- a/server/config.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "errors" - "log" - - "github.com/BurntSushi/toml" -) - -type ServerConfig struct { - BindAddress string - Port int - SqliteDB string - AudioFilesPath string -} - -func NewServerConfig() ServerConfig { - return ServerConfig{ - BindAddress: "0.0.0.0", - Port: 55134, - SqliteDB: "", - AudioFilesPath: "", - } -} - -func (c *ServerConfig) LoadFromFile(path string) { - _, err := toml.DecodeFile(path, &c) - if err != nil { - log.Fatal("could not read config file for reading at path:", path, err) - } - err = c.Validate() - if err != nil { - log.Fatal(err) - } -} - -func (c *ServerConfig) Validate() error { - if c.SqliteDB == "" { - return errors.New("Configuration must provide SqliteDB") - } - if c.AudioFilesPath == "" { - return errors.New("Configuration must provide AudioFilesPath") - } - return nil -} diff --git a/server/database.go b/server/database.go deleted file mode 100644 index d6bbf92..0000000 --- a/server/database.go +++ /dev/null @@ -1,198 +0,0 @@ -package main - -import ( - "database/sql" - "errors" - "log" - _ "modernc.org/sqlite" - "time" -) - -type Database struct { - sqldb *sql.DB -} - -var db Database - -func InitDatabase() { - sqldb, err := sql.Open("sqlite", config.SqliteDB) - if err != nil { - log.Fatal(err) - } - db.sqldb = sqldb - - _, err = db.sqldb.Exec("PRAGMA journal_mode = WAL") - if err != nil { - log.Fatal(err) - } - - _, err = db.sqldb.Exec("PRAGMA foreign_keys = ON") - if err != nil { - log.Fatal(err) - } - - _, err = db.sqldb.Exec("PRAGMA busy_timeout = 5000") - if err != nil { - log.Fatal(err) - } - - sqlStmt := ` - CREATE TABLE IF NOT EXISTS sessions (id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT, username TEXT, created TIMESTAMP, expiry TIMESTAMP); - CREATE TABLE IF NOT EXISTS playlists (id INTEGER PRIMARY KEY AUTOINCREMENT, enabled INTEGER, name TEXT, start_time TEXT); - CREATE TABLE IF NOT EXISTS playlist_entries (id INTEGER PRIMARY KEY AUTOINCREMENT, playlist_id INTEGER, position INTEGER, filename TEXT, delay_seconds INTEGER, is_relative INTEGER, CONSTRAINT fk_playlists FOREIGN KEY (playlist_id) REFERENCES playlists(id) ON DELETE CASCADE); - CREATE TABLE IF NOT EXISTS radios (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, token TEXT); - ` - _, err = db.sqldb.Exec(sqlStmt) - if err != nil { - log.Printf("%q: %s\n", err, sqlStmt) - return - } -} - -func (d *Database) CloseDatabase() { - d.sqldb.Close() -} - -func (d *Database) InsertSession(user string, token string, expiry time.Time) { - _, err := d.sqldb.Exec("INSERT INTO sessions (token, username, created, expiry) values (?, ?, CURRENT_TIMESTAMP, ?)", token, user, expiry) - if err != nil { - log.Fatal(err) - } -} - -func (d *Database) GetUserForSession(token string) (string, error) { - var username string - err := d.sqldb.QueryRow("SELECT username FROM sessions WHERE token = ? AND expiry > CURRENT_TIMESTAMP", token).Scan(&username) - if err != nil { - return "", errors.New("no matching token") - } - return username, nil -} - -func (d *Database) CreatePlaylist(playlist Playlist) int { - var id int - tx, _ := d.sqldb.Begin() - _, err := tx.Exec("INSERT INTO playlists (enabled, name, start_time) values (?, ?, ?)", playlist.Enabled, playlist.Name, playlist.StartTime) - if err != nil { - log.Fatal(err) - } - err = tx.QueryRow("SELECT last_insert_rowid()").Scan(&id) - if err != nil { - log.Fatal(err) - } - err = tx.Commit() - if err != nil { - log.Fatal(err) - } - return id -} - -func (d *Database) DeletePlaylist(playlistId int) { - d.sqldb.Exec("DELETE FROM playlists WHERE id = ?", playlistId) -} - -func (d *Database) GetPlaylists() []Playlist { - ret := make([]Playlist, 0) - rows, err := d.sqldb.Query("SELECT id, enabled, name, start_time FROM playlists ORDER BY id ASC") - if err != nil { - return ret - } - defer rows.Close() - for rows.Next() { - var p Playlist - if err := rows.Scan(&p.Id, &p.Enabled, &p.Name, &p.StartTime); err != nil { - return ret - } - ret = append(ret, p) - } - return ret -} - -func (d *Database) GetPlaylist(playlistId int) (Playlist, error) { - var p Playlist - err := d.sqldb.QueryRow("SELECT id, enabled, name, start_time FROM playlists WHERE id = ?", playlistId).Scan(&p.Id, &p.Enabled, &p.Name, &p.StartTime) - if err != nil { - return p, err - } - return p, nil -} - -func (d *Database) UpdatePlaylist(playlist Playlist) { - d.sqldb.Exec("UPDATE playlists SET enabled = ?, name = ?, start_time = ? WHERE id = ?", playlist.Enabled, playlist.Name, playlist.StartTime, playlist.Id) -} - -func (d *Database) SetEntriesForPlaylist(entries []PlaylistEntry, playlistId int) { - tx, _ := d.sqldb.Begin() - _, err := tx.Exec("DELETE FROM playlist_entries WHERE playlist_id = ?", playlistId) - for _, e := range entries { - _, err = tx.Exec("INSERT INTO playlist_entries (playlist_id, position, filename, delay_seconds, is_relative) values (?, ?, ?, ?, ?)", playlistId, e.Position, e.Filename, e.DelaySeconds, e.IsRelative) - if err != nil { - log.Fatal(err) - } - } - tx.Commit() // ignore errors -} - -func (d *Database) GetEntriesForPlaylist(playlistId int) []PlaylistEntry { - ret := make([]PlaylistEntry, 0) - rows, err := d.sqldb.Query("SELECT id, position, filename, delay_seconds, is_relative FROM playlist_entries WHERE playlist_id = ? ORDER by position ASC", playlistId) - if err != nil { - return ret - } - defer rows.Close() - for rows.Next() { - var entry PlaylistEntry - if err := rows.Scan(&entry.Id, &entry.Position, &entry.Filename, &entry.DelaySeconds, &entry.IsRelative); err != nil { - return ret - } - ret = append(ret, entry) - } - return ret -} - -func (d *Database) GetRadio(radioId int) (Radio, error) { - var r Radio - err := d.sqldb.QueryRow("SELECT id, name, token FROM radios WHERE id = ?", radioId).Scan(&r.Id, &r.Name, &r.Token) - if err != nil { - return r, err - } - return r, nil -} - -func (d *Database) GetRadioByToken(token string) (Radio, error) { - var r Radio - err := d.sqldb.QueryRow("SELECT id, name, token FROM radios WHERE token = ?", token).Scan(&r.Id, &r.Name, &r.Token) - if err != nil { - return r, err - } - return r, nil -} - -func (d *Database) GetRadios() []Radio { - ret := make([]Radio, 0) - rows, err := d.sqldb.Query("SELECT id, name, token FROM radios ORDER BY id ASC") - if err != nil { - return ret - } - defer rows.Close() - for rows.Next() { - var r Radio - if err := rows.Scan(&r.Id, &r.Name, &r.Token); err != nil { - return ret - } - ret = append(ret, r) - } - return ret -} - -func (d *Database) DeleteRadio(radioId int) { - d.sqldb.Exec("DELETE FROM radios WHERE id = ?", radioId) -} - -func (d *Database) CreateRadio(radio Radio) { - d.sqldb.Exec("INSERT INTO radios (name, token) values (?, ?)", radio.Name, radio.Token) -} - -func (d *Database) UpdateRadio(radio Radio) { - d.sqldb.Exec("UPDATE radios SET name = ?, token = ? WHERE id = ?", radio.Name, radio.Token, radio.Id) -} diff --git a/server/files.go b/server/files.go deleted file mode 100644 index b452997..0000000 --- a/server/files.go +++ /dev/null @@ -1,81 +0,0 @@ -package main - -import ( - "crypto/sha256" - "encoding/hex" - "io" - "log" - "os" - "path/filepath" - "sync" -) - -type FileSpec struct { - Name string - Hash string -} - -type AudioFiles struct { - path string - list []FileSpec - changeWait chan bool - filesMutex sync.Mutex -} - -var files AudioFiles - -func InitAudioFiles(path string) { - files.changeWait = make(chan bool) - files.path = path - log.Println("initing audio files") - files.Refresh() - log.Println("done") -} - -func (r *AudioFiles) Refresh() { - entries, err := os.ReadDir(r.path) - if err != nil { - log.Println("couldn't read dir", r.path) - return - } - r.filesMutex.Lock() - defer r.filesMutex.Unlock() - r.list = nil - for _, file := range entries { - f, err := os.Open(filepath.Join(r.path, file.Name())) - if err != nil { - log.Println("couldn't open", file.Name()) - return - } - hash := sha256.New() - io.Copy(hash, f) - r.list = append(r.list, FileSpec{Name: file.Name(), Hash: hex.EncodeToString(hash.Sum(nil))}) - } - log.Println("Files updated", r.list) - close(files.changeWait) - files.changeWait = make(chan bool) -} - -func (r *AudioFiles) Path() string { - return r.path -} - -func (r *AudioFiles) Files() []FileSpec { - r.filesMutex.Lock() - defer r.filesMutex.Unlock() - return r.list -} - -func (r *AudioFiles) Delete(filename string) { - path := filepath.Join(r.path, filepath.Base(filename)) - if filepath.Clean(r.path) != filepath.Clean(path) { - os.Remove(path) - r.Refresh() - } -} - -func (r *AudioFiles) WatchForChanges() ([]FileSpec, chan bool) { - r.filesMutex.Lock() - defer r.filesMutex.Unlock() - return r.list, r.changeWait -} diff --git a/server/model.go b/server/model.go deleted file mode 100644 index 94a6ab4..0000000 --- a/server/model.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -type PlaylistEntry struct { - Id int - Position int - Filename string - DelaySeconds int - IsRelative bool -} - -type User struct { - username string -} - -type Playlist struct { - Id int - Enabled bool - Name string - StartTime string -} - -type Radio struct { - Id int - Name string - Token string -} diff --git a/server/playlist.go b/server/playlist.go deleted file mode 100644 index f0a6c89..0000000 --- a/server/playlist.go +++ /dev/null @@ -1,35 +0,0 @@ -package main - -import ( - "sync" -) - -type Playlists struct { - changeWait chan bool - playlistMutex sync.Mutex -} - -var playlists Playlists - -func InitPlaylists() { - playlists.changeWait = make(chan bool) -} - -func (p *Playlists) GetPlaylists() []Playlist { - p.playlistMutex.Lock() - defer p.playlistMutex.Unlock() - return db.GetPlaylists() -} - -func (p *Playlists) WatchForChanges() ([]Playlist, chan bool) { - p.playlistMutex.Lock() - defer p.playlistMutex.Unlock() - return db.GetPlaylists(), p.changeWait -} - -func (p *Playlists) NotifyChanges() { - p.playlistMutex.Lock() - defer p.playlistMutex.Unlock() - close(p.changeWait) - p.changeWait = make(chan bool) -} diff --git a/server/radio_sync.go b/server/radio_sync.go deleted file mode 100644 index 2eaef34..0000000 --- a/server/radio_sync.go +++ /dev/null @@ -1,125 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "encoding/json" - "golang.org/x/net/websocket" - "log" -) - -func RadioSync(ws *websocket.Conn) { - log.Println("A websocket connected, I think") - buf := make([]byte, 16384) - - badRead := false - isAuthenticated := false - var radio Radio - for { - // Ignore any massively oversize messages - n, err := ws.Read(buf) - if err != nil { - if radio.Name != "" { - log.Println("Lost websocket to radio:", radio.Name) - status.RadioDisconnected(radio.Id) - } else { - log.Println("Lost unauthenticated websocket") - } - return - } - if n == len(buf) { - badRead = true - continue - } else if badRead { - badRead = false - continue - } - - t, msg, err := protocol.ParseMessage(buf[:n]) - if err != nil { - log.Println(err) - return - } - - if !isAuthenticated && t != protocol.AuthenticateType { - continue - } - - if t == protocol.AuthenticateType && !isAuthenticated { - authMsg := msg.(protocol.AuthenticateMessage) - r, err := db.GetRadioByToken(authMsg.Token) - if err != nil { - log.Println("Could not find radio for offered token", authMsg.Token) - } - radio = r - log.Println("Radio authenticated:", radio.Name) - isAuthenticated = true - commandRouter.AddWebsocket(r.Id, ws) - defer commandRouter.RemoveWebsocket(ws) - - go KeepFilesUpdated(ws) - go KeepPlaylistsUpdated(ws) - } - - if t == protocol.StatusType { - statusMsg := msg.(protocol.StatusMessage) - log.Println("Received Status from", radio.Name, ":", statusMsg) - status.MergeStatus(radio.Id, statusMsg) - } - } -} - -func sendPlaylistsMessageToRadio(ws *websocket.Conn, p []Playlist) error { - playlistSpecs := make([]protocol.PlaylistSpec, 0) - for _, v := range p { - if v.Enabled { - entrySpecs := make([]protocol.EntrySpec, 0) - for _, e := range db.GetEntriesForPlaylist(v.Id) { - entrySpecs = append(entrySpecs, protocol.EntrySpec{Filename: e.Filename, DelaySeconds: e.DelaySeconds, IsRelative: e.IsRelative}) - } - playlistSpecs = append(playlistSpecs, protocol.PlaylistSpec{Id: v.Id, Name: v.Name, StartTime: v.StartTime, Entries: entrySpecs}) - } - } - playlists := protocol.PlaylistsMessage{ - T: protocol.PlaylistsType, - Playlists: playlistSpecs, - } - msg, _ := json.Marshal(playlists) - _, err := ws.Write(msg) - return err -} - -func KeepPlaylistsUpdated(ws *websocket.Conn) { - for { - p, ch := playlists.WatchForChanges() - err := sendPlaylistsMessageToRadio(ws, p) - if err != nil { - return - } - <-ch - } -} - -func sendFilesMessageToRadio(ws *websocket.Conn, f []FileSpec) error { - specs := make([]protocol.FileSpec, 0) - for _, v := range f { - specs = append(specs, protocol.FileSpec{Name: v.Name, Hash: v.Hash}) - } - files := protocol.FilesMessage{ - T: protocol.FilesType, - Files: specs, - } - msg, _ := json.Marshal(files) - _, err := ws.Write(msg) - return err -} - -func KeepFilesUpdated(ws *websocket.Conn) { - for { - f, ch := files.WatchForChanges() - err := sendFilesMessageToRadio(ws, f) - if err != nil { - return - } - <-ch - } -} diff --git a/server/session.go b/server/session.go deleted file mode 100644 index 4b4c445..0000000 --- a/server/session.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "crypto/rand" - "encoding/hex" - "log" - "net/http" - "time" -) - -func generateSession() string { - b := make([]byte, 32) - _, err := rand.Read(b) - if err != nil { - log.Fatal(err) - } - return hex.EncodeToString(b) -} - -func currentUser(w http.ResponseWriter, r *http.Request) (User, error) { - // todo: check if user actually exists and is allowed to log in - cookie, e := r.Cookie("broadcast_session") - if e != nil { - return User{}, e - } - - username, e := db.GetUserForSession(cookie.Value) - if e != nil { - return User{}, e - } - return User{username: username}, nil -} - -func createSessionCookie(w http.ResponseWriter) { - sess := generateSession() - log.Println("Generated a random session", sess) - expiration := time.Now().Add(365 * 24 * time.Hour) - cookie := http.Cookie{Name: "broadcast_session", Value: sess, Expires: expiration, SameSite: http.SameSiteLaxMode} - db.InsertSession("admin", sess, expiration) - http.SetCookie(w, &cookie) -} - -func clearSessionCookie(w http.ResponseWriter) { - c := &http.Cookie{ - Name: "broadcast_session", - Value: "", - MaxAge: -1, - HttpOnly: true, - } - http.SetCookie(w, c) -} diff --git a/server/status.go b/server/status.go deleted file mode 100644 index 260a0ce..0000000 --- a/server/status.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "code.octet-stream.net/broadcaster/protocol" - "sync" -) - -type ServerStatus struct { - statuses map[int]protocol.StatusMessage - statusesMutex sync.Mutex - changeWait chan bool -} - -var status ServerStatus - -func InitServerStatus() { - status = ServerStatus{ - statuses: make(map[int]protocol.StatusMessage), - changeWait: make(chan bool), - } -} - -func (s *ServerStatus) MergeStatus(radioId int, status protocol.StatusMessage) { - s.statusesMutex.Lock() - defer s.statusesMutex.Unlock() - s.statuses[radioId] = status - s.TriggerChange() -} - -func (s *ServerStatus) RadioDisconnected(radioId int) { - s.statusesMutex.Lock() - defer s.statusesMutex.Unlock() - delete(s.statuses, radioId) - s.TriggerChange() -} - -func (s *ServerStatus) TriggerChange() { - close(s.changeWait) - s.changeWait = make(chan bool) -} - -func (s *ServerStatus) Statuses() map[int]protocol.StatusMessage { - s.statusesMutex.Lock() - defer s.statusesMutex.Unlock() - c := make(map[int]protocol.StatusMessage) - for k, v := range s.statuses { - c[k] = v - } - return c -} - -func (s *ServerStatus) ChangeChannel() chan bool { - return s.changeWait -} diff --git a/server/web_sync.go b/server/web_sync.go deleted file mode 100644 index e59403d..0000000 --- a/server/web_sync.go +++ /dev/null @@ -1,162 +0,0 @@ -package main - -import ( - "fmt" - "html/template" - "log" - "sort" - "strconv" - "strings" - - "code.octet-stream.net/broadcaster/protocol" - "golang.org/x/net/websocket" -) - -func WebSync(ws *websocket.Conn) { - log.Println("A web user connected with WebSocket") - buf := make([]byte, 16384) - - badRead := false - isAuthenticated := false - var user string - for { - // Ignore any massively oversize messages - n, err := ws.Read(buf) - if err != nil { - if user != "" { - log.Println("Lost websocket to user:", user) - } else { - log.Println("Lost unauthenticated website websocket") - } - return - } - if n == len(buf) { - badRead = true - continue - } else if badRead { - badRead = false - continue - } - - if !isAuthenticated { - token := string(buf[:n]) - u, err := db.GetUserForSession(token) - if err != nil { - log.Println("Could not find user for offered token", token) - ws.Close() - return - } - user = u - log.Println("User authenticated:", user) - isAuthenticated = true - - go KeepWebUpdated(ws) - - // send initial playlists message - err = sendRadioStatusToWeb(ws) - if err != nil { - return - } - } - } -} - -type WebStatusData struct { - Radios []WebRadioStatus -} - -type WebRadioStatus struct { - Name string - LocalTime string - TimeZone string - ChannelClass string - ChannelState string - Playlist string - File string - Status string - Id string - DisableCancel bool - FilesInSync bool -} - -func sendRadioStatusToWeb(ws *websocket.Conn) error { - webStatuses := make([]WebRadioStatus, 0) - radioStatuses := status.Statuses() - keys := make([]int, 0) - for i := range radioStatuses { - keys = append(keys, i) - } - sort.Ints(keys) - for _, i := range keys { - v := radioStatuses[i] - radio, err := db.GetRadio(i) - if err != nil { - continue - } - var channelClass, channelState string - if v.PTT { - channelClass = "ptt" - channelState = "PTT" - } else if v.COS { - channelClass = "cos" - channelState = "RX" - } else { - channelClass = "clear" - channelState = "CLEAR" - } - var statusText string - var disableCancel bool - if v.Status == protocol.StatusIdle { - statusText = "Idle" - disableCancel = true - } else if v.Status == protocol.StatusDelay { - statusText = fmt.Sprintf("Performing delay before transmit: %ds remain", v.DelaySecondsRemaining) - disableCancel = false - } else if v.Status == protocol.StatusChannelInUse { - statusText = fmt.Sprintf("Waiting for channel to clear: %ds", v.WaitingForChannelSeconds) - disableCancel = false - } else if v.Status == protocol.StatusPlaying { - statusText = fmt.Sprintf("Playing: %d:%02d", v.PlaybackSecondsElapsed/60, v.PlaybackSecondsElapsed%60) - disableCancel = false - } - playlist := v.Playlist - if playlist == "" { - playlist = "-" - } - filename := v.Filename - if filename == "" { - filename = "-" - } - webStatuses = append(webStatuses, WebRadioStatus{ - Name: radio.Name, - LocalTime: v.LocalTime, - TimeZone: v.TimeZone, - ChannelClass: channelClass, - ChannelState: channelState, - Playlist: playlist, - File: filename, - Status: statusText, - Id: strconv.Itoa(i), - DisableCancel: disableCancel, - FilesInSync: v.FilesInSync, - }) - } - data := WebStatusData{ - Radios: webStatuses, - } - buf := new(strings.Builder) - tmpl := template.Must(template.ParseFiles("templates/radios.partial.html")) - tmpl.Execute(buf, data) - _, err := ws.Write([]byte(buf.String())) - return err -} - -func KeepWebUpdated(ws *websocket.Conn) { - for { - <-status.ChangeChannel() - err := sendRadioStatusToWeb(ws) - if err != nil { - return - } - } -} diff --git a/templates/files.html b/templates/files.html deleted file mode 100644 index 70d666f..0000000 --- a/templates/files.html +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - Broadcaster - - -
-

Files! List

- -

Upload New File

-

-

- - -
-

-
- - diff --git a/templates/index.html b/templates/index.html deleted file mode 100644 index 5f4d747..0000000 --- a/templates/index.html +++ /dev/null @@ -1,81 +0,0 @@ - - - - - - Broadcaster - - - - -
-

Welcome!

- {{if .LoggedIn}} -

Your username is: {{.Username}}.

-

Log Out

- {{else}} -

Log In

- {{end}} -

File Management

-

Playlist Management

-

Radio Management

-

Connected Radios

-
- Loading... -
-
- - diff --git a/templates/login.html b/templates/login.html deleted file mode 100644 index 56772b9..0000000 --- a/templates/login.html +++ /dev/null @@ -1,23 +0,0 @@ - - - - - - Broadcaster Log In - - -
-

Log In

-
- {{if ne .Error ""}} -

{{.Error}}

- {{end}} -
-
-
-
- -
-
- - diff --git a/templates/logout.html b/templates/logout.html deleted file mode 100644 index b151265..0000000 --- a/templates/logout.html +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - Broadcaster Log Out - - -
-

Logged Out

-

Log In again

-
- - diff --git a/templates/playlist.html b/templates/playlist.html deleted file mode 100644 index 1e550ca..0000000 --- a/templates/playlist.html +++ /dev/null @@ -1,100 +0,0 @@ - - - - - - Broadcaster - - - -
-

A specific playlist

-

- {{if .Playlist.Id}} - Edit Playlist - {{else}} - Create New Playlist - {{end}} -

-
- -

- -
-

-

- - -

-

- - -

-

Playlist Items

- {{range .Entries}} -

- Wait until - - seconds from - - then play - - (Delete Item) -

- {{end}} -

- Add Item -

-

- -

-
- {{if .Playlist.Id}} -

Delete

-
- -

- -

-
- {{end}} - -
- - diff --git a/templates/playlists.html b/templates/playlists.html deleted file mode 100644 index 207d020..0000000 --- a/templates/playlists.html +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - Broadcaster - - -
-

Playlists!

- -

Add New Playlist

-
- - diff --git a/templates/radio.html b/templates/radio.html deleted file mode 100644 index 1c3dde0..0000000 --- a/templates/radio.html +++ /dev/null @@ -1,43 +0,0 @@ - - - - - - Broadcaster - - -
-

A specific radio

-

- {{if .Radio.Id}} - Edit Radio - {{else}} - Register New Radio - {{end}} -

-
- -

- - -

-

- Authentication token: {{.Radio.Token}} - -

-

- -

-
- {{if .Radio.Id}} -

Delete

-
- -

- -

-
- {{end}} -
- - diff --git a/templates/radios.html b/templates/radios.html deleted file mode 100644 index 255fbe2..0000000 --- a/templates/radios.html +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - Broadcaster - - -
-

Radios

- -

Register New Radio

-
- - diff --git a/templates/radios.partial.html b/templates/radios.partial.html deleted file mode 100644 index fb36ca8..0000000 --- a/templates/radios.partial.html +++ /dev/null @@ -1,82 +0,0 @@ -{{if .Radios}} -{{range .Radios}} - - - - - - - - - - - - -
- {{.Name}} -
- - - - - - - - - - - - - -
- Local Time - - {{.LocalTime}} -
- Time Zone - - {{.TimeZone}} -
- Files In Sync - - {{if .FilesInSync}} Yes {{else}} No {{end}} -
-
- {{.ChannelState}} - - - - - - - - - - - - - - -
- Playlist: - - {{.Playlist}} -
- File: - - {{.File}} -
- Status: - - {{.Status}} -
-
-
- - -
-
-{{end}} -{{else}} -

There are no radios online.

-{{end}}