diff --git a/fuelprices.go b/fuelprices.go index 08dd5c8..57749ac 100644 --- a/fuelprices.go +++ b/fuelprices.go @@ -1,67 +1,29 @@ package main import ( - "archive/zip" "flag" - "fmt" - "time" - - _ "github.com/influxdata/influxdb1-client" - client "github.com/influxdata/influxdb1-client/v2" ) -var szip Srcfile -var sxml Srcfile -var ofile Outfile -var err error -var zipfile *zip.Reader -var prices *[]Price -var now = time.Now() -var fpc FuelPricesConfig -var configpath string - func main() { + var configpath string + var fpc FuelPricesConfig + var err error flag.StringVar(&configpath, "configfile", "common.ini", "config file to use with fuelprices section") flag.Parse() GetConfig(configpath, &fpc) - sxml.Filename = fpc.RemoteFilename + var szip Srcfile + var output []byte - httpClient, err := client.NewHTTPClient(client.HTTPConfig{ - Addr: fmt.Sprintf("http://%s:%d", fpc.InfluxHost, fpc.InfluxPort), - Username: fpc.InfluxUser, - Password: fpc.InfluxPass, - }) + err = DownloadFile(&fpc, &szip) HandleFatalError(err) - bp, err := client.NewBatchPoints(client.BatchPointsConfig{ - Database: fpc.InfluxDB, - }) + err = ExtractZip(&fpc, &szip, &output) HandleFatalError(err) - err = DownloadFile(fpc.RemoteURL) - HandleFatalError(err) + var prices *[]Price + GetPrices(&prices, &fpc, &output) - err = ExtractZip() - HandleFatalError(err) - - GetPrices(&prices, fpc.Pos, fpc.Types, fpc.XPathBase) - - for _, p := range *prices { - - tags := map[string]string{"pdv": p.ID, "fuel": p.Fuel} - fields := map[string]interface{}{"value": p.Amount} - - point, _ := client.NewPoint( - fpc.Table, - tags, - fields, - now, - ) - - bp.AddPoint(point) - err = httpClient.Write(bp) - HandleError(err) - } + SendToInflux(&fpc, prices) } diff --git a/functions.go b/functions.go index dc9e288..4f0d4d8 100644 --- a/functions.go +++ b/functions.go @@ -10,8 +10,11 @@ import ( "net/http" "os" "strconv" + "time" "github.com/antchfx/xmlquery" + _ "github.com/influxdata/influxdb1-client" + client "github.com/influxdata/influxdb1-client/v2" "gopkg.in/ini.v1" ) @@ -54,26 +57,32 @@ func GetConfig(configfile string, fuelpricesconfig *FuelPricesConfig) error { } // DownloadFile fetch file from webserver -func DownloadFile(url string) error { - resp, err := http.Get(url) +func DownloadFile(fpc *FuelPricesConfig, szip *Srcfile) error { + pollTo := 30 * time.Millisecond + + c := &http.Client{Timeout: pollTo * time.Second, Transport: &http.Transport{ + IdleConnTimeout: pollTo, + DisableCompression: false, + }} + //resp, err := http.Get(url) + resp, err := c.Get(fpc.RemoteURL) HandleError(err) + defer resp.Body.Close() + szip.Content, err = ioutil.ReadAll(resp.Body) HandleError(err) - err = resp.Body.Close() - HandleError(err) + time.Sleep(pollTo) return err } // ExtractZip get the XML file to be processed -func ExtractZip() error { +func ExtractZip(fpc *FuelPricesConfig, szip *Srcfile, output *[]byte) error { - zipfile, err = zip.NewReader(bytes.NewReader(szip.Content), int64(len(szip.Content))) - if err != nil { - log.Fatal("Unable to open zipfile") - } + zipfile, err := zip.NewReader(bytes.NewReader(szip.Content), int64(len(szip.Content))) + HandleFatalError(err) for _, f := range zipfile.File { if f.Name == fpc.RemoteFilename { @@ -81,7 +90,7 @@ func ExtractZip() error { if err != nil { return err } - ofile.Content, err = ioutil.ReadAll(rc) + *output, err = ioutil.ReadAll(rc) rc.Close() } else { log.Fatal("File not found") @@ -91,17 +100,17 @@ func ExtractZip() error { } // GetPrices parses the XML file and get values of prices -func GetPrices(prices **[]Price, spos []string, stypes []string, sxpathbase string) error { +func GetPrices(prices **[]Price, fpc *FuelPricesConfig, output *[]byte) error { var pr []Price var xml *xmlquery.Node - f := bytes.NewReader(ofile.Content) - xml, err = xmlquery.Parse(f) + f := bytes.NewReader(*output) + xml, err := xmlquery.Parse(f) HandleError(err) - for _, station := range spos { - for _, fuel := range stypes { - query := fmt.Sprintf(sxpathbase, station, fuel) + for _, station := range fpc.Pos { + for _, fuel := range fpc.Types { + query := fmt.Sprintf(fpc.XPathBase, station, fuel) list := xmlquery.FindOne(xml, query) for _, i := range list.Attr { @@ -119,6 +128,40 @@ func GetPrices(prices **[]Price, spos []string, stypes []string, sxpathbase stri return err } +// SendToInflux sends time series data to influxdb +func SendToInflux(fpc *FuelPricesConfig, prices *[]Price) { + httpClient, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: fmt.Sprintf("http://%s:%d", fpc.InfluxHost, fpc.InfluxPort), + Username: fpc.InfluxUser, + Password: fpc.InfluxPass, + }) + HandleFatalError(err) + + bp, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: fpc.InfluxDB, + }) + HandleFatalError(err) + + for _, p := range *prices { + + tags := map[string]string{"pdv": p.ID, "fuel": p.Fuel} + fields := map[string]interface{}{"value": p.Amount} + + point, _ := client.NewPoint( + fpc.Table, + tags, + fields, + time.Now(), + ) + + log.Println(point) + + bp.AddPoint(point) + err = httpClient.Write(bp) + HandleError(err) + } +} + // HandleError handles errors to return err func HandleError(err error) error { if err != nil { diff --git a/types.go b/types.go index 5f7d59f..993222e 100644 --- a/types.go +++ b/types.go @@ -7,11 +7,6 @@ type Srcfile struct { Content []byte } -// Outfile is the output XML file -type Outfile struct { - Content []byte -} - // FuelPricesConfig is the main configuration type FuelPricesConfig struct { RemoteURL string