diff --git a/go.mod b/go.mod index 261fc3c..b1f38f7 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module myschools.me/suguo/ble-ibeacon go 1.17 require ( + github.com/influxdata/influxdb v1.9.5 github.com/paypal/gatt v0.0.0-20151011220935-4ae819d591cf gorm.io/driver/sqlite v1.2.6 gorm.io/gorm v1.22.4 diff --git a/influx/config.go b/influx/config.go new file mode 100644 index 0000000..ec03bfe --- /dev/null +++ b/influx/config.go @@ -0,0 +1,9 @@ +package influx + +type Config struct { + Address string + DBName string + Username string + Password string + Timeout int +} diff --git a/influx/influx.go b/influx/influx.go new file mode 100644 index 0000000..271e7df --- /dev/null +++ b/influx/influx.go @@ -0,0 +1,118 @@ +package influx + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/influxdb/client/v2" +) + +var conf *Config + +//配置初始化 +func Init(_conf *Config) { + conf = _conf + if conf == nil { + conf = &Config{ + Address: "127.0.0.1", + Timeout: 3, + } + } +} + +//实例 +func New() (client.Client, error) { + cli, err := client.NewHTTPClient(client.HTTPConfig{ + Addr: conf.Address, + Username: conf.Username, + Password: conf.Password, + Timeout: time.Duration(conf.Timeout) * time.Second, + }) + if err != nil { + return nil, err + } + return cli, nil +} + +// 写入例子 +func WriteSample() error { + cli, err := New() + if err != nil { + return err + } + defer cli.Close() + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: func() string { + if conf.DBName == "" { + return "sample" + } + return conf.DBName + }(), + Precision: "s", + }) + + fields := make(map[string]interface{}) + fields["f1"] = "F1" + fields["f2"] = 1 + + pt, _ := client.NewPoint( + "heartrate", + map[string]string{"platform": "ws", "orgid": "a001"}, + fields, + time.Now(), + ) + + bp.AddPoint(pt) + if err := cli.Write(bp); err != nil { + return err + } + + return nil +} + +func ReadSample() error { + query, err := New() + if err != nil { + return err + } + defer query.Close() + + resp, err := query.QueryCtx(context.Background(), client.Query{ + Command: `select * from heartrate where time>'` + time.Now().Add(-12*time.Hour).Format("2006-01-02T15:04:05") + `' group by time(10m) tz('Asia/Shanghai')`, + Database: func() string { + if conf.DBName == "" { + return "sample" + } + return conf.DBName + }(), + }) + if err != nil { + return err + } + + for _, s := range resp.Results[0].Series[0].Values { + //fmt.Println(reflect.TypeOf(s[1])) + f1 := s[1].(json.Number).String() + f2, _ := s[2].(json.Number).Int64() + platform := s[3].(json.Number).String() + fmt.Println(f1, f2, platform) + } + return nil +} + +/* + // 建库 + createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", databaseName), "", "") + // 建表 + + createInfluxTable() + + createCQ2m := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_2m ON %s "+ + "BEGIN SELECT mean(value) AS value INTO raw_data_2m FROM raw_data GROUP BY unique_id, time(2m) END", databaseName), databaseName, "") + createCQ1h := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_1h ON %s "+ + "BEGIN SELECT mean(value) AS value INTO raw_data_1h FROM raw_data GROUP BY unique_id, time(1h) END", databaseName), databaseName, "") + // 过期策略 + createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY default ON %s DURATION 360d REPLICATION 1 DEFAULT", databaseName), databaseName, "") +*/