137 lines
3.5 KiB
Go
137 lines
3.5 KiB
Go
|
|
package influx
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"os"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
||
|
|
)
|
||
|
|
|
||
|
|
var (
|
||
|
|
influx_dsn string
|
||
|
|
influx_token string
|
||
|
|
influx_org string
|
||
|
|
)
|
||
|
|
|
||
|
|
func init() {
|
||
|
|
influx_dsn = os.Getenv("INFLUX_DSN")
|
||
|
|
if influx_dsn == "" {
|
||
|
|
influx_dsn = "http://influx:8086"
|
||
|
|
}
|
||
|
|
influx_token = os.Getenv("INFLUX_TOKEN")
|
||
|
|
influx_org = os.Getenv("INFLUX_ORG")
|
||
|
|
}
|
||
|
|
|
||
|
|
// 实例
|
||
|
|
func newInstance() influxdb2.Client {
|
||
|
|
client := influxdb2.NewClient(influx_dsn, influx_token)
|
||
|
|
return client
|
||
|
|
}
|
||
|
|
|
||
|
|
// 写入例子
|
||
|
|
func write(bucket, measurement string, tags map[string]string, fields map[string]interface{}) error {
|
||
|
|
client := newInstance()
|
||
|
|
defer client.Close()
|
||
|
|
writeAPI := client.WriteAPIBlocking(influx_org, bucket)
|
||
|
|
// Create point using full params constructor
|
||
|
|
p := influxdb2.NewPoint(measurement,
|
||
|
|
tags,
|
||
|
|
fields,
|
||
|
|
time.Now())
|
||
|
|
// write point immediately
|
||
|
|
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
// Or write directly line protocol
|
||
|
|
// line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
|
||
|
|
// err = writeAPI.WriteRecord(context.Background(), line)
|
||
|
|
// if err != nil {
|
||
|
|
// panic(err)
|
||
|
|
// }
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func writeLine(bucket, line string) error {
|
||
|
|
client := newInstance()
|
||
|
|
defer client.Close()
|
||
|
|
writeAPI := client.WriteAPIBlocking(influx_org, bucket)
|
||
|
|
// Create point using full params constructor
|
||
|
|
p := influxdb2.NewPoint(line,
|
||
|
|
nil,
|
||
|
|
nil,
|
||
|
|
time.Now())
|
||
|
|
// write point immediately
|
||
|
|
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func Query(sql string) (*api.QueryTableResult, error) {
|
||
|
|
client := newInstance()
|
||
|
|
defer client.Close()
|
||
|
|
queryAPI := client.QueryAPI(influx_org)
|
||
|
|
result, err := queryAPI.Query(context.Background(), sql)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
return result, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func QuerySimple() {
|
||
|
|
client := newInstance()
|
||
|
|
defer client.Close()
|
||
|
|
// Get query client
|
||
|
|
queryAPI := client.QueryAPI(influx_org)
|
||
|
|
// Get parser flux query result
|
||
|
|
result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
|
||
|
|
if err == nil {
|
||
|
|
// Use Next() to iterate over query result lines
|
||
|
|
for result.Next() {
|
||
|
|
// Observe when there is new grouping key producing new table
|
||
|
|
if result.TableChanged() {
|
||
|
|
fmt.Printf("table: %s\n", result.TableMetadata().String())
|
||
|
|
}
|
||
|
|
// read result
|
||
|
|
fmt.Printf("row: %s\n", result.Record().String())
|
||
|
|
}
|
||
|
|
if result.Err() != nil {
|
||
|
|
fmt.Printf("Query error: %s\n", result.Err().Error())
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
panic(err)
|
||
|
|
}
|
||
|
|
// Ensures background processes finishes
|
||
|
|
client.Close()
|
||
|
|
}
|
||
|
|
|
||
|
|
// 示例,不要进行生产环境调用
|
||
|
|
func WriteSimple(bucketName string) {
|
||
|
|
client := newInstance()
|
||
|
|
writeAPI := client.WriteAPIBlocking(influx_org, bucketName)
|
||
|
|
// Create point using full params constructor
|
||
|
|
p := influxdb2.NewPoint("stat",
|
||
|
|
map[string]string{"unit": "temperature"},
|
||
|
|
map[string]interface{}{"avg": 24.5, "max": 45.0},
|
||
|
|
time.Now())
|
||
|
|
// write point immediately
|
||
|
|
writeAPI.WritePoint(context.Background(), p)
|
||
|
|
// Create point using fluent style
|
||
|
|
p = influxdb2.NewPointWithMeasurement("stat").
|
||
|
|
AddTag("unit", "temperature").
|
||
|
|
AddField("avg", 23.2).
|
||
|
|
AddField("max", 45.0).
|
||
|
|
SetTime(time.Now())
|
||
|
|
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
|
||
|
|
panic(err)
|
||
|
|
}
|
||
|
|
// Or write directly line protocol
|
||
|
|
line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
|
||
|
|
if err := writeAPI.WriteRecord(context.Background(), line); err != nil {
|
||
|
|
panic(err)
|
||
|
|
}
|
||
|
|
}
|