diff --git a/influx2/influx.go b/influx2/influx.go new file mode 100644 index 0000000..c1b2f0d --- /dev/null +++ b/influx2/influx.go @@ -0,0 +1,136 @@ +package influx + +import ( + "context" + "fmt" + "os" + "time" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" +) + +var ( + influx_dsn string + influx_token string + influx_org string +) + +func init() { + influx_dsn = os.Getenv("INFLUX_DSN") + if influx_dsn == "" { + influx_dsn = "http://influx:8086" + } + influx_token = os.Getenv("INFLUX_TOKEN") + influx_org = os.Getenv("INFLUX_ORG") +} + +// 实例 +func newInstance() influxdb2.Client { + client := influxdb2.NewClient(influx_dsn, influx_token) + return client +} + +// 写入例子 +func write(bucket, measurement string, tags map[string]string, fields map[string]interface{}) error { + client := newInstance() + defer client.Close() + writeAPI := client.WriteAPIBlocking(influx_org, bucket) + // Create point using full params constructor + p := influxdb2.NewPoint(measurement, + tags, + fields, + time.Now()) + // write point immediately + if err := writeAPI.WritePoint(context.Background(), p); err != nil { + return err + } + // Or write directly line protocol + // line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0) + // err = writeAPI.WriteRecord(context.Background(), line) + // if err != nil { + // panic(err) + // } + return nil +} + +func writeLine(bucket, line string) error { + client := newInstance() + defer client.Close() + writeAPI := client.WriteAPIBlocking(influx_org, bucket) + // Create point using full params constructor + p := influxdb2.NewPoint(line, + nil, + nil, + time.Now()) + // write point immediately + if err := writeAPI.WritePoint(context.Background(), p); err != nil { + return err + } + return nil +} + +func Query(sql string) (*api.QueryTableResult, error) { + client := newInstance() + defer client.Close() + queryAPI := client.QueryAPI(influx_org) + result, err := queryAPI.Query(context.Background(), sql) + if err != nil { + return nil, err + } + return result, nil +} + +func QuerySimple() { + client := newInstance() + defer client.Close() + // Get query client + queryAPI := client.QueryAPI(influx_org) + // Get parser flux query result + result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`) + if err == nil { + // Use Next() to iterate over query result lines + for result.Next() { + // Observe when there is new grouping key producing new table + if result.TableChanged() { + fmt.Printf("table: %s\n", result.TableMetadata().String()) + } + // read result + fmt.Printf("row: %s\n", result.Record().String()) + } + if result.Err() != nil { + fmt.Printf("Query error: %s\n", result.Err().Error()) + } + } else { + panic(err) + } + // Ensures background processes finishes + client.Close() +} + +// 示例,不要进行生产环境调用 +func WriteSimple(bucketName string) { + client := newInstance() + writeAPI := client.WriteAPIBlocking(influx_org, bucketName) + // Create point using full params constructor + p := influxdb2.NewPoint("stat", + map[string]string{"unit": "temperature"}, + map[string]interface{}{"avg": 24.5, "max": 45.0}, + time.Now()) + // write point immediately + writeAPI.WritePoint(context.Background(), p) + // Create point using fluent style + p = influxdb2.NewPointWithMeasurement("stat"). + AddTag("unit", "temperature"). + AddField("avg", 23.2). + AddField("max", 45.0). + SetTime(time.Now()) + if err := writeAPI.WritePoint(context.Background(), p); err != nil { + panic(err) + } + // Or write directly line protocol + line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0) + if err := writeAPI.WriteRecord(context.Background(), line); err != nil { + panic(err) + } +}