changed data input into influxdb
This commit is contained in:
parent
ceb9f7a971
commit
7baad7e839
@ -86,11 +86,11 @@ func SendToInflux(wc *WeatherConfig, city string, measurement string, value floa
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
tags := map[string]string{"city": city, "measurement": measurement}
|
tags := map[string]string{"city": city}
|
||||||
fields := map[string]interface{}{"value": value}
|
fields := map[string]interface{}{"value": value}
|
||||||
|
|
||||||
point, _ := client.NewPoint(
|
point, _ := client.NewPoint(
|
||||||
wc.WeatherTable,
|
measurement,
|
||||||
tags,
|
tags,
|
||||||
fields,
|
fields,
|
||||||
time.Now(),
|
time.Now(),
|
||||||
|
722
vendor/github.com/influxdata/influxdb1-client/v2/client.go
generated
vendored
Normal file
722
vendor/github.com/influxdata/influxdb1-client/v2/client.go
generated
vendored
Normal file
@ -0,0 +1,722 @@
|
|||||||
|
// Package client (v2) is the current official Go client for InfluxDB.
|
||||||
|
package client // import "github.com/influxdata/influxdb1-client/v2"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb1-client/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HTTPConfig is the config data needed to create an HTTP Client.
|
||||||
|
type HTTPConfig struct {
|
||||||
|
// Addr should be of the form "http://host:port"
|
||||||
|
// or "http://[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// Username is the influxdb username, optional.
|
||||||
|
Username string
|
||||||
|
|
||||||
|
// Password is the influxdb password, optional.
|
||||||
|
Password string
|
||||||
|
|
||||||
|
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
|
||||||
|
UserAgent string
|
||||||
|
|
||||||
|
// Timeout for influxdb writes, defaults to no timeout.
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// InsecureSkipVerify gets passed to the http client, if true, it will
|
||||||
|
// skip https certificate verification. Defaults to false.
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
// TLSConfig allows the user to set their own TLS config for the HTTP
|
||||||
|
// Client. If set, this option overrides InsecureSkipVerify.
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
|
// Proxy configures the Proxy function on the HTTP client.
|
||||||
|
Proxy func(req *http.Request) (*url.URL, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
|
||||||
|
type BatchPointsConfig struct {
|
||||||
|
// Precision is the write precision of the points, defaults to "ns".
|
||||||
|
Precision string
|
||||||
|
|
||||||
|
// Database is the database to write points to.
|
||||||
|
Database string
|
||||||
|
|
||||||
|
// RetentionPolicy is the retention policy of the points.
|
||||||
|
RetentionPolicy string
|
||||||
|
|
||||||
|
// Write consistency is the number of servers required to confirm write.
|
||||||
|
WriteConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is a client interface for writing & querying the database.
|
||||||
|
type Client interface {
|
||||||
|
// Ping checks that status of cluster, and will always return 0 time and no
|
||||||
|
// error for UDP clients.
|
||||||
|
Ping(timeout time.Duration) (time.Duration, string, error)
|
||||||
|
|
||||||
|
// Write takes a BatchPoints object and writes all Points to InfluxDB.
|
||||||
|
Write(bp BatchPoints) error
|
||||||
|
|
||||||
|
// Query makes an InfluxDB Query on the database. This will fail if using
|
||||||
|
// the UDP client.
|
||||||
|
Query(q Query) (*Response, error)
|
||||||
|
|
||||||
|
// QueryAsChunk makes an InfluxDB Query on the database. This will fail if using
|
||||||
|
// the UDP client.
|
||||||
|
QueryAsChunk(q Query) (*ChunkedResponse, error)
|
||||||
|
|
||||||
|
// Close releases any resources a Client may be using.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHTTPClient returns a new Client from the provided config.
|
||||||
|
// Client is safe for concurrent use by multiple goroutines.
|
||||||
|
func NewHTTPClient(conf HTTPConfig) (Client, error) {
|
||||||
|
if conf.UserAgent == "" {
|
||||||
|
conf.UserAgent = "InfluxDBClient"
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if u.Scheme != "http" && u.Scheme != "https" {
|
||||||
|
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
|
||||||
|
" must start with http:// or https://", u.Scheme)
|
||||||
|
return nil, errors.New(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := &http.Transport{
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
InsecureSkipVerify: conf.InsecureSkipVerify,
|
||||||
|
},
|
||||||
|
Proxy: conf.Proxy,
|
||||||
|
}
|
||||||
|
if conf.TLSConfig != nil {
|
||||||
|
tr.TLSClientConfig = conf.TLSConfig
|
||||||
|
}
|
||||||
|
return &client{
|
||||||
|
url: *u,
|
||||||
|
username: conf.Username,
|
||||||
|
password: conf.Password,
|
||||||
|
useragent: conf.UserAgent,
|
||||||
|
httpClient: &http.Client{
|
||||||
|
Timeout: conf.Timeout,
|
||||||
|
Transport: tr,
|
||||||
|
},
|
||||||
|
transport: tr,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
|
||||||
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
|
||||||
|
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "ping")
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout > 0 {
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent {
|
||||||
|
var err = errors.New(string(body))
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
version := resp.Header.Get("X-Influxdb-Version")
|
||||||
|
return time.Since(now), version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the client's resources.
|
||||||
|
func (c *client) Close() error {
|
||||||
|
c.transport.CloseIdleConnections()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// client is safe for concurrent use as the fields are all read-only
|
||||||
|
// once the client is instantiated.
|
||||||
|
type client struct {
|
||||||
|
// N.B - if url.UserInfo is accessed in future modifications to the
|
||||||
|
// methods on client, you will need to synchronize access to url.
|
||||||
|
url url.URL
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
useragent string
|
||||||
|
httpClient *http.Client
|
||||||
|
transport *http.Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPoints is an interface into a batched grouping of points to write into
|
||||||
|
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
|
||||||
|
// batch for each goroutine.
|
||||||
|
type BatchPoints interface {
|
||||||
|
// AddPoint adds the given point to the Batch of points.
|
||||||
|
AddPoint(p *Point)
|
||||||
|
// AddPoints adds the given points to the Batch of points.
|
||||||
|
AddPoints(ps []*Point)
|
||||||
|
// Points lists the points in the Batch.
|
||||||
|
Points() []*Point
|
||||||
|
|
||||||
|
// Precision returns the currently set precision of this Batch.
|
||||||
|
Precision() string
|
||||||
|
// SetPrecision sets the precision of this batch.
|
||||||
|
SetPrecision(s string) error
|
||||||
|
|
||||||
|
// Database returns the currently set database of this Batch.
|
||||||
|
Database() string
|
||||||
|
// SetDatabase sets the database of this Batch.
|
||||||
|
SetDatabase(s string)
|
||||||
|
|
||||||
|
// WriteConsistency returns the currently set write consistency of this Batch.
|
||||||
|
WriteConsistency() string
|
||||||
|
// SetWriteConsistency sets the write consistency of this Batch.
|
||||||
|
SetWriteConsistency(s string)
|
||||||
|
|
||||||
|
// RetentionPolicy returns the currently set retention policy of this Batch.
|
||||||
|
RetentionPolicy() string
|
||||||
|
// SetRetentionPolicy sets the retention policy of this Batch.
|
||||||
|
SetRetentionPolicy(s string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatchPoints returns a BatchPoints interface based on the given config.
|
||||||
|
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
|
||||||
|
if conf.Precision == "" {
|
||||||
|
conf.Precision = "ns"
|
||||||
|
}
|
||||||
|
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bp := &batchpoints{
|
||||||
|
database: conf.Database,
|
||||||
|
precision: conf.Precision,
|
||||||
|
retentionPolicy: conf.RetentionPolicy,
|
||||||
|
writeConsistency: conf.WriteConsistency,
|
||||||
|
}
|
||||||
|
return bp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchpoints struct {
|
||||||
|
points []*Point
|
||||||
|
database string
|
||||||
|
precision string
|
||||||
|
retentionPolicy string
|
||||||
|
writeConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoint(p *Point) {
|
||||||
|
bp.points = append(bp.points, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoints(ps []*Point) {
|
||||||
|
bp.points = append(bp.points, ps...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Points() []*Point {
|
||||||
|
return bp.points
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Precision() string {
|
||||||
|
return bp.precision
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Database() string {
|
||||||
|
return bp.database
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) WriteConsistency() string {
|
||||||
|
return bp.writeConsistency
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) RetentionPolicy() string {
|
||||||
|
return bp.retentionPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetPrecision(p string) error {
|
||||||
|
if _, err := time.ParseDuration("1" + p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bp.precision = p
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetDatabase(db string) {
|
||||||
|
bp.database = db
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetWriteConsistency(wc string) {
|
||||||
|
bp.writeConsistency = wc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetRetentionPolicy(rp string) {
|
||||||
|
bp.retentionPolicy = rp
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point represents a single data point.
|
||||||
|
type Point struct {
|
||||||
|
pt models.Point
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoint returns a point with the given timestamp. If a timestamp is not
|
||||||
|
// given, then data is sent to the database without a timestamp, in which case
|
||||||
|
// the server will assign local time upon reception. NOTE: it is recommended to
|
||||||
|
// send data with a timestamp.
|
||||||
|
func NewPoint(
|
||||||
|
name string,
|
||||||
|
tags map[string]string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
t ...time.Time,
|
||||||
|
) (*Point, error) {
|
||||||
|
var T time.Time
|
||||||
|
if len(t) > 0 {
|
||||||
|
T = t[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Point{
|
||||||
|
pt: pt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a line-protocol string of the Point.
|
||||||
|
func (p *Point) String() string {
|
||||||
|
return p.pt.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrecisionString returns a line-protocol string of the Point,
|
||||||
|
// with the timestamp formatted for the given precision.
|
||||||
|
func (p *Point) PrecisionString(precision string) string {
|
||||||
|
return p.pt.PrecisionString(precision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the measurement name of the point.
|
||||||
|
func (p *Point) Name() string {
|
||||||
|
return string(p.pt.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tags returns the tags associated with the point.
|
||||||
|
func (p *Point) Tags() map[string]string {
|
||||||
|
return p.pt.Tags().Map()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Time return the timestamp for the point.
|
||||||
|
func (p *Point) Time() time.Time {
|
||||||
|
return p.pt.Time()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
|
||||||
|
func (p *Point) UnixNano() int64 {
|
||||||
|
return p.pt.UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fields returns the fields for the point.
|
||||||
|
func (p *Point) Fields() (map[string]interface{}, error) {
|
||||||
|
return p.pt.Fields()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPointFrom returns a point from the provided models.Point.
|
||||||
|
func NewPointFrom(pt models.Point) *Point {
|
||||||
|
return &Point{pt: pt}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) Write(bp BatchPoints) error {
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
if p == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.WriteByte('\n'); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "write")
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), &b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", bp.Database())
|
||||||
|
params.Set("rp", bp.RetentionPolicy())
|
||||||
|
params.Set("precision", bp.Precision())
|
||||||
|
params.Set("consistency", bp.WriteConsistency())
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
var err = errors.New(string(body))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query defines a query to send to the server.
|
||||||
|
type Query struct {
|
||||||
|
Command string
|
||||||
|
Database string
|
||||||
|
RetentionPolicy string
|
||||||
|
Precision string
|
||||||
|
Chunked bool
|
||||||
|
ChunkSize int
|
||||||
|
Parameters map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQuery returns a query object.
|
||||||
|
// The database and precision arguments can be empty strings if they are not needed for the query.
|
||||||
|
func NewQuery(command, database, precision string) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueryWithRP returns a query object.
|
||||||
|
// The database, retention policy, and precision arguments can be empty strings if they are not needed
|
||||||
|
// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
|
||||||
|
func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
RetentionPolicy: retentionPolicy,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueryWithParameters returns a query object.
|
||||||
|
// The database and precision arguments can be empty strings if they are not needed for the query.
|
||||||
|
// parameters is a map of the parameter names used in the command to their values.
|
||||||
|
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: parameters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []Result
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// It returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != "" {
|
||||||
|
return errors.New(r.Err)
|
||||||
|
}
|
||||||
|
for _, result := range r.Results {
|
||||||
|
if result.Err != "" {
|
||||||
|
return errors.New(result.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message represents a user message.
|
||||||
|
type Message struct {
|
||||||
|
Level string
|
||||||
|
Text string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result represents a resultset returned from a single statement.
|
||||||
|
type Result struct {
|
||||||
|
Series []models.Row
|
||||||
|
Messages []*Message
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query sends a command to the server and returns the Response.
|
||||||
|
func (c *client) Query(q Query) (*Response, error) {
|
||||||
|
req, err := c.createDefaultRequest(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
if q.Chunked {
|
||||||
|
params.Set("chunked", "true")
|
||||||
|
if q.ChunkSize > 0 {
|
||||||
|
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
}
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if err := checkResponse(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
if q.Chunked {
|
||||||
|
cr := NewChunkedResponse(resp.Body)
|
||||||
|
for {
|
||||||
|
r, err := cr.NextResponse()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// If we got an error while decoding the response, send that back.
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if r == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
response.Results = append(response.Results, r.Results...)
|
||||||
|
if r.Err != "" {
|
||||||
|
response.Err = r.Err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
dec.UseNumber()
|
||||||
|
decErr := dec.Decode(&response)
|
||||||
|
|
||||||
|
// ignore this error if we got an invalid status code
|
||||||
|
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
|
||||||
|
decErr = nil
|
||||||
|
}
|
||||||
|
// If we got a valid decode error, send that back
|
||||||
|
if decErr != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have an error in our json response, and didn't get statusOK
|
||||||
|
// then send back an error
|
||||||
|
if resp.StatusCode != http.StatusOK && response.Error() == nil {
|
||||||
|
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryAsChunk sends a command to the server and returns the Response.
|
||||||
|
func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) {
|
||||||
|
req, err := c.createDefaultRequest(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("chunked", "true")
|
||||||
|
if q.ChunkSize > 0 {
|
||||||
|
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkResponse(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return NewChunkedResponse(resp.Body), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkResponse(resp *http.Response) error {
|
||||||
|
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
|
||||||
|
// but instead some other service. If the error code is also a 500+ code, then some
|
||||||
|
// downstream loadbalancer/proxy/etc had an issue and we should report that.
|
||||||
|
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil || len(body) == 0 {
|
||||||
|
return fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we get an unexpected content type, then it is also not from influx direct and therefore
|
||||||
|
// we want to know what we received and what status code was returned for debugging purposes.
|
||||||
|
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
|
||||||
|
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
|
||||||
|
// like downstream serving a large file
|
||||||
|
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
|
||||||
|
if err != nil || len(body) == 0 {
|
||||||
|
return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) createDefaultRequest(q Query) (*http.Request, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "query")
|
||||||
|
|
||||||
|
jsonParameters, err := json.Marshal(q.Parameters)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("q", q.Command)
|
||||||
|
params.Set("db", q.Database)
|
||||||
|
if q.RetentionPolicy != "" {
|
||||||
|
params.Set("rp", q.RetentionPolicy)
|
||||||
|
}
|
||||||
|
params.Set("params", string(jsonParameters))
|
||||||
|
|
||||||
|
if q.Precision != "" {
|
||||||
|
params.Set("epoch", q.Precision)
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// duplexReader reads responses and writes it to another writer while
|
||||||
|
// satisfying the reader interface.
|
||||||
|
type duplexReader struct {
|
||||||
|
r io.ReadCloser
|
||||||
|
w io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *duplexReader) Read(p []byte) (n int, err error) {
|
||||||
|
n, err = r.r.Read(p)
|
||||||
|
if err == nil {
|
||||||
|
r.w.Write(p[:n])
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the response.
|
||||||
|
func (r *duplexReader) Close() error {
|
||||||
|
return r.r.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChunkedResponse represents a response from the server that
|
||||||
|
// uses chunking to stream the output.
|
||||||
|
type ChunkedResponse struct {
|
||||||
|
dec *json.Decoder
|
||||||
|
duplex *duplexReader
|
||||||
|
buf bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunkedResponse reads a stream and produces responses from the stream.
|
||||||
|
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
|
||||||
|
rc, ok := r.(io.ReadCloser)
|
||||||
|
if !ok {
|
||||||
|
rc = ioutil.NopCloser(r)
|
||||||
|
}
|
||||||
|
resp := &ChunkedResponse{}
|
||||||
|
resp.duplex = &duplexReader{r: rc, w: &resp.buf}
|
||||||
|
resp.dec = json.NewDecoder(resp.duplex)
|
||||||
|
resp.dec.UseNumber()
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextResponse reads the next line of the stream and returns a response.
|
||||||
|
func (r *ChunkedResponse) NextResponse() (*Response, error) {
|
||||||
|
var response Response
|
||||||
|
if err := r.dec.Decode(&response); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// A decoding error happened. This probably means the server crashed
|
||||||
|
// and sent a last-ditch error message to us. Ensure we have read the
|
||||||
|
// entirety of the connection to get any remaining error text.
|
||||||
|
io.Copy(ioutil.Discard, r.duplex)
|
||||||
|
return nil, errors.New(strings.TrimSpace(r.buf.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
r.buf.Reset()
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the response.
|
||||||
|
func (r *ChunkedResponse) Close() error {
|
||||||
|
return r.duplex.Close()
|
||||||
|
}
|
116
vendor/github.com/influxdata/influxdb1-client/v2/udp.go
generated
vendored
Normal file
116
vendor/github.com/influxdata/influxdb1-client/v2/udp.go
generated
vendored
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// UDPPayloadSize is a reasonable default payload size for UDP packets that
|
||||||
|
// could be travelling over the internet.
|
||||||
|
UDPPayloadSize = 512
|
||||||
|
)
|
||||||
|
|
||||||
|
// UDPConfig is the config data needed to create a UDP Client.
|
||||||
|
type UDPConfig struct {
|
||||||
|
// Addr should be of the form "host:port"
|
||||||
|
// or "[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// PayloadSize is the maximum size of a UDP client message, optional
|
||||||
|
// Tune this based on your network. Defaults to UDPPayloadSize.
|
||||||
|
PayloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
|
||||||
|
// service from the given config.
|
||||||
|
func NewUDPClient(conf UDPConfig) (Client, error) {
|
||||||
|
var udpAddr *net.UDPAddr
|
||||||
|
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadSize := conf.PayloadSize
|
||||||
|
if payloadSize == 0 {
|
||||||
|
payloadSize = UDPPayloadSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return &udpclient{
|
||||||
|
conn: conn,
|
||||||
|
payloadSize: payloadSize,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the udpclient's resources.
|
||||||
|
func (uc *udpclient) Close() error {
|
||||||
|
return uc.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type udpclient struct {
|
||||||
|
conn io.WriteCloser
|
||||||
|
payloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Write(bp BatchPoints) error {
|
||||||
|
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
|
||||||
|
var d, _ = time.ParseDuration("1" + bp.Precision())
|
||||||
|
|
||||||
|
var delayedError error
|
||||||
|
|
||||||
|
var checkBuffer = func(n int) {
|
||||||
|
if len(b) > 0 && len(b)+n > uc.payloadSize {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
delayedError = err
|
||||||
|
}
|
||||||
|
b = b[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
p.pt.Round(d)
|
||||||
|
pointSize := p.pt.StringSize() + 1 // include newline in size
|
||||||
|
//point := p.pt.RoundedString(d) + "\n"
|
||||||
|
|
||||||
|
checkBuffer(pointSize)
|
||||||
|
|
||||||
|
if p.Time().IsZero() || pointSize <= uc.payloadSize {
|
||||||
|
b = p.pt.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
|
||||||
|
for _, sp := range points {
|
||||||
|
checkBuffer(sp.StringSize() + 1)
|
||||||
|
b = sp.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b) > 0 {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return delayedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Query(q Query) (*Response, error) {
|
||||||
|
return nil, fmt.Errorf("Querying via UDP is not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) {
|
||||||
|
return nil, fmt.Errorf("Querying via UDP is not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
return 0, "", nil
|
||||||
|
}
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -1,5 +1,6 @@
|
|||||||
# github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
|
# github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
|
||||||
github.com/influxdata/influxdb1-client
|
github.com/influxdata/influxdb1-client
|
||||||
|
github.com/influxdata/influxdb1-client/v2
|
||||||
github.com/influxdata/influxdb1-client/models
|
github.com/influxdata/influxdb1-client/models
|
||||||
github.com/influxdata/influxdb1-client/pkg/escape
|
github.com/influxdata/influxdb1-client/pkg/escape
|
||||||
# gopkg.in/ini.v1 v1.44.0
|
# gopkg.in/ini.v1 v1.44.0
|
||||||
|
@ -9,10 +9,10 @@ import (
|
|||||||
_ "github.com/influxdata/influxdb1-client"
|
_ "github.com/influxdata/influxdb1-client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var err error
|
|
||||||
var now = time.Now()
|
|
||||||
var wc WeatherConfig
|
var wc WeatherConfig
|
||||||
var configpath string
|
var configpath string
|
||||||
|
var err error
|
||||||
|
var now = time.Now()
|
||||||
|
|
||||||
const kelvin = -273.15
|
const kelvin = -273.15
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user