reworked some code parts

This commit is contained in:
Paul 2019-06-26 02:10:35 +02:00
parent 0083207235
commit a92d119b9d
3 changed files with 69 additions and 69 deletions

View File

@ -1,67 +1,29 @@
package main
import (
"archive/zip"
"flag"
"fmt"
"time"
_ "github.com/influxdata/influxdb1-client"
client "github.com/influxdata/influxdb1-client/v2"
)
var szip Srcfile
var sxml Srcfile
var ofile Outfile
var err error
var zipfile *zip.Reader
var prices *[]Price
var now = time.Now()
var fpc FuelPricesConfig
var configpath string
func main() {
var configpath string
var fpc FuelPricesConfig
var err error
flag.StringVar(&configpath, "configfile", "common.ini", "config file to use with fuelprices section")
flag.Parse()
GetConfig(configpath, &fpc)
sxml.Filename = fpc.RemoteFilename
var szip Srcfile
var output []byte
httpClient, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%d", fpc.InfluxHost, fpc.InfluxPort),
Username: fpc.InfluxUser,
Password: fpc.InfluxPass,
})
err = DownloadFile(&fpc, &szip)
HandleFatalError(err)
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: fpc.InfluxDB,
})
err = ExtractZip(&fpc, &szip, &output)
HandleFatalError(err)
err = DownloadFile(fpc.RemoteURL)
HandleFatalError(err)
var prices *[]Price
GetPrices(&prices, &fpc, &output)
err = ExtractZip()
HandleFatalError(err)
GetPrices(&prices, fpc.Pos, fpc.Types, fpc.XPathBase)
for _, p := range *prices {
tags := map[string]string{"pdv": p.ID, "fuel": p.Fuel}
fields := map[string]interface{}{"value": p.Amount}
point, _ := client.NewPoint(
fpc.Table,
tags,
fields,
now,
)
bp.AddPoint(point)
err = httpClient.Write(bp)
HandleError(err)
}
SendToInflux(&fpc, prices)
}

View File

@ -10,8 +10,11 @@ import (
"net/http"
"os"
"strconv"
"time"
"github.com/antchfx/xmlquery"
_ "github.com/influxdata/influxdb1-client"
client "github.com/influxdata/influxdb1-client/v2"
"gopkg.in/ini.v1"
)
@ -54,26 +57,32 @@ func GetConfig(configfile string, fuelpricesconfig *FuelPricesConfig) error {
}
// DownloadFile fetch file from webserver
func DownloadFile(url string) error {
resp, err := http.Get(url)
func DownloadFile(fpc *FuelPricesConfig, szip *Srcfile) error {
pollTo := 30 * time.Millisecond
c := &http.Client{Timeout: pollTo * time.Second, Transport: &http.Transport{
IdleConnTimeout: pollTo,
DisableCompression: false,
}}
//resp, err := http.Get(url)
resp, err := c.Get(fpc.RemoteURL)
HandleError(err)
defer resp.Body.Close()
szip.Content, err = ioutil.ReadAll(resp.Body)
HandleError(err)
err = resp.Body.Close()
HandleError(err)
time.Sleep(pollTo)
return err
}
// ExtractZip get the XML file to be processed
func ExtractZip() error {
func ExtractZip(fpc *FuelPricesConfig, szip *Srcfile, output *[]byte) error {
zipfile, err = zip.NewReader(bytes.NewReader(szip.Content), int64(len(szip.Content)))
if err != nil {
log.Fatal("Unable to open zipfile")
}
zipfile, err := zip.NewReader(bytes.NewReader(szip.Content), int64(len(szip.Content)))
HandleFatalError(err)
for _, f := range zipfile.File {
if f.Name == fpc.RemoteFilename {
@ -81,7 +90,7 @@ func ExtractZip() error {
if err != nil {
return err
}
ofile.Content, err = ioutil.ReadAll(rc)
*output, err = ioutil.ReadAll(rc)
rc.Close()
} else {
log.Fatal("File not found")
@ -91,17 +100,17 @@ func ExtractZip() error {
}
// GetPrices parses the XML file and get values of prices
func GetPrices(prices **[]Price, spos []string, stypes []string, sxpathbase string) error {
func GetPrices(prices **[]Price, fpc *FuelPricesConfig, output *[]byte) error {
var pr []Price
var xml *xmlquery.Node
f := bytes.NewReader(ofile.Content)
xml, err = xmlquery.Parse(f)
f := bytes.NewReader(*output)
xml, err := xmlquery.Parse(f)
HandleError(err)
for _, station := range spos {
for _, fuel := range stypes {
query := fmt.Sprintf(sxpathbase, station, fuel)
for _, station := range fpc.Pos {
for _, fuel := range fpc.Types {
query := fmt.Sprintf(fpc.XPathBase, station, fuel)
list := xmlquery.FindOne(xml, query)
for _, i := range list.Attr {
@ -119,6 +128,40 @@ func GetPrices(prices **[]Price, spos []string, stypes []string, sxpathbase stri
return err
}
// SendToInflux sends time series data to influxdb
func SendToInflux(fpc *FuelPricesConfig, prices *[]Price) {
httpClient, err := client.NewHTTPClient(client.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%d", fpc.InfluxHost, fpc.InfluxPort),
Username: fpc.InfluxUser,
Password: fpc.InfluxPass,
})
HandleFatalError(err)
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: fpc.InfluxDB,
})
HandleFatalError(err)
for _, p := range *prices {
tags := map[string]string{"pdv": p.ID, "fuel": p.Fuel}
fields := map[string]interface{}{"value": p.Amount}
point, _ := client.NewPoint(
fpc.Table,
tags,
fields,
time.Now(),
)
log.Println(point)
bp.AddPoint(point)
err = httpClient.Write(bp)
HandleError(err)
}
}
// HandleError handles errors to return err
func HandleError(err error) error {
if err != nil {

View File

@ -7,11 +7,6 @@ type Srcfile struct {
Content []byte
}
// Outfile is the output XML file
type Outfile struct {
Content []byte
}
// FuelPricesConfig is the main configuration
type FuelPricesConfig struct {
RemoteURL string