From 14725c30e45760e16a6e2e1166c115f256416f7c Mon Sep 17 00:00:00 2001 From: Nathan Osman Date: Sun, 24 Apr 2016 17:43:08 -0700 Subject: [PATCH] Added Writer type. --- writer.go | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 writer.go diff --git a/writer.go b/writer.go new file mode 100644 index 0000000..0429ad1 --- /dev/null +++ b/writer.go @@ -0,0 +1,102 @@ +package main + +import ( + "container/list" + "io" + "log" + "net/http" + "os" + "sync" +) + +// Status of download in progress. +type Status int + +const ( + StatusNone = iota // Nothing has happened yet + StatusDownloading // JSON and data files created + StatusError // Error retrieving the data + StatusDone // All data has been retrieved +) + +// Writer connects to a remote URL via HTTP and retrieves the data, writing it +// directly to disk and notifying subscribed readers of its status. +type Writer struct { + mutex sync.Mutex + channels *list.List + status Status +} + +func (w *Writer) sendStatus(statusChan chan Status, status Status) { + statusChan <- status +} + +func (w *Writer) setStatus(status Status) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.status = status + for e := w.channels.Front(); e != nil; e = e.Next() { + go w.sendStatus(e.Value.(chan Status), status) + } +} + +// Create a new writer for the given URL. Status information is passed along to +// subscribed channels. The done channel is used to notify the storage system +// that the download is complete (either success or an error). +func NewWriter(url, jsonFilename, dataFilename string, done chan<- *Writer) *Writer { + w := &Writer{ + channels: make([]chan Status), + status: StatusNone, + } + go func() { + r, err := http.Get(url) + if err != nil { + goto error + } + defer r.Body.Close() + e := &Entry{ + URL: url, + ContentType: r.Header.Get("Content-Type"), + } + if err = e.Save(jsonFilename); err != nil { + goto error + } + f, err := os.Create(dataFilename) + if err != nil { + goto error + } + defer f.Close() + w.setStatus(StatusDownloading) + _, err = io.Copy(f, r.Body) + if err != nil { + goto error + } + w.setStatus(StatusDone) + done <- w + error: + w.setStatus(StatusError) + done <- w + }() + return w +} + +// Subscribe adds a channel to the list to be notified when the writer's status +// changes. The channel will also immediately receive the current status. +func (w *Writer) Subscribe(statusChan chan Status) { + w.mutex.Lock() + defer w.mutex.Unlock() + w.channels = append(w.channels, c) + w.sendStatus(c, w.status) +} + +// Unsubscribe a channel from the list to be notified. This may occur when a +// client cancels a request, for example. +func (w *Writer) Unsubscribe(statusChan chan Status) { + w.mutex.Lock() + defer w.mutex.Unlock() + for i, c := range w.channels { + if c == statusChan { + w.channels = append(w.channels[:i], w.channels[i+1:]...) + } + } +}