updated go modules
This commit is contained in:
parent
74d3e615aa
commit
4659714626
4
go.mod
4
go.mod
@ -4,9 +4,9 @@ go 1.12
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/antchfx/xmlquery v1.0.0
|
github.com/antchfx/xmlquery v1.0.0
|
||||||
github.com/antchfx/xpath v0.0.0-20190319080838-ce1d48779e67 // indirect
|
github.com/antchfx/xpath v1.0.0 // indirect
|
||||||
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
|
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
|
||||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
|
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
|
||||||
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect
|
golang.org/x/net v0.0.0-20190607181551-461777fb6f67 // indirect
|
||||||
gopkg.in/ini.v1 v1.42.0
|
gopkg.in/ini.v1 v1.42.0
|
||||||
)
|
)
|
||||||
|
8
go.sum
8
go.sum
@ -1,7 +1,7 @@
|
|||||||
github.com/antchfx/xmlquery v1.0.0 h1:YuEPqexGG2opZKNc9JU3Zw6zFXwC47wNcy6/F8oKsrM=
|
github.com/antchfx/xmlquery v1.0.0 h1:YuEPqexGG2opZKNc9JU3Zw6zFXwC47wNcy6/F8oKsrM=
|
||||||
github.com/antchfx/xmlquery v1.0.0/go.mod h1:/+CnyD/DzHRnv2eRxrVbieRU/FIF6N0C+7oTtyUtCKk=
|
github.com/antchfx/xmlquery v1.0.0/go.mod h1:/+CnyD/DzHRnv2eRxrVbieRU/FIF6N0C+7oTtyUtCKk=
|
||||||
github.com/antchfx/xpath v0.0.0-20190319080838-ce1d48779e67 h1:uj4UuiIs53RhHSySIupR1TEIouckjSfnljF3QbN1yh0=
|
github.com/antchfx/xpath v1.0.0 h1:Q5gFgh2O40VTSwMOVbFE7nFNRBu3tS21Tn0KAWeEjtk=
|
||||||
github.com/antchfx/xpath v0.0.0-20190319080838-ce1d48779e67/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
|
github.com/antchfx/xpath v1.0.0/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
|
||||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
|
||||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||||
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc h1:KpMgaYJRieDkHZJWY3LMafvtqS/U8xX6+lUN+OKpl/Y=
|
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc h1:KpMgaYJRieDkHZJWY3LMafvtqS/U8xX6+lUN+OKpl/Y=
|
||||||
@ -14,8 +14,8 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2
|
|||||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||||
golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ=
|
golang.org/x/net v0.0.0-20190607181551-461777fb6f67 h1:rJJxsykSlULwd2P2+pg/rtnwN2FrWp4IuCxOSyS0V00=
|
||||||
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
|
golang.org/x/net v0.0.0-20190607181551-461777fb6f67/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
|
21
vendor/github.com/influxdata/influxdb1-client/LICENSE
generated
vendored
Normal file
21
vendor/github.com/influxdata/influxdb1-client/LICENSE
generated
vendored
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2019 InfluxData
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
29
vendor/github.com/influxdata/influxdb1-client/README.md
generated
vendored
Normal file
29
vendor/github.com/influxdata/influxdb1-client/README.md
generated
vendored
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
# influxdb1-clientv2
|
||||||
|
influxdb1-clientv2 is the current Go client API for InfluxDB 1.x. A Go client for the 2.0 API will be coming soon.
|
||||||
|
|
||||||
|
InfluxDB is an open-source distributed time series database, find more about [InfluxDB](https://www.influxdata.com/time-series-platform/influxdb/) at https://docs.influxdata.com/influxdb/latest
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
To import into your Go project, run the following command in your terminal:
|
||||||
|
`go get github.com/influxdata/influxdb1-client/v2`
|
||||||
|
Then, in your import declaration section of your Go file, paste the following:
|
||||||
|
`import "github.com/influxdata/influxdb1-client/v2"`
|
||||||
|
|
||||||
|
## Example
|
||||||
|
The following example creates a new client to the InfluxDB host on localhost:8086 and runs a query for the measurement `cpu_load` from the `mydb` database.
|
||||||
|
``` go
|
||||||
|
func ExampleClient_query() {
|
||||||
|
c, err := client.NewHTTPClient(client.HTTPConfig{
|
||||||
|
Addr: "http://localhost:8086",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error creating InfluxDB Client: ", err.Error())
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
q := client.NewQuery("SELECT count(value) FROM cpu_load", "mydb", "")
|
||||||
|
if response, err := c.Query(q); err == nil && response.Error() == nil {
|
||||||
|
fmt.Println(response.Results)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
870
vendor/github.com/influxdata/influxdb1-client/influxdb.go
generated
vendored
Normal file
870
vendor/github.com/influxdata/influxdb1-client/influxdb.go
generated
vendored
Normal file
@ -0,0 +1,870 @@
|
|||||||
|
// Package client implements a now-deprecated client for InfluxDB;
|
||||||
|
// use github.com/influxdata/influxdb1-client/v2 instead.
|
||||||
|
package client // import "github.com/influxdata/influxdb1-client"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb1-client/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultHost is the default host used to connect to an InfluxDB instance
|
||||||
|
DefaultHost = "localhost"
|
||||||
|
|
||||||
|
// DefaultPort is the default port used to connect to an InfluxDB instance
|
||||||
|
DefaultPort = 8086
|
||||||
|
|
||||||
|
// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
|
||||||
|
DefaultTimeout = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
// Query is used to send a command to the server. Both Command and Database are required.
|
||||||
|
type Query struct {
|
||||||
|
Command string
|
||||||
|
Database string
|
||||||
|
|
||||||
|
// RetentionPolicy tells the server which retention policy to use by default.
|
||||||
|
// This option is only effective when querying a server of version 1.6.0 or later.
|
||||||
|
RetentionPolicy string
|
||||||
|
|
||||||
|
// Chunked tells the server to send back chunked responses. This places
|
||||||
|
// less load on the server by sending back chunks of the response rather
|
||||||
|
// than waiting for the entire response all at once.
|
||||||
|
Chunked bool
|
||||||
|
|
||||||
|
// ChunkSize sets the maximum number of rows that will be returned per
|
||||||
|
// chunk. Chunks are either divided based on their series or if they hit
|
||||||
|
// the chunk size limit.
|
||||||
|
//
|
||||||
|
// Chunked must be set to true for this option to be used.
|
||||||
|
ChunkSize int
|
||||||
|
|
||||||
|
// NodeID sets the data node to use for the query results. This option only
|
||||||
|
// has any effect in the enterprise version of the software where there can be
|
||||||
|
// more than one data node and is primarily useful for analyzing differences in
|
||||||
|
// data. The default behavior is to automatically select the appropriate data
|
||||||
|
// nodes to retrieve all of the data. On a database where the number of data nodes
|
||||||
|
// is greater than the replication factor, it is expected that setting this option
|
||||||
|
// will only retrieve partial data.
|
||||||
|
NodeID int
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseConnectionString will parse a string to create a valid connection URL
|
||||||
|
func ParseConnectionString(path string, ssl bool) (url.URL, error) {
|
||||||
|
var host string
|
||||||
|
var port int
|
||||||
|
|
||||||
|
h, p, err := net.SplitHostPort(path)
|
||||||
|
if err != nil {
|
||||||
|
if path == "" {
|
||||||
|
host = DefaultHost
|
||||||
|
} else {
|
||||||
|
host = path
|
||||||
|
}
|
||||||
|
// If they didn't specify a port, always use the default port
|
||||||
|
port = DefaultPort
|
||||||
|
} else {
|
||||||
|
host = h
|
||||||
|
port, err = strconv.Atoi(p)
|
||||||
|
if err != nil {
|
||||||
|
return url.URL{}, fmt.Errorf("invalid port number %q: %s\n", path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: host,
|
||||||
|
}
|
||||||
|
if ssl {
|
||||||
|
u.Scheme = "https"
|
||||||
|
if port != 443 {
|
||||||
|
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
|
||||||
|
}
|
||||||
|
} else if port != 80 {
|
||||||
|
u.Host = net.JoinHostPort(host, strconv.Itoa(port))
|
||||||
|
}
|
||||||
|
|
||||||
|
return u, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config is used to specify what server to connect to.
|
||||||
|
// URL: The URL of the server connecting to.
|
||||||
|
// Username/Password are optional. They will be passed via basic auth if provided.
|
||||||
|
// UserAgent: If not provided, will default "InfluxDBClient",
|
||||||
|
// Timeout: If not provided, will default to 0 (no timeout)
|
||||||
|
type Config struct {
|
||||||
|
URL url.URL
|
||||||
|
UnixSocket string
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
UserAgent string
|
||||||
|
Timeout time.Duration
|
||||||
|
Precision string
|
||||||
|
WriteConsistency string
|
||||||
|
UnsafeSsl bool
|
||||||
|
Proxy func(req *http.Request) (*url.URL, error)
|
||||||
|
TLS *tls.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConfig will create a config to be used in connecting to the client
|
||||||
|
func NewConfig() Config {
|
||||||
|
return Config{
|
||||||
|
Timeout: DefaultTimeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is used to make calls to the server.
|
||||||
|
type Client struct {
|
||||||
|
url url.URL
|
||||||
|
unixSocket string
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
httpClient *http.Client
|
||||||
|
userAgent string
|
||||||
|
precision string
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ConsistencyOne requires at least one data node acknowledged a write.
|
||||||
|
ConsistencyOne = "one"
|
||||||
|
|
||||||
|
// ConsistencyAll requires all data nodes to acknowledge a write.
|
||||||
|
ConsistencyAll = "all"
|
||||||
|
|
||||||
|
// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
|
||||||
|
ConsistencyQuorum = "quorum"
|
||||||
|
|
||||||
|
// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
|
||||||
|
ConsistencyAny = "any"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewClient will instantiate and return a connected client to issue commands to the server.
|
||||||
|
func NewClient(c Config) (*Client, error) {
|
||||||
|
tlsConfig := new(tls.Config)
|
||||||
|
if c.TLS != nil {
|
||||||
|
tlsConfig = c.TLS.Clone()
|
||||||
|
}
|
||||||
|
tlsConfig.InsecureSkipVerify = c.UnsafeSsl
|
||||||
|
|
||||||
|
tr := &http.Transport{
|
||||||
|
Proxy: c.Proxy,
|
||||||
|
TLSClientConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.UnixSocket != "" {
|
||||||
|
// No need for compression in local communications.
|
||||||
|
tr.DisableCompression = true
|
||||||
|
|
||||||
|
tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||||
|
return net.Dial("unix", c.UnixSocket)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
client := Client{
|
||||||
|
url: c.URL,
|
||||||
|
unixSocket: c.UnixSocket,
|
||||||
|
username: c.Username,
|
||||||
|
password: c.Password,
|
||||||
|
httpClient: &http.Client{Timeout: c.Timeout, Transport: tr},
|
||||||
|
userAgent: c.UserAgent,
|
||||||
|
precision: c.Precision,
|
||||||
|
}
|
||||||
|
if client.userAgent == "" {
|
||||||
|
client.userAgent = "InfluxDBClient"
|
||||||
|
}
|
||||||
|
return &client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetAuth will update the username and passwords
|
||||||
|
func (c *Client) SetAuth(u, p string) {
|
||||||
|
c.username = u
|
||||||
|
c.password = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPrecision will update the precision
|
||||||
|
func (c *Client) SetPrecision(precision string) {
|
||||||
|
c.precision = precision
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query sends a command to the server and returns the Response
|
||||||
|
func (c *Client) Query(q Query) (*Response, error) {
|
||||||
|
return c.QueryContext(context.Background(), q)
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryContext sends a command to the server and returns the Response
|
||||||
|
// It uses a context that can be cancelled by the command line client
|
||||||
|
func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "query")
|
||||||
|
|
||||||
|
values := u.Query()
|
||||||
|
values.Set("q", q.Command)
|
||||||
|
values.Set("db", q.Database)
|
||||||
|
if q.RetentionPolicy != "" {
|
||||||
|
values.Set("rp", q.RetentionPolicy)
|
||||||
|
}
|
||||||
|
if q.Chunked {
|
||||||
|
values.Set("chunked", "true")
|
||||||
|
if q.ChunkSize > 0 {
|
||||||
|
values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if q.NodeID > 0 {
|
||||||
|
values.Set("node_id", strconv.Itoa(q.NodeID))
|
||||||
|
}
|
||||||
|
if c.precision != "" {
|
||||||
|
values.Set("epoch", c.precision)
|
||||||
|
}
|
||||||
|
u.RawQuery = values.Encode()
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
if q.Chunked {
|
||||||
|
cr := NewChunkedResponse(resp.Body)
|
||||||
|
for {
|
||||||
|
r, err := cr.NextResponse()
|
||||||
|
if err != nil {
|
||||||
|
// If we got an error while decoding the response, send that back.
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if r == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
response.Results = append(response.Results, r.Results...)
|
||||||
|
if r.Err != nil {
|
||||||
|
response.Err = r.Err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
dec.UseNumber()
|
||||||
|
if err := dec.Decode(&response); err != nil {
|
||||||
|
// Ignore EOF errors if we got an invalid status code.
|
||||||
|
if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have an error in our json response, and didn't get StatusOK,
|
||||||
|
// then send back an error.
|
||||||
|
if resp.StatusCode != http.StatusOK && response.Error() == nil {
|
||||||
|
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write takes BatchPoints and allows for writing of multiple points with defaults
|
||||||
|
// If successful, error is nil and Response is nil
|
||||||
|
// If an error occurs, Response may contain additional information if populated.
|
||||||
|
func (c *Client) Write(bp BatchPoints) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "write")
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
for _, p := range bp.Points {
|
||||||
|
err := checkPointTypes(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if p.Raw != "" {
|
||||||
|
if _, err := b.WriteString(p.Raw); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for k, v := range bp.Tags {
|
||||||
|
if p.Tags == nil {
|
||||||
|
p.Tags = make(map[string]string, len(bp.Tags))
|
||||||
|
}
|
||||||
|
p.Tags[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := b.WriteString(p.MarshalString()); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.WriteByte('\n'); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), &b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
precision := bp.Precision
|
||||||
|
if precision == "" {
|
||||||
|
precision = c.precision
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", bp.Database)
|
||||||
|
params.Set("rp", bp.RetentionPolicy)
|
||||||
|
params.Set("precision", precision)
|
||||||
|
params.Set("consistency", bp.WriteConsistency)
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
var err = fmt.Errorf(string(body))
|
||||||
|
response.Err = err
|
||||||
|
return &response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteLineProtocol takes a string with line returns to delimit each write
|
||||||
|
// If successful, error is nil and Response is nil
|
||||||
|
// If an error occurs, Response may contain additional information if populated.
|
||||||
|
func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "write")
|
||||||
|
|
||||||
|
r := strings.NewReader(data)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", database)
|
||||||
|
params.Set("rp", retentionPolicy)
|
||||||
|
params.Set("precision", precision)
|
||||||
|
params.Set("consistency", writeConsistency)
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
err := fmt.Errorf(string(body))
|
||||||
|
response.Err = err
|
||||||
|
return &response, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping will check to see if the server is up
|
||||||
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
|
||||||
|
func (c *Client) Ping() (time.Duration, string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "ping")
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", c.userAgent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
version := resp.Header.Get("X-Influxdb-Version")
|
||||||
|
return time.Since(now), version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Structs
|
||||||
|
|
||||||
|
// Message represents a user message.
|
||||||
|
type Message struct {
|
||||||
|
Level string `json:"level,omitempty"`
|
||||||
|
Text string `json:"text,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result represents a resultset returned from a single statement.
|
||||||
|
type Result struct {
|
||||||
|
Series []models.Row
|
||||||
|
Messages []*Message
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON encodes the result into JSON.
|
||||||
|
func (r *Result) MarshalJSON() ([]byte, error) {
|
||||||
|
// Define a struct that outputs "error" as a string.
|
||||||
|
var o struct {
|
||||||
|
Series []models.Row `json:"series,omitempty"`
|
||||||
|
Messages []*Message `json:"messages,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy fields to output struct.
|
||||||
|
o.Series = r.Series
|
||||||
|
o.Messages = r.Messages
|
||||||
|
if r.Err != nil {
|
||||||
|
o.Err = r.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Result struct
|
||||||
|
func (r *Result) UnmarshalJSON(b []byte) error {
|
||||||
|
var o struct {
|
||||||
|
Series []models.Row `json:"series,omitempty"`
|
||||||
|
Messages []*Message `json:"messages,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
err := dec.Decode(&o)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Series = o.Series
|
||||||
|
r.Messages = o.Messages
|
||||||
|
if o.Err != "" {
|
||||||
|
r.Err = errors.New(o.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []Result
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON encodes the response into JSON.
|
||||||
|
func (r *Response) MarshalJSON() ([]byte, error) {
|
||||||
|
// Define a struct that outputs "error" as a string.
|
||||||
|
var o struct {
|
||||||
|
Results []Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy fields to output struct.
|
||||||
|
o.Results = r.Results
|
||||||
|
if r.Err != nil {
|
||||||
|
o.Err = r.Err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(&o)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Response struct
|
||||||
|
func (r *Response) UnmarshalJSON(b []byte) error {
|
||||||
|
var o struct {
|
||||||
|
Results []Result `json:"results,omitempty"`
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
err := dec.Decode(&o)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.Results = o.Results
|
||||||
|
if o.Err != "" {
|
||||||
|
r.Err = errors.New(o.Err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// Returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != nil {
|
||||||
|
return r.Err
|
||||||
|
}
|
||||||
|
for _, result := range r.Results {
|
||||||
|
if result.Err != nil {
|
||||||
|
return result.Err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// duplexReader reads responses and writes it to another writer while
|
||||||
|
// satisfying the reader interface.
|
||||||
|
type duplexReader struct {
|
||||||
|
r io.Reader
|
||||||
|
w io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *duplexReader) Read(p []byte) (n int, err error) {
|
||||||
|
n, err = r.r.Read(p)
|
||||||
|
if err == nil {
|
||||||
|
r.w.Write(p[:n])
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChunkedResponse represents a response from the server that
|
||||||
|
// uses chunking to stream the output.
|
||||||
|
type ChunkedResponse struct {
|
||||||
|
dec *json.Decoder
|
||||||
|
duplex *duplexReader
|
||||||
|
buf bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunkedResponse reads a stream and produces responses from the stream.
|
||||||
|
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
|
||||||
|
resp := &ChunkedResponse{}
|
||||||
|
resp.duplex = &duplexReader{r: r, w: &resp.buf}
|
||||||
|
resp.dec = json.NewDecoder(resp.duplex)
|
||||||
|
resp.dec.UseNumber()
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextResponse reads the next line of the stream and returns a response.
|
||||||
|
func (r *ChunkedResponse) NextResponse() (*Response, error) {
|
||||||
|
var response Response
|
||||||
|
if err := r.dec.Decode(&response); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// A decoding error happened. This probably means the server crashed
|
||||||
|
// and sent a last-ditch error message to us. Ensure we have read the
|
||||||
|
// entirety of the connection to get any remaining error text.
|
||||||
|
io.Copy(ioutil.Discard, r.duplex)
|
||||||
|
return nil, errors.New(strings.TrimSpace(r.buf.String()))
|
||||||
|
}
|
||||||
|
r.buf.Reset()
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point defines the fields that will be written to the database
|
||||||
|
// Measurement, Time, and Fields are required
|
||||||
|
// Precision can be specified if the time is in epoch format (integer).
|
||||||
|
// Valid values for Precision are n, u, ms, s, m, and h
|
||||||
|
type Point struct {
|
||||||
|
Measurement string
|
||||||
|
Tags map[string]string
|
||||||
|
Time time.Time
|
||||||
|
Fields map[string]interface{}
|
||||||
|
Precision string
|
||||||
|
Raw string
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalJSON will format the time in RFC3339Nano
|
||||||
|
// Precision is also ignored as it is only used for writing, not reading
|
||||||
|
// Or another way to say it is we always send back in nanosecond precision
|
||||||
|
func (p *Point) MarshalJSON() ([]byte, error) {
|
||||||
|
point := struct {
|
||||||
|
Measurement string `json:"measurement,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Time string `json:"time,omitempty"`
|
||||||
|
Fields map[string]interface{} `json:"fields,omitempty"`
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
|
}{
|
||||||
|
Measurement: p.Measurement,
|
||||||
|
Tags: p.Tags,
|
||||||
|
Fields: p.Fields,
|
||||||
|
Precision: p.Precision,
|
||||||
|
}
|
||||||
|
// Let it omit empty if it's really zero
|
||||||
|
if !p.Time.IsZero() {
|
||||||
|
point.Time = p.Time.UTC().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
|
return json.Marshal(&point)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalString renders string representation of a Point with specified
|
||||||
|
// precision. The default precision is nanoseconds.
|
||||||
|
func (p *Point) MarshalString() string {
|
||||||
|
pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
|
||||||
|
if err != nil {
|
||||||
|
return "# ERROR: " + err.Error() + " " + p.Measurement
|
||||||
|
}
|
||||||
|
if p.Precision == "" || p.Precision == "ns" || p.Precision == "n" {
|
||||||
|
return pt.String()
|
||||||
|
}
|
||||||
|
return pt.PrecisionString(p.Precision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the Point struct
|
||||||
|
func (p *Point) UnmarshalJSON(b []byte) error {
|
||||||
|
var normal struct {
|
||||||
|
Measurement string `json:"measurement"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
Fields map[string]interface{} `json:"fields"`
|
||||||
|
}
|
||||||
|
var epoch struct {
|
||||||
|
Measurement string `json:"measurement"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time *int64 `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
Fields map[string]interface{} `json:"fields"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := func() error {
|
||||||
|
var err error
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
if err = dec.Decode(&epoch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Convert from epoch to time.Time, but only if Time
|
||||||
|
// was actually set.
|
||||||
|
var ts time.Time
|
||||||
|
if epoch.Time != nil {
|
||||||
|
ts, err = EpochToTime(*epoch.Time, epoch.Precision)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.Measurement = epoch.Measurement
|
||||||
|
p.Tags = epoch.Tags
|
||||||
|
p.Time = ts
|
||||||
|
p.Precision = epoch.Precision
|
||||||
|
p.Fields = normalizeFields(epoch.Fields)
|
||||||
|
return nil
|
||||||
|
}(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := json.NewDecoder(bytes.NewBuffer(b))
|
||||||
|
dec.UseNumber()
|
||||||
|
if err := dec.Decode(&normal); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
normal.Time = SetPrecision(normal.Time, normal.Precision)
|
||||||
|
p.Measurement = normal.Measurement
|
||||||
|
p.Tags = normal.Tags
|
||||||
|
p.Time = normal.Time
|
||||||
|
p.Precision = normal.Precision
|
||||||
|
p.Fields = normalizeFields(normal.Fields)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove any notion of json.Number
|
||||||
|
func normalizeFields(fields map[string]interface{}) map[string]interface{} {
|
||||||
|
newFields := map[string]interface{}{}
|
||||||
|
|
||||||
|
for k, v := range fields {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case json.Number:
|
||||||
|
jv, e := v.Float64()
|
||||||
|
if e != nil {
|
||||||
|
panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e))
|
||||||
|
}
|
||||||
|
newFields[k] = jv
|
||||||
|
default:
|
||||||
|
newFields[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newFields
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPoints is used to send batched data in a single write.
|
||||||
|
// Database and Points are required
|
||||||
|
// If no retention policy is specified, it will use the databases default retention policy.
|
||||||
|
// If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored.
|
||||||
|
// If time is specified, it will be applied to any point with an empty time.
|
||||||
|
// Precision can be specified if the time is in epoch format (integer).
|
||||||
|
// Valid values for Precision are n, u, ms, s, m, and h
|
||||||
|
type BatchPoints struct {
|
||||||
|
Points []Point `json:"points,omitempty"`
|
||||||
|
Database string `json:"database,omitempty"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Time time.Time `json:"time,omitempty"`
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
|
WriteConsistency string `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON decodes the data into the BatchPoints struct
|
||||||
|
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
|
||||||
|
var normal struct {
|
||||||
|
Points []Point `json:"points"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time time.Time `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
}
|
||||||
|
var epoch struct {
|
||||||
|
Points []Point `json:"points"`
|
||||||
|
Database string `json:"database"`
|
||||||
|
RetentionPolicy string `json:"retentionPolicy"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Time *int64 `json:"time"`
|
||||||
|
Precision string `json:"precision"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := func() error {
|
||||||
|
var err error
|
||||||
|
if err = json.Unmarshal(b, &epoch); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Convert from epoch to time.Time
|
||||||
|
var ts time.Time
|
||||||
|
if epoch.Time != nil {
|
||||||
|
ts, err = EpochToTime(*epoch.Time, epoch.Precision)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bp.Points = epoch.Points
|
||||||
|
bp.Database = epoch.Database
|
||||||
|
bp.RetentionPolicy = epoch.RetentionPolicy
|
||||||
|
bp.Tags = epoch.Tags
|
||||||
|
bp.Time = ts
|
||||||
|
bp.Precision = epoch.Precision
|
||||||
|
return nil
|
||||||
|
}(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(b, &normal); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
normal.Time = SetPrecision(normal.Time, normal.Precision)
|
||||||
|
bp.Points = normal.Points
|
||||||
|
bp.Database = normal.Database
|
||||||
|
bp.RetentionPolicy = normal.RetentionPolicy
|
||||||
|
bp.Tags = normal.Tags
|
||||||
|
bp.Time = normal.Time
|
||||||
|
bp.Precision = normal.Precision
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// utility functions
|
||||||
|
|
||||||
|
// Addr provides the current url as a string of the server the client is connected to.
|
||||||
|
func (c *Client) Addr() string {
|
||||||
|
if c.unixSocket != "" {
|
||||||
|
return c.unixSocket
|
||||||
|
}
|
||||||
|
return c.url.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkPointTypes ensures no unsupported types are submitted to influxdb, returning error if they are found.
|
||||||
|
func checkPointTypes(p Point) error {
|
||||||
|
for _, v := range p.Fields {
|
||||||
|
switch v.(type) {
|
||||||
|
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool, string, nil:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported point type: %T", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper functions
|
||||||
|
|
||||||
|
// EpochToTime takes a unix epoch time and uses precision to return back a time.Time
|
||||||
|
func EpochToTime(epoch int64, precision string) (time.Time, error) {
|
||||||
|
if precision == "" {
|
||||||
|
precision = "s"
|
||||||
|
}
|
||||||
|
var t time.Time
|
||||||
|
switch precision {
|
||||||
|
case "h":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Hour))
|
||||||
|
case "m":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Minute))
|
||||||
|
case "s":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Second))
|
||||||
|
case "ms":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Millisecond))
|
||||||
|
case "u":
|
||||||
|
t = time.Unix(0, epoch*int64(time.Microsecond))
|
||||||
|
case "n":
|
||||||
|
t = time.Unix(0, epoch)
|
||||||
|
default:
|
||||||
|
return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPrecision will round a time to the specified precision
|
||||||
|
func SetPrecision(t time.Time, precision string) time.Time {
|
||||||
|
switch precision {
|
||||||
|
case "n":
|
||||||
|
case "u":
|
||||||
|
return t.Round(time.Microsecond)
|
||||||
|
case "ms":
|
||||||
|
return t.Round(time.Millisecond)
|
||||||
|
case "s":
|
||||||
|
return t.Round(time.Second)
|
||||||
|
case "m":
|
||||||
|
return t.Round(time.Minute)
|
||||||
|
case "h":
|
||||||
|
return t.Round(time.Hour)
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
32
vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go
generated
vendored
Normal file
32
vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go
generated
vendored
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package models // import "github.com/influxdata/influxdb1-client/models"
|
||||||
|
|
||||||
|
// from stdlib hash/fnv/fnv.go
|
||||||
|
const (
|
||||||
|
prime64 = 1099511628211
|
||||||
|
offset64 = 14695981039346656037
|
||||||
|
)
|
||||||
|
|
||||||
|
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
|
||||||
|
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
|
||||||
|
type InlineFNV64a uint64
|
||||||
|
|
||||||
|
// NewInlineFNV64a returns a new instance of InlineFNV64a.
|
||||||
|
func NewInlineFNV64a() InlineFNV64a {
|
||||||
|
return offset64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write adds data to the running hash.
|
||||||
|
func (s *InlineFNV64a) Write(data []byte) (int, error) {
|
||||||
|
hash := uint64(*s)
|
||||||
|
for _, c := range data {
|
||||||
|
hash ^= uint64(c)
|
||||||
|
hash *= prime64
|
||||||
|
}
|
||||||
|
*s = InlineFNV64a(hash)
|
||||||
|
return len(data), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sum64 returns the uint64 of the current resulting hash.
|
||||||
|
func (s *InlineFNV64a) Sum64() uint64 {
|
||||||
|
return uint64(*s)
|
||||||
|
}
|
44
vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go
generated
vendored
Normal file
44
vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go
generated
vendored
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package models // import "github.com/influxdata/influxdb1-client/models"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
|
||||||
|
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
|
||||||
|
s := unsafeBytesToString(b)
|
||||||
|
return strconv.ParseInt(s, base, bitSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
|
||||||
|
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
|
||||||
|
s := unsafeBytesToString(b)
|
||||||
|
return strconv.ParseUint(s, base, bitSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
|
||||||
|
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
|
||||||
|
s := unsafeBytesToString(b)
|
||||||
|
return strconv.ParseFloat(s, bitSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
|
||||||
|
func parseBoolBytes(b []byte) (bool, error) {
|
||||||
|
return strconv.ParseBool(unsafeBytesToString(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsafeBytesToString converts a []byte to a string without a heap allocation.
|
||||||
|
//
|
||||||
|
// It is unsafe, and is intended to prepare input to short-lived functions
|
||||||
|
// that require strings.
|
||||||
|
func unsafeBytesToString(in []byte) string {
|
||||||
|
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
|
||||||
|
dst := reflect.StringHeader{
|
||||||
|
Data: src.Data,
|
||||||
|
Len: src.Len,
|
||||||
|
}
|
||||||
|
s := *(*string)(unsafe.Pointer(&dst))
|
||||||
|
return s
|
||||||
|
}
|
2413
vendor/github.com/influxdata/influxdb1-client/models/points.go
generated
vendored
Normal file
2413
vendor/github.com/influxdata/influxdb1-client/models/points.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load Diff
62
vendor/github.com/influxdata/influxdb1-client/models/rows.go
generated
vendored
Normal file
62
vendor/github.com/influxdata/influxdb1-client/models/rows.go
generated
vendored
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Row represents a single row returned from the execution of a statement.
|
||||||
|
type Row struct {
|
||||||
|
Name string `json:"name,omitempty"`
|
||||||
|
Tags map[string]string `json:"tags,omitempty"`
|
||||||
|
Columns []string `json:"columns,omitempty"`
|
||||||
|
Values [][]interface{} `json:"values,omitempty"`
|
||||||
|
Partial bool `json:"partial,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SameSeries returns true if r contains values for the same series as o.
|
||||||
|
func (r *Row) SameSeries(o *Row) bool {
|
||||||
|
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagsHash returns a hash of tag key/value pairs.
|
||||||
|
func (r *Row) tagsHash() uint64 {
|
||||||
|
h := NewInlineFNV64a()
|
||||||
|
keys := r.tagsKeys()
|
||||||
|
for _, k := range keys {
|
||||||
|
h.Write([]byte(k))
|
||||||
|
h.Write([]byte(r.Tags[k]))
|
||||||
|
}
|
||||||
|
return h.Sum64()
|
||||||
|
}
|
||||||
|
|
||||||
|
// tagKeys returns a sorted list of tag keys.
|
||||||
|
func (r *Row) tagsKeys() []string {
|
||||||
|
a := make([]string, 0, len(r.Tags))
|
||||||
|
for k := range r.Tags {
|
||||||
|
a = append(a, k)
|
||||||
|
}
|
||||||
|
sort.Strings(a)
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rows represents a collection of rows. Rows implements sort.Interface.
|
||||||
|
type Rows []*Row
|
||||||
|
|
||||||
|
// Len implements sort.Interface.
|
||||||
|
func (p Rows) Len() int { return len(p) }
|
||||||
|
|
||||||
|
// Less implements sort.Interface.
|
||||||
|
func (p Rows) Less(i, j int) bool {
|
||||||
|
// Sort by name first.
|
||||||
|
if p[i].Name != p[j].Name {
|
||||||
|
return p[i].Name < p[j].Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by tag set hash. Tags don't have a meaningful sort order so we
|
||||||
|
// just compute a hash and sort by that instead. This allows the tests
|
||||||
|
// to receive rows in a predictable order every time.
|
||||||
|
return p[i].tagsHash() < p[j].tagsHash()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap implements sort.Interface.
|
||||||
|
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
42
vendor/github.com/influxdata/influxdb1-client/models/statistic.go
generated
vendored
Normal file
42
vendor/github.com/influxdata/influxdb1-client/models/statistic.go
generated
vendored
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
// Statistic is the representation of a statistic used by the monitoring service.
|
||||||
|
type Statistic struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags map[string]string `json:"tags"`
|
||||||
|
Values map[string]interface{} `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStatistic returns an initialized Statistic.
|
||||||
|
func NewStatistic(name string) Statistic {
|
||||||
|
return Statistic{
|
||||||
|
Name: name,
|
||||||
|
Tags: make(map[string]string),
|
||||||
|
Values: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatisticTags is a map that can be merged with others without causing
|
||||||
|
// mutations to either map.
|
||||||
|
type StatisticTags map[string]string
|
||||||
|
|
||||||
|
// Merge creates a new map containing the merged contents of tags and t.
|
||||||
|
// If both tags and the receiver map contain the same key, the value in tags
|
||||||
|
// is used in the resulting map.
|
||||||
|
//
|
||||||
|
// Merge always returns a usable map.
|
||||||
|
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
|
||||||
|
// Add everything in tags to the result.
|
||||||
|
out := make(map[string]string, len(tags))
|
||||||
|
for k, v := range tags {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only add values from t that don't appear in tags.
|
||||||
|
for k, v := range t {
|
||||||
|
if _, ok := tags[k]; !ok {
|
||||||
|
out[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
74
vendor/github.com/influxdata/influxdb1-client/models/time.go
generated
vendored
Normal file
74
vendor/github.com/influxdata/influxdb1-client/models/time.go
generated
vendored
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
// Helper time methods since parsing time can easily overflow and we only support a
|
||||||
|
// specific time range.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MinNanoTime is the minumum time that can be represented.
|
||||||
|
//
|
||||||
|
// 1677-09-21 00:12:43.145224194 +0000 UTC
|
||||||
|
//
|
||||||
|
// The two lowest minimum integers are used as sentinel values. The
|
||||||
|
// minimum value needs to be used as a value lower than any other value for
|
||||||
|
// comparisons and another separate value is needed to act as a sentinel
|
||||||
|
// default value that is unusable by the user, but usable internally.
|
||||||
|
// Because these two values need to be used for a special purpose, we do
|
||||||
|
// not allow users to write points at these two times.
|
||||||
|
MinNanoTime = int64(math.MinInt64) + 2
|
||||||
|
|
||||||
|
// MaxNanoTime is the maximum time that can be represented.
|
||||||
|
//
|
||||||
|
// 2262-04-11 23:47:16.854775806 +0000 UTC
|
||||||
|
//
|
||||||
|
// The highest time represented by a nanosecond needs to be used for an
|
||||||
|
// exclusive range in the shard group, so the maximum time needs to be one
|
||||||
|
// less than the possible maximum number of nanoseconds representable by an
|
||||||
|
// int64 so that we don't lose a point at that one time.
|
||||||
|
MaxNanoTime = int64(math.MaxInt64) - 1
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
minNanoTime = time.Unix(0, MinNanoTime).UTC()
|
||||||
|
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
|
||||||
|
|
||||||
|
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
|
||||||
|
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
|
||||||
|
)
|
||||||
|
|
||||||
|
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
|
||||||
|
// supported range.
|
||||||
|
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
|
||||||
|
mult := GetPrecisionMultiplier(precision)
|
||||||
|
if t, ok := safeSignedMult(timestamp, mult); ok {
|
||||||
|
tme := time.Unix(0, t).UTC()
|
||||||
|
return tme, CheckTime(tme)
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Time{}, ErrTimeOutOfRange
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckTime checks that a time is within the safe range.
|
||||||
|
func CheckTime(t time.Time) error {
|
||||||
|
if t.Before(minNanoTime) || t.After(maxNanoTime) {
|
||||||
|
return ErrTimeOutOfRange
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the multiplication and check to make sure it didn't overflow.
|
||||||
|
func safeSignedMult(a, b int64) (int64, bool) {
|
||||||
|
if a == 0 || b == 0 || a == 1 || b == 1 {
|
||||||
|
return a * b, true
|
||||||
|
}
|
||||||
|
if a == MinNanoTime || b == MaxNanoTime {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
c := a * b
|
||||||
|
return c, c/b == a
|
||||||
|
}
|
7
vendor/github.com/influxdata/influxdb1-client/models/uint_support.go
generated
vendored
Normal file
7
vendor/github.com/influxdata/influxdb1-client/models/uint_support.go
generated
vendored
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
// +build uint uint64
|
||||||
|
|
||||||
|
package models
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
EnableUintSupport()
|
||||||
|
}
|
115
vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go
generated
vendored
Normal file
115
vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go
generated
vendored
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
// Package escape contains utilities for escaping parts of InfluxQL
|
||||||
|
// and InfluxDB line protocol.
|
||||||
|
package escape // import "github.com/influxdata/influxdb1-client/pkg/escape"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Codes is a map of bytes to be escaped.
|
||||||
|
var Codes = map[byte][]byte{
|
||||||
|
',': []byte(`\,`),
|
||||||
|
'"': []byte(`\"`),
|
||||||
|
' ': []byte(`\ `),
|
||||||
|
'=': []byte(`\=`),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bytes escapes characters on the input slice, as defined by Codes.
|
||||||
|
func Bytes(in []byte) []byte {
|
||||||
|
for b, esc := range Codes {
|
||||||
|
in = bytes.Replace(in, []byte{b}, esc, -1)
|
||||||
|
}
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
const escapeChars = `," =`
|
||||||
|
|
||||||
|
// IsEscaped returns whether b has any escaped characters,
|
||||||
|
// i.e. whether b seems to have been processed by Bytes.
|
||||||
|
func IsEscaped(b []byte) bool {
|
||||||
|
for len(b) > 0 {
|
||||||
|
i := bytes.IndexByte(b, '\\')
|
||||||
|
if i < 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
b = b[i+1:]
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendUnescaped appends the unescaped version of src to dst
|
||||||
|
// and returns the resulting slice.
|
||||||
|
func AppendUnescaped(dst, src []byte) []byte {
|
||||||
|
var pos int
|
||||||
|
for len(src) > 0 {
|
||||||
|
next := bytes.IndexByte(src[pos:], '\\')
|
||||||
|
if next < 0 || pos+next+1 >= len(src) {
|
||||||
|
return append(dst, src...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
|
||||||
|
if pos+next > 0 {
|
||||||
|
dst = append(dst, src[:pos+next]...)
|
||||||
|
}
|
||||||
|
src = src[pos+next+1:]
|
||||||
|
pos = 0
|
||||||
|
} else {
|
||||||
|
pos += next + 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unescape returns a new slice containing the unescaped version of in.
|
||||||
|
func Unescape(in []byte) []byte {
|
||||||
|
if len(in) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.IndexByte(in, '\\') == -1 {
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
|
||||||
|
i := 0
|
||||||
|
inLen := len(in)
|
||||||
|
|
||||||
|
// The output size will be no more than inLen. Preallocating the
|
||||||
|
// capacity of the output is faster and uses less memory than
|
||||||
|
// letting append() do its own (over)allocation.
|
||||||
|
out := make([]byte, 0, inLen)
|
||||||
|
|
||||||
|
for {
|
||||||
|
if i >= inLen {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if in[i] == '\\' && i+1 < inLen {
|
||||||
|
switch in[i+1] {
|
||||||
|
case ',':
|
||||||
|
out = append(out, ',')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case '"':
|
||||||
|
out = append(out, '"')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case ' ':
|
||||||
|
out = append(out, ' ')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
case '=':
|
||||||
|
out = append(out, '=')
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out = append(out, in[i])
|
||||||
|
i += 1
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
21
vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go
generated
vendored
Normal file
21
vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go
generated
vendored
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package escape
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
var (
|
||||||
|
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
|
||||||
|
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
|
||||||
|
)
|
||||||
|
|
||||||
|
// UnescapeString returns unescaped version of in.
|
||||||
|
func UnescapeString(in string) string {
|
||||||
|
if strings.IndexByte(in, '\\') == -1 {
|
||||||
|
return in
|
||||||
|
}
|
||||||
|
return unescaper.Replace(in)
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns the escaped version of in.
|
||||||
|
func String(in string) string {
|
||||||
|
return escaper.Replace(in)
|
||||||
|
}
|
722
vendor/github.com/influxdata/influxdb1-client/v2/client.go
generated
vendored
Normal file
722
vendor/github.com/influxdata/influxdb1-client/v2/client.go
generated
vendored
Normal file
@ -0,0 +1,722 @@
|
|||||||
|
// Package client (v2) is the current official Go client for InfluxDB.
|
||||||
|
package client // import "github.com/influxdata/influxdb1-client/v2"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"mime"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb1-client/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HTTPConfig is the config data needed to create an HTTP Client.
|
||||||
|
type HTTPConfig struct {
|
||||||
|
// Addr should be of the form "http://host:port"
|
||||||
|
// or "http://[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// Username is the influxdb username, optional.
|
||||||
|
Username string
|
||||||
|
|
||||||
|
// Password is the influxdb password, optional.
|
||||||
|
Password string
|
||||||
|
|
||||||
|
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
|
||||||
|
UserAgent string
|
||||||
|
|
||||||
|
// Timeout for influxdb writes, defaults to no timeout.
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// InsecureSkipVerify gets passed to the http client, if true, it will
|
||||||
|
// skip https certificate verification. Defaults to false.
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
// TLSConfig allows the user to set their own TLS config for the HTTP
|
||||||
|
// Client. If set, this option overrides InsecureSkipVerify.
|
||||||
|
TLSConfig *tls.Config
|
||||||
|
|
||||||
|
// Proxy configures the Proxy function on the HTTP client.
|
||||||
|
Proxy func(req *http.Request) (*url.URL, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
|
||||||
|
type BatchPointsConfig struct {
|
||||||
|
// Precision is the write precision of the points, defaults to "ns".
|
||||||
|
Precision string
|
||||||
|
|
||||||
|
// Database is the database to write points to.
|
||||||
|
Database string
|
||||||
|
|
||||||
|
// RetentionPolicy is the retention policy of the points.
|
||||||
|
RetentionPolicy string
|
||||||
|
|
||||||
|
// Write consistency is the number of servers required to confirm write.
|
||||||
|
WriteConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Client is a client interface for writing & querying the database.
|
||||||
|
type Client interface {
|
||||||
|
// Ping checks that status of cluster, and will always return 0 time and no
|
||||||
|
// error for UDP clients.
|
||||||
|
Ping(timeout time.Duration) (time.Duration, string, error)
|
||||||
|
|
||||||
|
// Write takes a BatchPoints object and writes all Points to InfluxDB.
|
||||||
|
Write(bp BatchPoints) error
|
||||||
|
|
||||||
|
// Query makes an InfluxDB Query on the database. This will fail if using
|
||||||
|
// the UDP client.
|
||||||
|
Query(q Query) (*Response, error)
|
||||||
|
|
||||||
|
// QueryAsChunk makes an InfluxDB Query on the database. This will fail if using
|
||||||
|
// the UDP client.
|
||||||
|
QueryAsChunk(q Query) (*ChunkedResponse, error)
|
||||||
|
|
||||||
|
// Close releases any resources a Client may be using.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHTTPClient returns a new Client from the provided config.
|
||||||
|
// Client is safe for concurrent use by multiple goroutines.
|
||||||
|
func NewHTTPClient(conf HTTPConfig) (Client, error) {
|
||||||
|
if conf.UserAgent == "" {
|
||||||
|
conf.UserAgent = "InfluxDBClient"
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
} else if u.Scheme != "http" && u.Scheme != "https" {
|
||||||
|
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
|
||||||
|
" must start with http:// or https://", u.Scheme)
|
||||||
|
return nil, errors.New(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := &http.Transport{
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
InsecureSkipVerify: conf.InsecureSkipVerify,
|
||||||
|
},
|
||||||
|
Proxy: conf.Proxy,
|
||||||
|
}
|
||||||
|
if conf.TLSConfig != nil {
|
||||||
|
tr.TLSClientConfig = conf.TLSConfig
|
||||||
|
}
|
||||||
|
return &client{
|
||||||
|
url: *u,
|
||||||
|
username: conf.Username,
|
||||||
|
password: conf.Password,
|
||||||
|
useragent: conf.UserAgent,
|
||||||
|
httpClient: &http.Client{
|
||||||
|
Timeout: conf.Timeout,
|
||||||
|
Transport: tr,
|
||||||
|
},
|
||||||
|
transport: tr,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
|
||||||
|
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
|
||||||
|
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "ping")
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
if timeout > 0 {
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent {
|
||||||
|
var err = errors.New(string(body))
|
||||||
|
return 0, "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
version := resp.Header.Get("X-Influxdb-Version")
|
||||||
|
return time.Since(now), version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the client's resources.
|
||||||
|
func (c *client) Close() error {
|
||||||
|
c.transport.CloseIdleConnections()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// client is safe for concurrent use as the fields are all read-only
|
||||||
|
// once the client is instantiated.
|
||||||
|
type client struct {
|
||||||
|
// N.B - if url.UserInfo is accessed in future modifications to the
|
||||||
|
// methods on client, you will need to synchronize access to url.
|
||||||
|
url url.URL
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
useragent string
|
||||||
|
httpClient *http.Client
|
||||||
|
transport *http.Transport
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchPoints is an interface into a batched grouping of points to write into
|
||||||
|
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
|
||||||
|
// batch for each goroutine.
|
||||||
|
type BatchPoints interface {
|
||||||
|
// AddPoint adds the given point to the Batch of points.
|
||||||
|
AddPoint(p *Point)
|
||||||
|
// AddPoints adds the given points to the Batch of points.
|
||||||
|
AddPoints(ps []*Point)
|
||||||
|
// Points lists the points in the Batch.
|
||||||
|
Points() []*Point
|
||||||
|
|
||||||
|
// Precision returns the currently set precision of this Batch.
|
||||||
|
Precision() string
|
||||||
|
// SetPrecision sets the precision of this batch.
|
||||||
|
SetPrecision(s string) error
|
||||||
|
|
||||||
|
// Database returns the currently set database of this Batch.
|
||||||
|
Database() string
|
||||||
|
// SetDatabase sets the database of this Batch.
|
||||||
|
SetDatabase(s string)
|
||||||
|
|
||||||
|
// WriteConsistency returns the currently set write consistency of this Batch.
|
||||||
|
WriteConsistency() string
|
||||||
|
// SetWriteConsistency sets the write consistency of this Batch.
|
||||||
|
SetWriteConsistency(s string)
|
||||||
|
|
||||||
|
// RetentionPolicy returns the currently set retention policy of this Batch.
|
||||||
|
RetentionPolicy() string
|
||||||
|
// SetRetentionPolicy sets the retention policy of this Batch.
|
||||||
|
SetRetentionPolicy(s string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatchPoints returns a BatchPoints interface based on the given config.
|
||||||
|
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
|
||||||
|
if conf.Precision == "" {
|
||||||
|
conf.Precision = "ns"
|
||||||
|
}
|
||||||
|
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bp := &batchpoints{
|
||||||
|
database: conf.Database,
|
||||||
|
precision: conf.Precision,
|
||||||
|
retentionPolicy: conf.RetentionPolicy,
|
||||||
|
writeConsistency: conf.WriteConsistency,
|
||||||
|
}
|
||||||
|
return bp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchpoints struct {
|
||||||
|
points []*Point
|
||||||
|
database string
|
||||||
|
precision string
|
||||||
|
retentionPolicy string
|
||||||
|
writeConsistency string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoint(p *Point) {
|
||||||
|
bp.points = append(bp.points, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) AddPoints(ps []*Point) {
|
||||||
|
bp.points = append(bp.points, ps...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Points() []*Point {
|
||||||
|
return bp.points
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Precision() string {
|
||||||
|
return bp.precision
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) Database() string {
|
||||||
|
return bp.database
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) WriteConsistency() string {
|
||||||
|
return bp.writeConsistency
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) RetentionPolicy() string {
|
||||||
|
return bp.retentionPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetPrecision(p string) error {
|
||||||
|
if _, err := time.ParseDuration("1" + p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bp.precision = p
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetDatabase(db string) {
|
||||||
|
bp.database = db
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetWriteConsistency(wc string) {
|
||||||
|
bp.writeConsistency = wc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *batchpoints) SetRetentionPolicy(rp string) {
|
||||||
|
bp.retentionPolicy = rp
|
||||||
|
}
|
||||||
|
|
||||||
|
// Point represents a single data point.
|
||||||
|
type Point struct {
|
||||||
|
pt models.Point
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPoint returns a point with the given timestamp. If a timestamp is not
|
||||||
|
// given, then data is sent to the database without a timestamp, in which case
|
||||||
|
// the server will assign local time upon reception. NOTE: it is recommended to
|
||||||
|
// send data with a timestamp.
|
||||||
|
func NewPoint(
|
||||||
|
name string,
|
||||||
|
tags map[string]string,
|
||||||
|
fields map[string]interface{},
|
||||||
|
t ...time.Time,
|
||||||
|
) (*Point, error) {
|
||||||
|
var T time.Time
|
||||||
|
if len(t) > 0 {
|
||||||
|
T = t[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Point{
|
||||||
|
pt: pt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a line-protocol string of the Point.
|
||||||
|
func (p *Point) String() string {
|
||||||
|
return p.pt.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrecisionString returns a line-protocol string of the Point,
|
||||||
|
// with the timestamp formatted for the given precision.
|
||||||
|
func (p *Point) PrecisionString(precision string) string {
|
||||||
|
return p.pt.PrecisionString(precision)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name returns the measurement name of the point.
|
||||||
|
func (p *Point) Name() string {
|
||||||
|
return string(p.pt.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tags returns the tags associated with the point.
|
||||||
|
func (p *Point) Tags() map[string]string {
|
||||||
|
return p.pt.Tags().Map()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Time return the timestamp for the point.
|
||||||
|
func (p *Point) Time() time.Time {
|
||||||
|
return p.pt.Time()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
|
||||||
|
func (p *Point) UnixNano() int64 {
|
||||||
|
return p.pt.UnixNano()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fields returns the fields for the point.
|
||||||
|
func (p *Point) Fields() (map[string]interface{}, error) {
|
||||||
|
return p.pt.Fields()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPointFrom returns a point from the provided models.Point.
|
||||||
|
func NewPointFrom(pt models.Point) *Point {
|
||||||
|
return &Point{pt: pt}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) Write(bp BatchPoints) error {
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
if p == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.WriteByte('\n'); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "write")
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), &b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("db", bp.Database())
|
||||||
|
params.Set("rp", bp.RetentionPolicy())
|
||||||
|
params.Set("precision", bp.Precision())
|
||||||
|
params.Set("consistency", bp.WriteConsistency())
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
|
||||||
|
var err = errors.New(string(body))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query defines a query to send to the server.
|
||||||
|
type Query struct {
|
||||||
|
Command string
|
||||||
|
Database string
|
||||||
|
RetentionPolicy string
|
||||||
|
Precision string
|
||||||
|
Chunked bool
|
||||||
|
ChunkSize int
|
||||||
|
Parameters map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQuery returns a query object.
|
||||||
|
// The database and precision arguments can be empty strings if they are not needed for the query.
|
||||||
|
func NewQuery(command, database, precision string) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueryWithRP returns a query object.
|
||||||
|
// The database, retention policy, and precision arguments can be empty strings if they are not needed
|
||||||
|
// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
|
||||||
|
func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
RetentionPolicy: retentionPolicy,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewQueryWithParameters returns a query object.
|
||||||
|
// The database and precision arguments can be empty strings if they are not needed for the query.
|
||||||
|
// parameters is a map of the parameter names used in the command to their values.
|
||||||
|
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
|
||||||
|
return Query{
|
||||||
|
Command: command,
|
||||||
|
Database: database,
|
||||||
|
Precision: precision,
|
||||||
|
Parameters: parameters,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Response represents a list of statement results.
|
||||||
|
type Response struct {
|
||||||
|
Results []Result
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns the first error from any statement.
|
||||||
|
// It returns nil if no errors occurred on any statements.
|
||||||
|
func (r *Response) Error() error {
|
||||||
|
if r.Err != "" {
|
||||||
|
return errors.New(r.Err)
|
||||||
|
}
|
||||||
|
for _, result := range r.Results {
|
||||||
|
if result.Err != "" {
|
||||||
|
return errors.New(result.Err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message represents a user message.
|
||||||
|
type Message struct {
|
||||||
|
Level string
|
||||||
|
Text string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result represents a resultset returned from a single statement.
|
||||||
|
type Result struct {
|
||||||
|
Series []models.Row
|
||||||
|
Messages []*Message
|
||||||
|
Err string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query sends a command to the server and returns the Response.
|
||||||
|
func (c *client) Query(q Query) (*Response, error) {
|
||||||
|
req, err := c.createDefaultRequest(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
if q.Chunked {
|
||||||
|
params.Set("chunked", "true")
|
||||||
|
if q.ChunkSize > 0 {
|
||||||
|
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
}
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if err := checkResponse(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var response Response
|
||||||
|
if q.Chunked {
|
||||||
|
cr := NewChunkedResponse(resp.Body)
|
||||||
|
for {
|
||||||
|
r, err := cr.NextResponse()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// If we got an error while decoding the response, send that back.
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if r == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
response.Results = append(response.Results, r.Results...)
|
||||||
|
if r.Err != "" {
|
||||||
|
response.Err = r.Err
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dec := json.NewDecoder(resp.Body)
|
||||||
|
dec.UseNumber()
|
||||||
|
decErr := dec.Decode(&response)
|
||||||
|
|
||||||
|
// ignore this error if we got an invalid status code
|
||||||
|
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
|
||||||
|
decErr = nil
|
||||||
|
}
|
||||||
|
// If we got a valid decode error, send that back
|
||||||
|
if decErr != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have an error in our json response, and didn't get statusOK
|
||||||
|
// then send back an error
|
||||||
|
if resp.StatusCode != http.StatusOK && response.Error() == nil {
|
||||||
|
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryAsChunk sends a command to the server and returns the Response.
|
||||||
|
func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) {
|
||||||
|
req, err := c.createDefaultRequest(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("chunked", "true")
|
||||||
|
if q.ChunkSize > 0 {
|
||||||
|
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkResponse(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return NewChunkedResponse(resp.Body), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkResponse(resp *http.Response) error {
|
||||||
|
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
|
||||||
|
// but instead some other service. If the error code is also a 500+ code, then some
|
||||||
|
// downstream loadbalancer/proxy/etc had an issue and we should report that.
|
||||||
|
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
|
||||||
|
body, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil || len(body) == 0 {
|
||||||
|
return fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we get an unexpected content type, then it is also not from influx direct and therefore
|
||||||
|
// we want to know what we received and what status code was returned for debugging purposes.
|
||||||
|
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
|
||||||
|
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
|
||||||
|
// like downstream serving a large file
|
||||||
|
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
|
||||||
|
if err != nil || len(body) == 0 {
|
||||||
|
return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) createDefaultRequest(q Query) (*http.Request, error) {
|
||||||
|
u := c.url
|
||||||
|
u.Path = path.Join(u.Path, "query")
|
||||||
|
|
||||||
|
jsonParameters, err := json.Marshal(q.Parameters)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "")
|
||||||
|
req.Header.Set("User-Agent", c.useragent)
|
||||||
|
|
||||||
|
if c.username != "" {
|
||||||
|
req.SetBasicAuth(c.username, c.password)
|
||||||
|
}
|
||||||
|
|
||||||
|
params := req.URL.Query()
|
||||||
|
params.Set("q", q.Command)
|
||||||
|
params.Set("db", q.Database)
|
||||||
|
if q.RetentionPolicy != "" {
|
||||||
|
params.Set("rp", q.RetentionPolicy)
|
||||||
|
}
|
||||||
|
params.Set("params", string(jsonParameters))
|
||||||
|
|
||||||
|
if q.Precision != "" {
|
||||||
|
params.Set("epoch", q.Precision)
|
||||||
|
}
|
||||||
|
req.URL.RawQuery = params.Encode()
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// duplexReader reads responses and writes it to another writer while
|
||||||
|
// satisfying the reader interface.
|
||||||
|
type duplexReader struct {
|
||||||
|
r io.ReadCloser
|
||||||
|
w io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *duplexReader) Read(p []byte) (n int, err error) {
|
||||||
|
n, err = r.r.Read(p)
|
||||||
|
if err == nil {
|
||||||
|
r.w.Write(p[:n])
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the response.
|
||||||
|
func (r *duplexReader) Close() error {
|
||||||
|
return r.r.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChunkedResponse represents a response from the server that
|
||||||
|
// uses chunking to stream the output.
|
||||||
|
type ChunkedResponse struct {
|
||||||
|
dec *json.Decoder
|
||||||
|
duplex *duplexReader
|
||||||
|
buf bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunkedResponse reads a stream and produces responses from the stream.
|
||||||
|
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
|
||||||
|
rc, ok := r.(io.ReadCloser)
|
||||||
|
if !ok {
|
||||||
|
rc = ioutil.NopCloser(r)
|
||||||
|
}
|
||||||
|
resp := &ChunkedResponse{}
|
||||||
|
resp.duplex = &duplexReader{r: rc, w: &resp.buf}
|
||||||
|
resp.dec = json.NewDecoder(resp.duplex)
|
||||||
|
resp.dec.UseNumber()
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
|
// NextResponse reads the next line of the stream and returns a response.
|
||||||
|
func (r *ChunkedResponse) NextResponse() (*Response, error) {
|
||||||
|
var response Response
|
||||||
|
if err := r.dec.Decode(&response); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// A decoding error happened. This probably means the server crashed
|
||||||
|
// and sent a last-ditch error message to us. Ensure we have read the
|
||||||
|
// entirety of the connection to get any remaining error text.
|
||||||
|
io.Copy(ioutil.Discard, r.duplex)
|
||||||
|
return nil, errors.New(strings.TrimSpace(r.buf.String()))
|
||||||
|
}
|
||||||
|
|
||||||
|
r.buf.Reset()
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the response.
|
||||||
|
func (r *ChunkedResponse) Close() error {
|
||||||
|
return r.duplex.Close()
|
||||||
|
}
|
116
vendor/github.com/influxdata/influxdb1-client/v2/udp.go
generated
vendored
Normal file
116
vendor/github.com/influxdata/influxdb1-client/v2/udp.go
generated
vendored
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
package client
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// UDPPayloadSize is a reasonable default payload size for UDP packets that
|
||||||
|
// could be travelling over the internet.
|
||||||
|
UDPPayloadSize = 512
|
||||||
|
)
|
||||||
|
|
||||||
|
// UDPConfig is the config data needed to create a UDP Client.
|
||||||
|
type UDPConfig struct {
|
||||||
|
// Addr should be of the form "host:port"
|
||||||
|
// or "[ipv6-host%zone]:port".
|
||||||
|
Addr string
|
||||||
|
|
||||||
|
// PayloadSize is the maximum size of a UDP client message, optional
|
||||||
|
// Tune this based on your network. Defaults to UDPPayloadSize.
|
||||||
|
PayloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
|
||||||
|
// service from the given config.
|
||||||
|
func NewUDPClient(conf UDPConfig) (Client, error) {
|
||||||
|
var udpAddr *net.UDPAddr
|
||||||
|
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadSize := conf.PayloadSize
|
||||||
|
if payloadSize == 0 {
|
||||||
|
payloadSize = UDPPayloadSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return &udpclient{
|
||||||
|
conn: conn,
|
||||||
|
payloadSize: payloadSize,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the udpclient's resources.
|
||||||
|
func (uc *udpclient) Close() error {
|
||||||
|
return uc.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type udpclient struct {
|
||||||
|
conn io.WriteCloser
|
||||||
|
payloadSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Write(bp BatchPoints) error {
|
||||||
|
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
|
||||||
|
var d, _ = time.ParseDuration("1" + bp.Precision())
|
||||||
|
|
||||||
|
var delayedError error
|
||||||
|
|
||||||
|
var checkBuffer = func(n int) {
|
||||||
|
if len(b) > 0 && len(b)+n > uc.payloadSize {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
delayedError = err
|
||||||
|
}
|
||||||
|
b = b[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range bp.Points() {
|
||||||
|
p.pt.Round(d)
|
||||||
|
pointSize := p.pt.StringSize() + 1 // include newline in size
|
||||||
|
//point := p.pt.RoundedString(d) + "\n"
|
||||||
|
|
||||||
|
checkBuffer(pointSize)
|
||||||
|
|
||||||
|
if p.Time().IsZero() || pointSize <= uc.payloadSize {
|
||||||
|
b = p.pt.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
|
||||||
|
for _, sp := range points {
|
||||||
|
checkBuffer(sp.StringSize() + 1)
|
||||||
|
b = sp.AppendString(b)
|
||||||
|
b = append(b, '\n')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b) > 0 {
|
||||||
|
if _, err := uc.conn.Write(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return delayedError
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Query(q Query) (*Response, error) {
|
||||||
|
return nil, fmt.Errorf("Querying via UDP is not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) {
|
||||||
|
return nil, fmt.Errorf("Querying via UDP is not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
|
||||||
|
return 0, "", nil
|
||||||
|
}
|
9
vendor/modules.txt
vendored
9
vendor/modules.txt
vendored
@ -1,8 +1,13 @@
|
|||||||
# github.com/antchfx/xmlquery v1.0.0
|
# github.com/antchfx/xmlquery v1.0.0
|
||||||
github.com/antchfx/xmlquery
|
github.com/antchfx/xmlquery
|
||||||
# github.com/antchfx/xpath v0.0.0-20190319080838-ce1d48779e67
|
# github.com/antchfx/xpath v1.0.0
|
||||||
github.com/antchfx/xpath
|
github.com/antchfx/xpath
|
||||||
# golang.org/x/net v0.0.0-20190603091049-60506f45cf65
|
# github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
|
||||||
|
github.com/influxdata/influxdb1-client
|
||||||
|
github.com/influxdata/influxdb1-client/v2
|
||||||
|
github.com/influxdata/influxdb1-client/models
|
||||||
|
github.com/influxdata/influxdb1-client/pkg/escape
|
||||||
|
# golang.org/x/net v0.0.0-20190607181551-461777fb6f67
|
||||||
golang.org/x/net/html/charset
|
golang.org/x/net/html/charset
|
||||||
golang.org/x/net/html
|
golang.org/x/net/html
|
||||||
golang.org/x/net/html/atom
|
golang.org/x/net/html/atom
|
||||||
|
Loading…
Reference in New Issue
Block a user