This commit is contained in:
suguo.yao 2023-02-28 11:24:39 +08:00
commit 97259319e1
7 changed files with 226 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
go.sum
logs/
*.exe

11
go.mod Normal file
View File

@ -0,0 +1,11 @@
module myschools.me/suguo/mqtt-demo
go 1.20
require github.com/eclipse/paho.mqtt.golang v1.4.2
require (
github.com/gorilla/websocket v1.4.2 // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
)

42
handler/mqtt-handler.go Normal file
View File

@ -0,0 +1,42 @@
package handler
import (
"encoding/json"
"fmt"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var MqttConnect MQTT.OnConnectHandler = func(client MQTT.Client) {
token := client.Subscribe("access/ble", 0, serviceHandler)
if token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
var MqttConnLost MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) {
fmt.Println(err.Error())
}
type Pack struct {
V int `json:"v"`
Mid int `json:"mid"`
Time int `json:"time"`
IP string `json:"ip"`
Mac string `json:"mac"`
Devices [][]interface{} `json:"devices"`
}
// 具体业务订阅的处理,此处为示例
var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
body := &Pack{}
json.Unmarshal([]byte(msg.Payload()), body)
for _, o := range body.Devices {
if o[1] != "D02EAB2D6D49" {
continue
}
payload := fmt.Sprintf("%s,%s,%s,%f", time.Unix(int64(body.Time), 0), body.Mac, o[1], o[2].(float64))
fmt.Println(payload)
}
}

35
main.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"time"
"myschools.me/suguo/mqtt-demo/handler"
"myschools.me/suguo/mqtt-demo/mqtt"
)
func main() {
mqtt.Init(&mqtt.Config{
Host: "47.99.211.113:1883",
Username: "dev",
Password: "99b0Ab842Za3bd5d",
})
if err := mqtt.Subscribe(handler.MqttConnect, handler.MqttConnLost); err != nil {
log.Fatal(err)
}
defer mqtt.UnSubscribe("access/#")
// 服务停止相应
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
_, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
os.Exit(0)
}

8
mqtt/config.go Normal file
View File

@ -0,0 +1,8 @@
package mqtt
type Config struct {
Host string
Username string
Password string
ClientID string
}

23
mqtt/handler.go Normal file
View File

@ -0,0 +1,23 @@
package mqtt
import (
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) {
token := client.Subscribe("#", 0, serviceHandler)
if token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) {
fmt.Println(err.Error())
}
// 具体业务订阅的处理,此处为示例
var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println(msg.Topic())
}

104
mqtt/mqtt.go Normal file
View File

@ -0,0 +1,104 @@
package mqtt
import (
"encoding/json"
"errors"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var (
clientSubscribe, clientDistribute MQTT.Client
config *Config
)
func Init(c *Config) {
config = c
if config == nil {
config = &Config{
Host: "tcp://127.0.0.1:1883",
}
}
}
func Options() *MQTT.ClientOptions {
opts := MQTT.NewClientOptions()
opts.AddBroker(config.Host)
opts.Username = config.Username
opts.Password = config.Password
opts.ClientID = config.ClientID
return opts
}
func New() (MQTT.Client, error) {
if clientDistribute != nil {
return clientDistribute, nil
}
opts := Options()
clientDistribute = MQTT.NewClient(opts)
token := clientDistribute.Connect()
if token.Wait() && token.Error() != nil {
clientDistribute = nil
return nil, token.Error()
}
return clientDistribute, nil
}
// 订阅当事件为nil时可获取client
func Subscribe(connHandler MQTT.OnConnectHandler, lostHandler MQTT.ConnectionLostHandler) error {
if connHandler == nil {
return errors.New("handler is nil")
}
opts := Options()
opts.OnConnect = connHandler //当连接成功后调用注册
if lostHandler != nil {
opts.OnConnectionLost = lostHandler
}
if clientSubscribe == nil {
clientSubscribe = MQTT.NewClient(opts)
}
token := clientSubscribe.Connect()
if token.Wait() && token.Error() != nil {
clientSubscribe = nil
return token.Error()
}
return nil
}
// 取消订阅
func UnSubscribe(topic string) {
if clientSubscribe == nil {
return
}
c := clientSubscribe
c.Unsubscribe(topic)
c.Disconnect(250)
}
// 分发
func Distribute(topic *string, qos int, retained bool, payload *[]byte) error {
c, err := New()
if err != nil {
return err
}
token := c.Publish(*topic, byte(qos), retained, *payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
// 分发Object
func DistributeObject(topic *string, qos int, retained bool, obj interface{}) error {
payload, err := json.Marshal(obj)
if err != nil {
return err
}
return Distribute(topic, qos, retained, &payload)
}