使用influx批量写入性能提高好多好多

This commit is contained in:
suguo.yao 2021-12-17 12:12:21 +08:00
parent fed9e709dd
commit 87048a500e
6 changed files with 93 additions and 113 deletions

View File

@ -5,4 +5,6 @@ arm:
.PHONY: release
release: arm
ssh pi@192.168.0.21 "sudo systemctl stop ble.service"
scp ./ble-ibeacon pi@192.168.0.21:~/
ssh pi@192.168.0.21 "sudo systemctl start ble.service"

2
go.mod
View File

@ -5,12 +5,10 @@ go 1.17
require (
github.com/influxdata/influxdb v1.9.5
github.com/paypal/gatt v0.0.0-20151011220935-4ae819d591cf
gorm.io/driver/sqlite v1.2.6
gorm.io/gorm v1.22.4
)
require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.3 // indirect
github.com/mattn/go-sqlite3 v1.14.9 // indirect
)

View File

@ -3,6 +3,7 @@ package influx
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
@ -102,17 +103,30 @@ func ReadSample() error {
return nil
}
/*
// 建库
createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", databaseName), "", "")
// 建表
//创建数据库,并配置默认过期策略(单位:天)
func CreateDB(dbname string, expired uint) error {
if dbname == "" || expired == 0 {
return errors.New("无效参数")
}
db, err := New()
if err != nil {
return err
}
defer db.Close()
createInfluxTable()
createCQ2m := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_2m ON %s "+
"BEGIN SELECT mean(value) AS value INTO raw_data_2m FROM raw_data GROUP BY unique_id, time(2m) END", databaseName), databaseName, "")
createCQ1h := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_1h ON %s "+
"BEGIN SELECT mean(value) AS value INTO raw_data_1h FROM raw_data GROUP BY unique_id, time(1h) END", databaseName), databaseName, "")
// 过期策略
createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY default ON %s DURATION 360d REPLICATION 1 DEFAULT", databaseName), databaseName, "")
*/
createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", dbname), "", "")
result, err := db.Query(createDbSQL)
if err != nil {
return err
}
if result.Error() != nil {
return result.Error()
}
// 过期策略注意策略名称不能为default
createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY def ON %s DURATION %dd REPLICATION 1 DEFAULT", dbname, expired), dbname, "")
result, err = db.Query(createRPSQL)
if err != nil {
return err
}
return result.Error()
}

90
main.go
View File

@ -4,15 +4,15 @@ import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/client/v2"
"github.com/paypal/gatt"
"github.com/paypal/gatt/examples/option"
"myschools.me/suguo/ble-ibeacon/model"
"myschools.me/suguo/ble-ibeacon/sqlite"
"myschools.me/suguo/ble-ibeacon/influx"
)
type IBeancon struct {
@ -32,6 +32,14 @@ func NewiBeacon(data []byte) (*IBeancon, error) {
return beacon, nil
}
var (
chn chan *client.Point
)
func init() {
chn = make(chan *client.Point, 500)
}
func onPerhipheralDiscovered(p gatt.Peripheral, a *gatt.Advertisement, rssi int) {
// if p.ID() != "78:2C:29:16:F2:78" {
// return
@ -39,35 +47,32 @@ func onPerhipheralDiscovered(p gatt.Peripheral, a *gatt.Advertisement, rssi int)
// if p.ID() != "90:98:38:F5:DA:60" {
// return
// }
if p.ID() != "E4:78:41:19:10:98" {
return
}
// if p.ID() != "E4:78:41:19:10:98" {
// return
// }
// if !(p.ID() == "C6:1A:05:02:06:C0" || p.ID() == "90:98:38:F5:DA:60") {
// return
// }
fmt.Println("NAME:", a.LocalName, "T:", time.Now(), "MAC:", p.ID(), "RSSI:", rssi, "LEN:", len(a.ManufacturerData))
// fmt.Println("NAME:", a.LocalName, "T:", time.Now(), "MAC:", p.ID(), "RSSI:", rssi, "LEN:", len(a.ManufacturerData))
db, err := sqlite.New()
fields := make(map[string]interface{})
fields["mac"] = p.ID()
fields["rssi"] = rssi
tags := make(map[string]string)
tags["name"] = a.LocalName
tags["pl"] = strconv.Itoa(len(a.ManufacturerData)) //包长度
point, err := client.NewPoint(
"band",
tags,
fields,
time.Now(),
)
if err != nil {
log.Fatalln(err.Error())
}
if err := db.Save(&model.Rssi{
MAC: p.ID(),
Name: a.LocalName,
Rssi: int32(rssi),
PackageLen: len(a.ManufacturerData),
}).Error; err != nil {
log.Fatalln(err.Error())
}
// for _, d := range a.ServiceData {
// fmt.Println("uuid:", d.UUID, " data:", d.Data)
// }
// b, err := NewiBeacon(a.ManufacturerData)
// if err != nil {
// fmt.Println(err.Error())
// return
// }
// fmt.Printf("UUID: %s Major: %d Minor: %d RSSI: %d\n", b.Uuid, b.Major, b.Minor, rssi)
chn <- point
}
func onStateChanged(device gatt.Device, s gatt.State) {
@ -81,14 +86,43 @@ func onStateChanged(device gatt.Device, s gatt.State) {
}
func main() {
sqlite.Init(&sqlite.Config{
DBFile: "./ble.db",
influx.Init(&influx.Config{
Address: "http://127.0.0.1:8086",
DBName: "wristband",
Username: "root",
Password: "Kerry]",
Timeout: 3,
})
if err := sqlite.Migrate(&model.Rssi{}); err != nil {
if err := influx.CreateDB("wristband", 10); err != nil {
log.Fatalln("influx.CreateDB", err.Error())
}
go func() {
db, err := influx.New()
if err != nil {
log.Fatalln(err.Error())
}
for {
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "wristband",
Precision: "s",
})
if err != nil {
log.Fatalln(err.Error())
}
for i := 1; i < 30; i++ {
if d, ok := <-chn; ok {
bp.AddPoint(d)
}
}
if err := db.Write(bp); err != nil {
log.Println("db.Write: ", err.Error())
}
}
}()
dev, err := gatt.NewDevice(option.DefaultClientOptions...)
if err != nil {
log.Fatalf("Failed to open device, err:%s\n", err)

View File

@ -1,5 +0,0 @@
package sqlite
type Config struct {
DBFile string //DB文件名
}

View File

@ -1,63 +0,0 @@
package sqlite
import (
"errors"
"os"
"path/filepath"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var (
_conf *Config
_db *gorm.DB
)
//Init mysql初始化
func Init(conf *Config) error {
if conf != nil {
_conf = conf
}
return nil
}
//New 创建实例
func New() (*gorm.DB, error) {
if _db != nil {
return _db, nil
}
if _conf == nil {
return nil, errors.New("组件未初始化请执行Init")
}
dir, _ := filepath.Split(_conf.DBFile)
if dir != "" {
_, err := os.Stat(dir)
if os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
}
}
var err error
_db, err = gorm.Open(sqlite.Open(_conf.DBFile), &gorm.Config{
SkipDefaultTransaction: true,
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
return _db, nil
}
func Migrate(dest ...interface{}) error {
db, err := New()
if err != nil {
return err
}
return db.AutoMigrate(dest...)
}