From 830b635af74b19881817286d6e1e2871df90156f Mon Sep 17 00:00:00 2001 From: "suguo.yao" Date: Tue, 4 Jan 2022 16:32:40 +0800 Subject: [PATCH] =?UTF-8?q?mosquitto=E6=B5=8B=E8=AF=95=E9=80=9A=E8=BF=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + mosquitto/config.go | 7 +++ mosquitto/handler.go | 23 +++++++++ mosquitto/mosquitto.go | 103 +++++++++++++++++++++++++++++++++++++++++ test/mosquitto_test.go | 37 +++++++++++++++ 5 files changed, 171 insertions(+) create mode 100644 mosquitto/config.go create mode 100644 mosquitto/handler.go create mode 100644 mosquitto/mosquitto.go create mode 100644 test/mosquitto_test.go diff --git a/go.mod b/go.mod index b3af506..e01e1cb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module myschools.me/suguo/snippet go 1.17 require ( + github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/gin-gonic/gin v1.7.4 github.com/gomodule/redigo v1.8.5 github.com/hashicorp/consul/api v1.10.1 diff --git a/mosquitto/config.go b/mosquitto/config.go new file mode 100644 index 0000000..d1039b8 --- /dev/null +++ b/mosquitto/config.go @@ -0,0 +1,7 @@ +package mosquitto + +type Config struct { + Host string + Username string + Password string +} diff --git a/mosquitto/handler.go b/mosquitto/handler.go new file mode 100644 index 0000000..c361105 --- /dev/null +++ b/mosquitto/handler.go @@ -0,0 +1,23 @@ +package mosquitto + +import ( + "fmt" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) { + token := client.Subscribe("#", 0, serviceHandler) + if token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } +} + +var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) { + fmt.Println(err.Error()) +} + +//具体业务订阅的处理,此处为示例 +var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + fmt.Println(msg.Topic()) +} diff --git a/mosquitto/mosquitto.go b/mosquitto/mosquitto.go new file mode 100644 index 0000000..611da91 --- /dev/null +++ b/mosquitto/mosquitto.go @@ -0,0 +1,103 @@ +package mosquitto + +import ( + "encoding/json" + "errors" + + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +var ( + clientSubscribe, clientDistribute MQTT.Client + config *Config +) + +func Init(c *Config) { + config = c + if config == nil { + config = &Config{ + Host: "tcp://127.0.0.1:1883", + } + } +} + +func Options() *MQTT.ClientOptions { + opts := MQTT.NewClientOptions() + opts.AddBroker(config.Host) + opts.Username = config.Username + opts.Password = config.Password + return opts +} + +func New() (MQTT.Client, error) { + if clientDistribute != nil { + return clientDistribute, nil + } + opts := Options() + clientDistribute = MQTT.NewClient(opts) + token := clientDistribute.Connect() + if token.Wait() && token.Error() != nil { + clientDistribute = nil + return nil, token.Error() + } + + return clientDistribute, nil +} + +//订阅,当事件为nil时可获取client +func Subscribe(connHandler MQTT.OnConnectHandler, lostHandler MQTT.ConnectionLostHandler) error { + if connHandler == nil { + return errors.New("handler is nil") + } + + opts := Options() + opts.OnConnect = connHandler //当连接成功后调用注册 + if lostHandler != nil { + opts.OnConnectionLost = lostHandler + } + + if clientSubscribe == nil { + clientSubscribe = MQTT.NewClient(opts) + } + + token := clientSubscribe.Connect() + if token.Wait() && token.Error() != nil { + clientSubscribe = nil + return token.Error() + } + + return nil +} + +//取消订阅 +func UnSubscribe(topic string) { + if clientSubscribe == nil { + return + } + c := clientSubscribe + c.Unsubscribe(topic) + c.Disconnect(250) +} + +//分发 +func Distribute(topic *string, qos int, retained bool, payload *[]byte) error { + c, err := New() + if err != nil { + return err + } + + token := c.Publish(*topic, byte(qos), retained, *payload) + if token.Wait() && token.Error() != nil { + return token.Error() + } + return nil +} + +//分发Object +func DistributeObject(topic *string, qos int, retained bool, obj interface{}) error { + payload, err := json.Marshal(obj) + if err != nil { + return err + } + return Distribute(topic, qos, retained, &payload) +} diff --git a/test/mosquitto_test.go b/test/mosquitto_test.go new file mode 100644 index 0000000..4dddaff --- /dev/null +++ b/test/mosquitto_test.go @@ -0,0 +1,37 @@ +package test + +import ( + "fmt" + "testing" + + MQTT "github.com/eclipse/paho.mqtt.golang" + "myschools.me/suguo/snippet/mosquitto" +) + +func TestSubscribe(t *testing.T) { + mosquitto.Init(&mosquitto.Config{}) + if err := mosquitto.Subscribe(connHandler, connLostHandler); err != nil { + t.Fatal(err) + } + + topic := "xtj/aaaa" + payload := []byte("sadfdsaf") + mosquitto.Distribute(&topic, 0, false, &payload) + select {} +} + +var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) { + token := client.Subscribe("#", 0, serviceHandler) + if token.Wait() && token.Error() != nil { + fmt.Println(token.Error()) + } +} + +var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) { + fmt.Println(err.Error()) +} + +//具体业务订阅的处理,此处为示例 +var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + fmt.Println(msg.Topic()) +}