package influx

import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/influxdata/influxdb/client/v2"
|
||
)
|
||
|
||
// conf is the package-level configuration. Init assigns it; New and the
// sample helpers read their connection settings and database name from it.
var conf *Config
|
||
|
||
//配置初始化
|
||
func Init(_conf *Config) {
|
||
conf = _conf
|
||
if conf == nil {
|
||
conf = &Config{
|
||
Address: "127.0.0.1",
|
||
Timeout: 3,
|
||
}
|
||
}
|
||
}
|
||
|
||
//实例
|
||
func New() (client.Client, error) {
|
||
cli, err := client.NewHTTPClient(client.HTTPConfig{
|
||
Addr: conf.Address,
|
||
Username: conf.Username,
|
||
Password: conf.Password,
|
||
Timeout: time.Duration(conf.Timeout) * time.Second,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return cli, nil
|
||
}
|
||
|
||
// 写入例子
|
||
func WriteSample() error {
|
||
cli, err := New()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer cli.Close()
|
||
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
|
||
Database: func() string {
|
||
if conf.DBName == "" {
|
||
return "sample"
|
||
}
|
||
return conf.DBName
|
||
}(),
|
||
Precision: "s",
|
||
})
|
||
|
||
fields := make(map[string]interface{})
|
||
fields["f1"] = "F1"
|
||
fields["f2"] = 1
|
||
|
||
pt, _ := client.NewPoint(
|
||
"heartrate",
|
||
map[string]string{"platform": "ws", "orgid": "a001"},
|
||
fields,
|
||
time.Now(),
|
||
)
|
||
|
||
bp.AddPoint(pt)
|
||
if err := cli.Write(bp); err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func ReadSample() error {
|
||
query, err := New()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer query.Close()
|
||
|
||
resp, err := query.QueryCtx(context.Background(), client.Query{
|
||
Command: `select * from heartrate where time>'` + time.Now().Add(-12*time.Hour).Format("2006-01-02T15:04:05") + `' group by time(10m) tz('Asia/Shanghai')`,
|
||
Database: func() string {
|
||
if conf.DBName == "" {
|
||
return "sample"
|
||
}
|
||
return conf.DBName
|
||
}(),
|
||
})
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
for _, s := range resp.Results[0].Series[0].Values {
|
||
//fmt.Println(reflect.TypeOf(s[1]))
|
||
f1 := s[1].(json.Number).String()
|
||
f2, _ := s[2].(json.Number).Int64()
|
||
platform := s[3].(json.Number).String()
|
||
fmt.Println(f1, f2, platform)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
//创建数据库,并配置默认过期策略(单位:天)
|
||
func CreateDB(dbname string, expired uint) error {
|
||
if dbname == "" || expired == 0 {
|
||
return errors.New("无效参数")
|
||
}
|
||
db, err := New()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer db.Close()
|
||
|
||
createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", dbname), "", "")
|
||
result, err := db.Query(createDbSQL)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if result.Error() != nil {
|
||
return result.Error()
|
||
}
|
||
// 过期策略,注意策略名称不能为default
|
||
createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY def ON %s DURATION %dd REPLICATION 1 DEFAULT", dbname, expired), dbname, "")
|
||
result, err = db.Query(createRPSQL)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return result.Error()
|
||
}
|