From 87048a500e65c73a782eecf963629b9f18a25f71 Mon Sep 17 00:00:00 2001 From: "suguo.yao" Date: Fri, 17 Dec 2021 12:12:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8influx=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E5=86=99=E5=85=A5=E6=80=A7=E8=83=BD=E6=8F=90=E9=AB=98=E5=A5=BD?= =?UTF-8?q?=E5=A4=9A=E5=A5=BD=E5=A4=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 4 ++- go.mod | 2 -- influx/influx.go | 40 ++++++++++++++------- main.go | 92 +++++++++++++++++++++++++++++++++--------------- sqlite/config.go | 5 --- sqlite/sqlite.go | 63 --------------------------------- 6 files changed, 93 insertions(+), 113 deletions(-) delete mode 100644 sqlite/config.go delete mode 100644 sqlite/sqlite.go diff --git a/Makefile b/Makefile index 21ba741..bfabcdf 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,6 @@ arm: .PHONY: release release: arm - scp ./ble-ibeacon pi@192.168.0.21:~/ \ No newline at end of file + ssh pi@192.168.0.21 "sudo systemctl stop ble.service" + scp ./ble-ibeacon pi@192.168.0.21:~/ + ssh pi@192.168.0.21 "sudo systemctl start ble.service" \ No newline at end of file diff --git a/go.mod b/go.mod index b1f38f7..6960c32 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,10 @@ go 1.17 require ( github.com/influxdata/influxdb v1.9.5 github.com/paypal/gatt v0.0.0-20151011220935-4ae819d591cf - gorm.io/driver/sqlite v1.2.6 gorm.io/gorm v1.22.4 ) require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.3 // indirect - github.com/mattn/go-sqlite3 v1.14.9 // indirect ) diff --git a/influx/influx.go b/influx/influx.go index 271e7df..b8e86ee 100644 --- a/influx/influx.go +++ b/influx/influx.go @@ -3,6 +3,7 @@ package influx import ( "context" "encoding/json" + "errors" "fmt" "time" @@ -102,17 +103,30 @@ func ReadSample() error { return nil } -/* - // 建库 - createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", databaseName), "", "") - // 建表 +//创建数据库,并配置默认过期策略(单位:天) +func CreateDB(dbname string, expired uint) error { + if dbname == "" || expired == 0 { + return errors.New("无效参数") + } + db, err := New() + if err != nil { + return err + } + defer db.Close() - createInfluxTable() - - createCQ2m := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_2m ON %s "+ - "BEGIN SELECT mean(value) AS value INTO raw_data_2m FROM raw_data GROUP BY unique_id, time(2m) END", databaseName), databaseName, "") - createCQ1h := client.NewQuery(fmt.Sprintf("CREATE CONTINUOUS QUERY cq_exchanger_1h ON %s "+ - "BEGIN SELECT mean(value) AS value INTO raw_data_1h FROM raw_data GROUP BY unique_id, time(1h) END", databaseName), databaseName, "") - // 过期策略 - createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY default ON %s DURATION 360d REPLICATION 1 DEFAULT", databaseName), databaseName, "") -*/ + createDbSQL := client.NewQuery(fmt.Sprintf("CREATE DATABASE %s", dbname), "", "") + result, err := db.Query(createDbSQL) + if err != nil { + return err + } + if result.Error() != nil { + return result.Error() + } + // 过期策略,注意策略名称不能为default + createRPSQL := client.NewQuery(fmt.Sprintf("CREATE RETENTION POLICY def ON %s DURATION %dd REPLICATION 1 DEFAULT", dbname, expired), dbname, "") + result, err = db.Query(createRPSQL) + if err != nil { + return err + } + return result.Error() +} diff --git a/main.go b/main.go index f50ffcd..df8ef77 100644 --- a/main.go +++ b/main.go @@ -4,15 +4,15 @@ import ( "encoding/binary" "encoding/hex" "errors" - "fmt" "log" + "strconv" "strings" "time" + "github.com/influxdata/influxdb/client/v2" "github.com/paypal/gatt" "github.com/paypal/gatt/examples/option" - "myschools.me/suguo/ble-ibeacon/model" - "myschools.me/suguo/ble-ibeacon/sqlite" + "myschools.me/suguo/ble-ibeacon/influx" ) type IBeancon struct { @@ -32,6 +32,14 @@ func NewiBeacon(data []byte) (*IBeancon, error) { return beacon, nil } +var ( + chn chan *client.Point +) + +func init() { + chn = make(chan *client.Point, 500) +} + func onPerhipheralDiscovered(p gatt.Peripheral, a *gatt.Advertisement, rssi int) { // if p.ID() != "78:2C:29:16:F2:78" { // return @@ -39,35 +47,32 @@ func onPerhipheralDiscovered(p gatt.Peripheral, a *gatt.Advertisement, rssi int) // if p.ID() != "90:98:38:F5:DA:60" { // return // } - if p.ID() != "E4:78:41:19:10:98" { - return - } + // if p.ID() != "E4:78:41:19:10:98" { + // return + // } // if !(p.ID() == "C6:1A:05:02:06:C0" || p.ID() == "90:98:38:F5:DA:60") { // return // } - fmt.Println("NAME:", a.LocalName, "T:", time.Now(), "MAC:", p.ID(), "RSSI:", rssi, "LEN:", len(a.ManufacturerData)) + // fmt.Println("NAME:", a.LocalName, "T:", time.Now(), "MAC:", p.ID(), "RSSI:", rssi, "LEN:", len(a.ManufacturerData)) - db, err := sqlite.New() + fields := make(map[string]interface{}) + fields["mac"] = p.ID() + fields["rssi"] = rssi + + tags := make(map[string]string) + tags["name"] = a.LocalName + tags["pl"] = strconv.Itoa(len(a.ManufacturerData)) //包长度 + + point, err := client.NewPoint( + "band", + tags, + fields, + time.Now(), + ) if err != nil { log.Fatalln(err.Error()) } - if err := db.Save(&model.Rssi{ - MAC: p.ID(), - Name: a.LocalName, - Rssi: int32(rssi), - PackageLen: len(a.ManufacturerData), - }).Error; err != nil { - log.Fatalln(err.Error()) - } - // for _, d := range a.ServiceData { - // fmt.Println("uuid:", d.UUID, " data:", d.Data) - // } - // b, err := NewiBeacon(a.ManufacturerData) - // if err != nil { - // fmt.Println(err.Error()) - // return - // } - // fmt.Printf("UUID: %s Major: %d Minor: %d RSSI: %d\n", b.Uuid, b.Major, b.Minor, rssi) + chn <- point } func onStateChanged(device gatt.Device, s gatt.State) { @@ -81,14 +86,43 @@ func onStateChanged(device gatt.Device, s gatt.State) { } func main() { - sqlite.Init(&sqlite.Config{ - DBFile: "./ble.db", + influx.Init(&influx.Config{ + Address: "http://127.0.0.1:8086", + DBName: "wristband", + Username: "root", + Password: "Kerry]", + Timeout: 3, }) - if err := sqlite.Migrate(&model.Rssi{}); err != nil { - log.Fatalln(err.Error()) + if err := influx.CreateDB("wristband", 10); err != nil { + log.Fatalln("influx.CreateDB", err.Error()) } + go func() { + db, err := influx.New() + if err != nil { + log.Fatalln(err.Error()) + } + + for { + bp, err := client.NewBatchPoints(client.BatchPointsConfig{ + Database: "wristband", + Precision: "s", + }) + if err != nil { + log.Fatalln(err.Error()) + } + for i := 1; i < 30; i++ { + if d, ok := <-chn; ok { + bp.AddPoint(d) + } + } + if err := db.Write(bp); err != nil { + log.Println("db.Write: ", err.Error()) + } + } + }() + dev, err := gatt.NewDevice(option.DefaultClientOptions...) if err != nil { log.Fatalf("Failed to open device, err:%s\n", err) diff --git a/sqlite/config.go b/sqlite/config.go deleted file mode 100644 index 115eaa8..0000000 --- a/sqlite/config.go +++ /dev/null @@ -1,5 +0,0 @@ -package sqlite - -type Config struct { - DBFile string //DB文件名 -} diff --git a/sqlite/sqlite.go b/sqlite/sqlite.go deleted file mode 100644 index d377640..0000000 --- a/sqlite/sqlite.go +++ /dev/null @@ -1,63 +0,0 @@ -package sqlite - -import ( - "errors" - "os" - "path/filepath" - - "gorm.io/driver/sqlite" - "gorm.io/gorm" - "gorm.io/gorm/logger" -) - -var ( - _conf *Config - _db *gorm.DB -) - -//Init mysql初始化 -func Init(conf *Config) error { - if conf != nil { - _conf = conf - } - return nil -} - -//New 创建实例 -func New() (*gorm.DB, error) { - if _db != nil { - return _db, nil - } - - if _conf == nil { - return nil, errors.New("组件未初始化,请执行Init!") - } - - dir, _ := filepath.Split(_conf.DBFile) - if dir != "" { - _, err := os.Stat(dir) - if os.IsNotExist(err) { - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, err - } - } - } - - var err error - _db, err = gorm.Open(sqlite.Open(_conf.DBFile), &gorm.Config{ - SkipDefaultTransaction: true, - Logger: logger.Default.LogMode(logger.Silent), - }) - if err != nil { - return nil, err - } - return _db, nil -} - -func Migrate(dest ...interface{}) error { - db, err := New() - if err != nil { - return err - } - return db.AutoMigrate(dest...) -}