influx for docker

This commit is contained in:
suguo.yao 2023-12-24 10:02:30 +08:00
parent 13cd706505
commit a22a83f93c
1 changed files with 136 additions and 0 deletions

136
influx2/influx.go Normal file
View File

@ -0,0 +1,136 @@
package influx
import (
"context"
"fmt"
"os"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
var (
influx_dsn string
influx_token string
influx_org string
)
func init() {
influx_dsn = os.Getenv("INFLUX_DSN")
if influx_dsn == "" {
influx_dsn = "http://influx:8086"
}
influx_token = os.Getenv("INFLUX_TOKEN")
influx_org = os.Getenv("INFLUX_ORG")
}
// 实例
func newInstance() influxdb2.Client {
client := influxdb2.NewClient(influx_dsn, influx_token)
return client
}
// 写入例子
func write(bucket, measurement string, tags map[string]string, fields map[string]interface{}) error {
client := newInstance()
defer client.Close()
writeAPI := client.WriteAPIBlocking(influx_org, bucket)
// Create point using full params constructor
p := influxdb2.NewPoint(measurement,
tags,
fields,
time.Now())
// write point immediately
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
return err
}
// Or write directly line protocol
// line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
// err = writeAPI.WriteRecord(context.Background(), line)
// if err != nil {
// panic(err)
// }
return nil
}
func writeLine(bucket, line string) error {
client := newInstance()
defer client.Close()
writeAPI := client.WriteAPIBlocking(influx_org, bucket)
// Create point using full params constructor
p := influxdb2.NewPoint(line,
nil,
nil,
time.Now())
// write point immediately
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
return err
}
return nil
}
func Query(sql string) (*api.QueryTableResult, error) {
client := newInstance()
defer client.Close()
queryAPI := client.QueryAPI(influx_org)
result, err := queryAPI.Query(context.Background(), sql)
if err != nil {
return nil, err
}
return result, nil
}
func QuerySimple() {
client := newInstance()
defer client.Close()
// Get query client
queryAPI := client.QueryAPI(influx_org)
// Get parser flux query result
result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
if err == nil {
// Use Next() to iterate over query result lines
for result.Next() {
// Observe when there is new grouping key producing new table
if result.TableChanged() {
fmt.Printf("table: %s\n", result.TableMetadata().String())
}
// read result
fmt.Printf("row: %s\n", result.Record().String())
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
panic(err)
}
// Ensures background processes finishes
client.Close()
}
// 示例,不要进行生产环境调用
func WriteSimple(bucketName string) {
client := newInstance()
writeAPI := client.WriteAPIBlocking(influx_org, bucketName)
// Create point using full params constructor
p := influxdb2.NewPoint("stat",
map[string]string{"unit": "temperature"},
map[string]interface{}{"avg": 24.5, "max": 45.0},
time.Now())
// write point immediately
writeAPI.WritePoint(context.Background(), p)
// Create point using fluent style
p = influxdb2.NewPointWithMeasurement("stat").
AddTag("unit", "temperature").
AddField("avg", 23.2).
AddField("max", 45.0).
SetTime(time.Now())
if err := writeAPI.WritePoint(context.Background(), p); err != nil {
panic(err)
}
// Or write directly line protocol
line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
if err := writeAPI.WriteRecord(context.Background(), line); err != nil {
panic(err)
}
}