From b233dd8e9211e522ac4d635faa8a6e6c725692db Mon Sep 17 00:00:00 2001
From: Nathan Osman
Date: Sat, 30 Apr 2016 21:00:43 -0700
Subject: [PATCH] Moved cache to a separate package.

---
 cache.go => cache/cache.go           | 49 ++++++--------
 cache/diskreader.go                  | 43 ++++++++++++
 downloader.go => cache/downloader.go | 23 ++++---
 entry.go => cache/entry.go           |  2 +-
 cache/livereader.go                  | 93 ++++++++++++++++++++++++++
 livereader.go                        | 99 ----------------------------
 server.go                            | 47 +++++++------
 7 files changed, 192 insertions(+), 164 deletions(-)
 rename cache.go => cache/cache.go (67%)
 create mode 100644 cache/diskreader.go
 rename downloader.go => cache/downloader.go (71%)
 rename entry.go => cache/entry.go (98%)
 create mode 100644 cache/livereader.go
 delete mode 100644 livereader.go

diff --git a/cache.go b/cache/cache.go
similarity index 67%
rename from cache.go
rename to cache/cache.go
index baffc52..b59534f 100644
--- a/cache.go
+++ b/cache/cache.go
@@ -1,4 +1,4 @@
-package main
+package cache
 
 import (
 	"crypto/md5"
@@ -11,11 +11,18 @@ import (
 	"sync"
 )
 
+// Reader is a generic interface for reading cache entries either from disk or
+// directly attached to a downloader.
+type Reader interface {
+	io.ReadCloser
+	GetEntry() (*Entry, error)
+}
+
 // Cache provides access to entries in the cache.
 type Cache struct {
 	mutex       sync.Mutex
 	directory   string
-	downloaders map[string]*Downloader
+	downloaders map[string]*downloader
 	waitGroup   sync.WaitGroup
 }
 
@@ -26,15 +33,15 @@ func NewCache(directory string) (*Cache, error) {
 	}
 	return &Cache{
 		directory:   directory,
-		downloaders: make(map[string]*Downloader),
+		downloaders: make(map[string]*downloader),
 	}, nil
 }
 
-// GetReader obtains an io.Reader for the specified rawurl. If a downloader
+// GetReader obtains a Reader for the specified rawurl. If a downloader
 // currently exists for the URL, a live reader is created and connected to it.
 // If the URL exists in the cache, it is read using the standard file API. If
 // not, a downloader and live reader are created.
-func (c *Cache) GetReader(rawurl string) (io.ReadCloser, chan *Entry, error) {
+func (c *Cache) GetReader(rawurl string) (Reader, error) {
 	var (
 		b            = md5.Sum([]byte(rawurl))
 		hash         = hex.EncodeToString(b[:])
@@ -48,30 +55,21 @@ func (c *Cache) GetReader(rawurl string) (io.ReadCloser, chan *Entry, error) {
 		_, err := os.Stat(jsonFilename)
 		if err != nil {
 			if !os.IsNotExist(err) {
-				return nil, nil, err
+				return nil, err
 			}
 		} else {
-			e := &Entry{}
-			if err = e.Load(jsonFilename); err != nil {
-				return nil, nil, err
+			r, err := newDiskReader(jsonFilename, dataFilename)
+			if err != nil {
+				return nil, err
 			}
-			if e.Complete {
-				f, err := os.Open(dataFilename)
-				if err != nil {
-					return nil, nil, err
-				}
-				eChan := make(chan *Entry)
-				go func() {
-					eChan <- e
-					close(eChan)
-				}()
+			if e, _ := r.GetEntry(); e.Complete {
 				log.Println("[HIT]", rawurl)
-				return f, eChan, nil
+				return r, nil
 			}
 		}
-		d = NewDownloader(rawurl, jsonFilename, dataFilename)
+		d = newDownloader(rawurl, jsonFilename, dataFilename)
 		go func() {
-			d.Wait()
+			d.WaitForDone()
 			c.mutex.Lock()
 			defer c.mutex.Unlock()
 			delete(c.downloaders, hash)
@@ -80,13 +78,8 @@ func (c *Cache) GetReader(rawurl string) (io.ReadCloser, chan *Entry, error) {
 		c.downloaders[hash] = d
 		c.waitGroup.Add(1)
 	}
-	eChan := make(chan *Entry)
-	go func() {
-		eChan <- d.GetEntry()
-		close(eChan)
-	}()
 	log.Println("[MISS]", rawurl)
-	return newLiveReader(d, dataFilename)
+	return newLiveReader(d, dataFilename)
 }
 
 // TODO: implement some form of "safe abort" for downloads so that the entire
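For readers skimming the diff, here is a minimal sketch of how the new cache API is meant to be consumed. It is not part of the patch; the cache directory and URL are placeholders, and only NewCache, GetReader, GetEntry, Close and the Entry.ContentType field are taken from the code above.

package main

import (
	"io"
	"log"
	"os"

	"github.com/nathan-osman/go-aptproxy/cache"
)

func main() {
	// Placeholder cache directory.
	c, err := cache.NewCache("/tmp/aptproxy-cache")
	if err != nil {
		log.Fatal(err)
	}
	// A single Reader is returned whether the URL is already on disk
	// (diskReader) or still being downloaded (liveReader).
	r, err := c.GetReader("http://archive.ubuntu.com/ubuntu/dists/xenial/Release")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	// GetEntry blocks on a live reader until the response headers are known.
	e, err := r.GetEntry()
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Content-Type:", e.ContentType)
	if _, err := io.Copy(os.Stdout, r); err != nil {
		log.Fatal(err)
	}
}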
diff --git a/cache/diskreader.go b/cache/diskreader.go
new file mode 100644
index 0000000..510cb47
--- /dev/null
+++ b/cache/diskreader.go
@@ -0,0 +1,43 @@
+package cache
+
+import (
+	"os"
+)
+
+// diskReader reads a file from the cache on disk.
+type diskReader struct {
+	entry *Entry
+	file  *os.File
+}
+
+// newDiskReader creates a reader from the provided JSON and data filenames.
+// Failure to open either of these results in an immediate error.
+func newDiskReader(jsonFilename, dataFilename string) (*diskReader, error) {
+	e := &Entry{}
+	if err := e.Load(jsonFilename); err != nil {
+		return nil, err
+	}
+	f, err := os.Open(dataFilename)
+	if err != nil {
+		return nil, err
+	}
+	return &diskReader{
+		entry: e,
+		file:  f,
+	}, nil
+}
+
+// Read attempts to read as much data as possible into the provided buffer.
+func (d *diskReader) Read(p []byte) (int, error) {
+	return d.file.Read(p)
+}
+
+// Close attempts to close the data file.
+func (d *diskReader) Close() error {
+	return d.file.Close()
+}
+
+// GetEntry returns the Entry associated with the file.
+func (d *diskReader) GetEntry() (*Entry, error) {
+	return d.entry, nil
+}
diff --git a/downloader.go b/cache/downloader.go
similarity index 71%
rename from downloader.go
rename to cache/downloader.go
index 09e6291..be4bdb1 100644
--- a/downloader.go
+++ b/cache/downloader.go
@@ -1,4 +1,4 @@
-package main
+package cache
 
 import (
 	"errors"
@@ -9,17 +9,17 @@ import (
 	"sync"
 )
 
-// Downloader attempts to download a file from a remote URL.
-type Downloader struct {
+// downloader attempts to download a file from a remote URL.
+type downloader struct {
 	doneMutex  sync.Mutex
 	err        error
 	entry      *Entry
 	entryMutex sync.Mutex
 }
 
-// NewDownloader creates a new downloader.
-func NewDownloader(rawurl, jsonFilename, dataFilename string) *Downloader {
-	d := &Downloader{}
+// newDownloader creates a new downloader.
+func newDownloader(rawurl, jsonFilename, dataFilename string) *downloader {
+	d := &downloader{}
 	d.doneMutex.Lock()
 	d.entryMutex.Lock()
 	go func() {
@@ -72,16 +72,15 @@ func NewDownloader(rawurl, jsonFilename, dataFilename string) *Downloader {
 	return d
 }
 
-// GetEntry waits until the Entry associated with the download is available.
-// This call will block until the entry is available or an error occurs.
-func (d *Downloader) GetEntry() *Entry {
+// GetEntry retrieves the entry associated with the download.
+func (d *downloader) GetEntry() (*Entry, error) {
 	d.entryMutex.Lock()
 	defer d.entryMutex.Unlock()
-	return d.entry
+	return d.entry, d.err
}
 
-// Wait will block until the download completes.
-func (d *Downloader) Wait() error {
+// WaitForDone will block until the download completes.
+func (d *downloader) WaitForDone() error {
 	d.doneMutex.Lock()
 	defer d.doneMutex.Unlock()
 	return d.err
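The downloader above relies on a synchronization trick that is easy to miss when reading the diff: newDownloader locks doneMutex and entryMutex up front, and the goroutine it starts presumably unlocks them once the headers and the body have been handled (that part of the file is outside this hunk), so GetEntry and WaitForDone simply block on Lock. The standalone sketch below illustrates the same latch pattern with made-up names; it is not code from the patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// latch mimics downloader: the mutex starts locked and is released by the
// worker goroutine when the result is ready, so Wait blocks until then.
// Go mutexes may be unlocked by a different goroutine than the one that
// locked them, which is exactly what this pattern depends on.
type latch struct {
	mutex  sync.Mutex
	result string
	err    error
}

func newLatch() *latch {
	l := &latch{}
	l.mutex.Lock()
	go func() {
		time.Sleep(100 * time.Millisecond) // stand-in for the HTTP download
		l.result = "done"
		l.mutex.Unlock()
	}()
	return l
}

// Wait blocks until the worker releases the mutex, then returns the result.
func (l *latch) Wait() (string, error) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	return l.result, l.err
}

func main() {
	r, err := newLatch().Wait()
	fmt.Println(r, err)
}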
diff --git a/entry.go b/cache/entry.go
similarity index 98%
rename from entry.go
rename to cache/entry.go
index 895e7ec..0a51cd2 100644
--- a/entry.go
+++ b/cache/entry.go
@@ -1,4 +1,4 @@
-package main
+package cache
 
 import (
 	"encoding/json"
diff --git a/cache/livereader.go b/cache/livereader.go
new file mode 100644
index 0000000..be7a594
--- /dev/null
+++ b/cache/livereader.go
@@ -0,0 +1,93 @@
+package cache
+
+import (
+	"github.com/fsnotify/fsnotify"
+
+	"io"
+	"os"
+)
+
+// liveReader reads a file from disk, synchronizing reads with a downloader.
+type liveReader struct {
+	downloader *downloader
+	file       *os.File
+	watcher    *fsnotify.Watcher
+	entry      *Entry
+	done       chan error
+	err        error
+	eof        bool
+}
+
+// newLiveReader creates a reader from the provided downloader and data
+// file. fsnotify is used to watch for writes to the file to avoid using a
+// spinloop. Invoking this function assumes the existence of the data file.
+func newLiveReader(d *downloader, dataFilename string) (*liveReader, error) {
+	f, err := os.Open(dataFilename)
+	if err != nil {
+		return nil, err
+	}
+	w, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, err
+	}
+	if err = w.Add(dataFilename); err != nil {
+		return nil, err
+	}
+	l := &liveReader{
+		downloader: d,
+		file:       f,
+		watcher:    w,
+		done:       make(chan error),
+	}
+	go func() {
+		defer close(l.done)
+		l.done <- d.WaitForDone()
+	}()
+	return l, err
+}
+
+// Read attempts to read as much data as possible into the provided buffer.
+// Since data is being downloaded as data is being read, fsnotify is used to
+// monitor writes to the file. This function blocks until the requested amount
+// of data is read, an error occurs, or EOF is encountered.
+func (l *liveReader) Read(p []byte) (int, error) {
+	if l.err != nil {
+		return 0, l.err
+	}
+	bytesRead := 0
+loop:
+	for bytesRead < len(p) {
+		n, err := l.file.Read(p[bytesRead:])
+		bytesRead += n
+		if err != nil {
+			if err != io.EOF || l.eof {
+				l.err = err
+				break loop
+			}
+			for {
+				select {
+				case e := <-l.watcher.Events:
+					if e.Op&fsnotify.Write != fsnotify.Write {
+						continue
+					}
+				case err = <-l.done:
+					l.err = err
+					l.eof = true
+				}
+				continue loop
+			}
+		}
+	}
+	return bytesRead, l.err
+}
+
+// Close attempts to close the data file.
+func (l *liveReader) Close() error {
+	return l.file.Close()
+}
+
+// GetEntry returns the Entry associated with the file, blocking until either
+// the data is available or an error occurs.
+func (l *liveReader) GetEntry() (*Entry, error) {
+	return l.downloader.GetEntry()
+}
diff --git a/livereader.go b/livereader.go
deleted file mode 100644
index ef256bd..0000000
--- a/livereader.go
+++ /dev/null
@@ -1,99 +0,0 @@
-package main
-
-import (
-	"github.com/fsnotify/fsnotify"
-
-	"io"
-	"os"
-)
-
-// LiveReader synchronizes with a downloader to read from a file.
-type LiveReader struct {
-	dataFilename string
-	open         chan bool
-	done         chan error
-	file         *os.File
-	err          error
-	eof          bool
-}
-
-// NewLiveReader creates a new live reader.
-func NewLiveReader(d *Downloader, dataFilename string) *LiveReader {
-	l := &LiveReader{
-		dataFilename: dataFilename,
-		open:         make(chan bool),
-		done:         make(chan error),
-	}
-	go func() {
-		d.GetEntry()
-		close(l.open)
-		l.done <- d.Wait()
-		close(l.done)
-	}()
-	return l
-}
-
-// Read attempts to read data as it is being downloaded. If EOF is reached,
-// fsnotify is used to watch for new data being written. The download is not
-// complete until the "done" channel receives a value.
-func (l *LiveReader) Read(p []byte) (int, error) {
-	if l.err != nil {
-		return 0, l.err
-	}
-	<-l.open
-	if l.file == nil {
-		f, err := os.Open(l.dataFilename)
-		if err != nil {
-			return 0, err
-		}
-		l.file = f
-	}
-	var (
-		bytesRead int
-		watcher   *fsnotify.Watcher
-	)
-loop:
-	for bytesRead < len(p) {
-		n, err := l.file.Read(p[bytesRead:])
-		bytesRead += n
-		if err != nil {
-			if err != io.EOF || l.eof {
-				l.err = err
-				break loop
-			}
-			if watcher == nil {
-				watcher, err = fsnotify.NewWatcher()
-				if err != nil {
-					l.err = err
-					break loop
-				}
-				defer watcher.Close()
-				if err = watcher.Add(l.dataFilename); err != nil {
-					l.err = err
-					break loop
-				}
-			}
-			for {
-				select {
-				case e := <-watcher.Events:
-					if e.Op&fsnotify.Write != fsnotify.Write {
-						continue
-					}
-				case err = <-l.done:
-					l.err = err
-					l.eof = true
-				}
-				continue loop
-			}
-		}
-	}
-	return bytesRead, l.err
-}
-
-// Close frees resources associated with the reader.
-func (l *LiveReader) Close() error {
-	if l.file != nil {
-		l.file.Close()
-	}
-	return nil
-}
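Since Reader is now the seam between the cache and the server, a cheap guard worth considering is a pair of compile-time assertions in the cache package confirming that both concrete readers keep satisfying the interface. This is a hypothetical addition, not part of the patch:

package cache

// These blank-identifier assignments fail to compile if diskReader or
// liveReader ever stops implementing Reader.
var (
	_ Reader = (*diskReader)(nil)
	_ Reader = (*liveReader)(nil)
)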
diff --git a/server.go b/server.go
index 316e718..e1f897a 100644
--- a/server.go
+++ b/server.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"github.com/hectane/go-asyncserver"
+	"github.com/nathan-osman/go-aptproxy/cache"
 
 	"io"
 	"log"
@@ -16,7 +17,7 @@
 // needless duplication.
 type Server struct {
 	server *server.AsyncServer
-	cache  *Cache
+	cache  *cache.Cache
 }
 
 func rewrite(rawurl string) string {
@@ -31,7 +32,7 @@ func rewrite(rawurl string) string {
 	return rawurl
 }
 
-func (s *Server) writeHeaders(w http.ResponseWriter, e *Entry) {
+func (s *Server) writeHeaders(w http.ResponseWriter, e *cache.Entry) {
 	if e.ContentType != "" {
 		w.Header().Set("Content-Type", e.ContentType)
 	} else {
@@ -48,39 +49,38 @@ func (s *Server) writeHeaders(w http.ResponseWriter, e *Entry) {
 }
 
 // TODO: support for HEAD requests
-// TODO: find a reasonable way for getting errors from eChan
 
 // ServeHTTP processes an incoming request to the proxy. GET requests are
 // served with the storage backend and every other request is (out of
 // necessity) rejected since it can't be cached.
 func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-	if req.Method == "GET" {
-		r, eChan, err := s.cache.GetReader(rewrite(req.RequestURI))
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			log.Println("[ERR]", err)
-			return
-		}
-		defer r.Close()
-		e := <-eChan
-		if e == nil {
-			http.Error(w, "header retrieval error", http.StatusInternalServerError)
-			log.Println("[ERR] header retrieval")
-			return
-		}
-		s.writeHeaders(w, e)
-		_, err = io.Copy(w, r)
-		if err != nil {
-			log.Println("[ERR]", err)
-		}
-	} else {
+	if req.Method != "GET" {
 		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+		return
 	}
+	r, err := s.cache.GetReader(rewrite(req.RequestURI))
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		log.Println("[ERR]", err)
+		return
+	}
+	defer r.Close()
+	e, err := r.GetEntry()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		log.Println("[ERR]", err)
+		return
+	}
+	s.writeHeaders(w, e)
+	_, err = io.Copy(w, r)
+	if err != nil {
+		log.Println("[ERR]", err)
+	}
 }
 
 // NewServer creates a new server.
 func NewServer(addr, directory string) (*Server, error) {
-	c, err := NewCache(directory)
+	c, err := cache.NewCache(directory)
 	if err != nil {
 		return nil, err
 	}
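To exercise the refactored handler without binding a socket, a test along these lines could sit next to server.go (hypothetical file server_test.go; the temporary directory and request URI are placeholders). It builds a Server directly around a cache.Cache, which the struct above allows from within package main, and checks that non-GET requests are rejected up front, which also covers the early return added in the hunk above.

package main

import (
	"net/http"
	"net/http/httptest"
	"os"
	"testing"

	"github.com/nathan-osman/go-aptproxy/cache"
)

func TestServeHTTPRejectsNonGET(t *testing.T) {
	dir, err := os.MkdirTemp("", "aptproxy-cache-*")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(dir)
	c, err := cache.NewCache(dir)
	if err != nil {
		t.Fatal(err)
	}
	// The async server field is not needed for ServeHTTP, so it is left nil.
	s := &Server{cache: c}
	rec := httptest.NewRecorder()
	s.ServeHTTP(rec, httptest.NewRequest("POST", "http://localhost/test", nil))
	if rec.Code != http.StatusMethodNotAllowed {
		t.Fatalf("got status %d, want %d", rec.Code, http.StatusMethodNotAllowed)
	}
}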