influx group by 使用

This commit is contained in:
wyh 2021-08-16 19:40:34 +08:00
parent 5fa497043d
commit afea23f776
7 changed files with 1489 additions and 68 deletions

25
.gitignore vendored
View File

@ -1 +1,24 @@
.vscode
# ---> Go
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
.vscode
influx-demo
logs/
*.yaml
*.json
go.sum
assets/
.updatetime

7
README.md Normal file
View File

@ -0,0 +1,7 @@
# influx-demo
influx 的使用
### 概述
通过 go 操作 influxdb,实现读取和插入

14
go.mod
View File

@ -2,4 +2,16 @@ module myschools.me/wyh/influx-demo.git
go 1.16
require github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
require (
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6 // indirect
github.com/coreos/etcd v3.3.10+incompatible // indirect
github.com/coreos/go-etcd v2.0.0+incompatible // indirect
github.com/gin-gonic/gin v1.7.4 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/influxdata/influxdb v1.9.3
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/viper v1.8.1 // indirect
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v8 v8.18.2 // indirect
)

1299
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -4,23 +4,17 @@ import (
"log"
"time"
client "github.com/influxdata/influxdb1-client/v2"
client "github.com/influxdata/influxdb/client/v2"
"github.com/spf13/viper"
"myschools.me/wyh/influx-demo.git/model"
)
const (
//MyDB 数据库名
MyDB = "wyh"
username = ""
password = ""
)
//ConnInflux 连接influxDB
func ConnInflux() client.Client {
cli, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://127.0.0.1:8086",
Username: username,
Password: password,
Addr: viper.GetString("influx.Addr"),
Username: viper.GetString("influx.UserName"),
Password: viper.GetString("influx.Password"),
})
if err != nil {
log.Fatal(err)
@ -31,22 +25,12 @@ func ConnInflux() client.Client {
//WritesPoints 写入数据
func WritesPoints(points *model.Influx, cli client.Client) {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: MyDB,
Database: viper.GetString("influx.DbName"),
Precision: "s",
})
if err != nil {
log.Fatal(err)
}
log.Println(points.TableName)
log.Println(points.Tags)
log.Println(points.Fields)
// tags := map[string]string{"cpu": "ih-cpu"}
// fields := map[string]interface{}{
// "idle": 20.1,
// "system": 43.3,
// "user": 86.6,
// }
pt, err := client.NewPoint(
// "cpu_usage",
@ -69,7 +53,7 @@ func WritesPoints(points *model.Influx, cli client.Client) {
func QueryDB(cli client.Client, cmd string) (res []client.Result, err error) {
q := client.Query{
Command: cmd,
Database: MyDB,
Database: viper.GetString("influx.DbName"),
}
if response, err := cli.Query(q); err == nil {
if response.Error() != nil {

60
main.go
View File

@ -1,53 +1,31 @@
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"flag"
"myschools.me/wyh/influx-demo.git/influx"
"myschools.me/wyh/influx-demo.git/model"
)
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
const (
// MyDB wyh
MyDB = "wyh"
// MyMeasurement cpu_usgae
MyMeasurement = "cpu_usage"
"github.com/spf13/viper"
"myschools.me/wyh/influx-demo.git/service"
)
func main() {
conn := influx.ConnInflux()
fmt.Println(conn)
cf := flag.String("config", "config.yaml", "file of config")
flag.Parse()
//insert
var points = &model.Influx{
TableName: "cpu_usage",
Tags: map[string]string{"cpu": "ih-cpu"},
Fields: map[string]interface{}{
"idle": 20.1,
"system": 43.3,
"user": 86.6,
},
}
influx.WritesPoints(points, conn)
//获取10条数据并展示
qs := fmt.Sprintf("SELECT * FROM %s LIMIT %d", MyMeasurement, 10)
res, err := influx.QueryDB(conn, qs)
if err != nil {
log.Fatal(err)
viper.SetConfigFile(*cf)
if err := viper.ReadInConfig(); err != nil {
log.WithFields(log.Fields{
"func": "main",
}).Errorf("%s", err.Error())
return
}
for k, row := range res[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
value := row[2].(json.Number)
valu := row[3].(json.Number)
log.Printf("[%2d] %s: %s %s\n", k, t.Format(time.Stamp), value, valu)
}
// go service.Writepoints()
// go service.Querypoint()
go service.TimeIntervalQuery()
r := gin.New()
r.Run()
}

122
service/cpu-service.go Normal file
View File

@ -0,0 +1,122 @@
package service
import (
"encoding/json"
"fmt"
"time"
client "github.com/influxdata/influxdb/client/v2"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"myschools.me/wyh/influx-demo.git/influx"
)
//Writepoints 写入数据cpu-usage
func Writepoints() {
cli := influx.ConnInflux()
dp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: viper.GetString("influx.DbName"),
Precision: "s",
})
if err != nil {
log.WithFields(log.Fields{
"func": "Writepoints",
}).Warnf("%s", err.Error())
}
var Tags = map[string]string{"cpu": "ih-cpu"}
var fields = map[string]interface{}{
"idle": 20.1,
"system": 43.3,
"user": 86.6,
}
pt, err := client.NewPoint(
viper.GetString("influx.TableName"),
Tags,
fields,
time.Now(),
)
if err != nil {
log.WithFields(log.Fields{
"func": "Writepoints",
}).Warnf("%s", err.Error())
}
dp.AddPoint(pt)
if err := cli.Write(dp); err != nil {
log.WithFields(log.Fields{
"func": "Writepoints",
}).Warnf("%s", err.Error())
}
}
// Querypoint 查询数据
func Querypoint() {
cli := influx.ConnInflux()
qs := fmt.Sprintf("SELECT * FROM %s LIMIT %d", viper.GetString("influx.TableName"), 10)
q := client.Query{
Command: qs,
Database: viper.GetString("influx.DbName"),
}
var res []client.Result
if response, err := cli.Query(q); err == nil {
if response.Error() != nil {
log.WithFields(log.Fields{
"func": "Querypoint",
}).Warnf("%s", err.Error())
}
res = response.Results
} else {
log.WithFields(log.Fields{
"func": "Querypoint",
}).Warnf("%s", err.Error())
}
for k, row := range res[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
value := row[2].(json.Number)
valu := row[3].(json.Number)
fmt.Printf("[%2d] %s: %s %s\n", k, t.Format(time.Stamp), value, valu)
}
}
//TimeIntervalQuery 查询每12分钟system的个数
func TimeIntervalQuery() {
conn := influx.ConnInflux()
qs := fmt.Sprintf(`select count("%s") from %s where time >='2021-08-15T01:27:46Z' and time<='2021-08-16T03:39:45Z' group by time(12m)`, "system", viper.GetString("influx.TableName"))
q := client.Query{
Database: viper.GetString("influx.DbName"),
Command: qs,
}
var res []client.Result
if resopnse, err := conn.Query(q); err == nil {
if resopnse.Error() != nil {
log.WithFields(log.Fields{
"func": "TimeIntervalQuery",
}).Warnf("%s", resopnse.Error())
}
res = resopnse.Results
} else {
log.WithFields(log.Fields{
"func": "TimeIntervalQuery",
}).Warnf("%s", resopnse.Error())
}
for _, row := range res[0].Series[0].Values {
t, err := time.Parse(time.RFC3339, row[0].(string))
if err != nil {
log.Fatal(err)
}
num := row[1]
result := fmt.Sprintf("%s,%s", t, num)
fmt.Println(result)
}
}