commit 97259319e15fb2c2697cd3c02e26cff7ab8b3f5b Author: suguo.yao Date: Tue Feb 28 11:24:39 2023 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5293031 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +go.sum +logs/ +*.exe \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..68b400e --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module myschools.me/suguo/mqtt-demo + +go 1.20 + +require github.com/eclipse/paho.mqtt.golang v1.4.2 + +require ( + github.com/gorilla/websocket v1.4.2 // indirect + golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect +) diff --git a/handler/mqtt-handler.go b/handler/mqtt-handler.go new file mode 100644 index 0000000..ef9b56c --- /dev/null +++ b/handler/mqtt-handler.go @@ -0,0 +1,42 @@ +package handler + +import ( + "encoding/json" + "fmt" + "time" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +var MqttConnect MQTT.OnConnectHandler = func(client MQTT.Client) { + token := client.Subscribe("access/ble", 0, serviceHandler) + if token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } +} + +var MqttConnLost MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) { + fmt.Println(err.Error()) +} + +type Pack struct { + V int `json:"v"` + Mid int `json:"mid"` + Time int `json:"time"` + IP string `json:"ip"` + Mac string `json:"mac"` + Devices [][]interface{} `json:"devices"` +} + +// 具体业务订阅的处理,此处为示例 +var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + body := &Pack{} + json.Unmarshal([]byte(msg.Payload()), body) + for _, o := range body.Devices { + if o[1] != "D02EAB2D6D49" { + continue + } + payload := fmt.Sprintf("%s,%s,%s,%f", time.Unix(int64(body.Time), 0), body.Mac, o[1], o[2].(float64)) + fmt.Println(payload) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..14dc119 --- /dev/null +++ b/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "time" + + "myschools.me/suguo/mqtt-demo/handler" + "myschools.me/suguo/mqtt-demo/mqtt" +) + +func main() { + mqtt.Init(&mqtt.Config{ + Host: "47.99.211.113:1883", + Username: "dev", + Password: "99b0Ab842Za3bd5d", + }) + + if err := mqtt.Subscribe(handler.MqttConnect, handler.MqttConnLost); err != nil { + log.Fatal(err) + } + + defer mqtt.UnSubscribe("access/#") + + // 服务停止相应 + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + _, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + os.Exit(0) + +} diff --git a/mqtt/config.go b/mqtt/config.go new file mode 100644 index 0000000..556d895 --- /dev/null +++ b/mqtt/config.go @@ -0,0 +1,8 @@ +package mqtt + +type Config struct { + Host string + Username string + Password string + ClientID string +} diff --git a/mqtt/handler.go b/mqtt/handler.go new file mode 100644 index 0000000..c9a5ae3 --- /dev/null +++ b/mqtt/handler.go @@ -0,0 +1,23 @@ +package mqtt + +import ( + "fmt" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) { + token := client.Subscribe("#", 0, serviceHandler) + if token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } +} + +var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) { + fmt.Println(err.Error()) +} + +// 具体业务订阅的处理,此处为示例 +var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + fmt.Println(msg.Topic()) +} diff --git a/mqtt/mqtt.go b/mqtt/mqtt.go new file mode 100644 index 0000000..ba9e303 --- /dev/null +++ b/mqtt/mqtt.go @@ -0,0 +1,104 @@ +package mqtt + +import ( + "encoding/json" + "errors" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +var ( + clientSubscribe, clientDistribute MQTT.Client + config *Config +) + +func Init(c *Config) { + config = c + if config == nil { + config = &Config{ + Host: "tcp://127.0.0.1:1883", + } + } +} + +func Options() *MQTT.ClientOptions { + opts := MQTT.NewClientOptions() + opts.AddBroker(config.Host) + opts.Username = config.Username + opts.Password = config.Password + opts.ClientID = config.ClientID + return opts +} + +func New() (MQTT.Client, error) { + if clientDistribute != nil { + return clientDistribute, nil + } + opts := Options() + clientDistribute = MQTT.NewClient(opts) + token := clientDistribute.Connect() + if token.Wait() && token.Error() != nil { + clientDistribute = nil + return nil, token.Error() + } + + return clientDistribute, nil +} + +// 订阅,当事件为nil时可获取client +func Subscribe(connHandler MQTT.OnConnectHandler, lostHandler MQTT.ConnectionLostHandler) error { + if connHandler == nil { + return errors.New("handler is nil") + } + + opts := Options() + opts.OnConnect = connHandler //当连接成功后调用注册 + if lostHandler != nil { + opts.OnConnectionLost = lostHandler + } + + if clientSubscribe == nil { + clientSubscribe = MQTT.NewClient(opts) + } + + token := clientSubscribe.Connect() + if token.Wait() && token.Error() != nil { + clientSubscribe = nil + return token.Error() + } + + return nil +} + +// 取消订阅 +func UnSubscribe(topic string) { + if clientSubscribe == nil { + return + } + c := clientSubscribe + c.Unsubscribe(topic) + c.Disconnect(250) +} + +// 分发 +func Distribute(topic *string, qos int, retained bool, payload *[]byte) error { + c, err := New() + if err != nil { + return err + } + + token := c.Publish(*topic, byte(qos), retained, *payload) + if token.Wait() && token.Error() != nil { + return token.Error() + } + return nil +} + +// 分发Object +func DistributeObject(topic *string, qos int, retained bool, obj interface{}) error { + payload, err := json.Marshal(obj) + if err != nil { + return err + } + return Distribute(topic, qos, retained, &payload) +}