mosquitto测试通过

This commit is contained in:
suguo.yao 2022-01-04 16:32:40 +08:00
parent fa8dded9f5
commit 830b635af7
5 changed files with 171 additions and 0 deletions

1
go.mod
View File

@ -3,6 +3,7 @@ module myschools.me/suguo/snippet
go 1.17 go 1.17
require ( require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/gin-gonic/gin v1.7.4 github.com/gin-gonic/gin v1.7.4
github.com/gomodule/redigo v1.8.5 github.com/gomodule/redigo v1.8.5
github.com/hashicorp/consul/api v1.10.1 github.com/hashicorp/consul/api v1.10.1

7
mosquitto/config.go Normal file
View File

@ -0,0 +1,7 @@
package mosquitto
type Config struct {
Host string
Username string
Password string
}

23
mosquitto/handler.go Normal file
View File

@ -0,0 +1,23 @@
package mosquitto
import (
"fmt"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) {
token := client.Subscribe("#", 0, serviceHandler)
if token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) {
fmt.Println(err.Error())
}
//具体业务订阅的处理,此处为示例
var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println(msg.Topic())
}

103
mosquitto/mosquitto.go Normal file
View File

@ -0,0 +1,103 @@
package mosquitto
import (
"encoding/json"
"errors"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
var (
clientSubscribe, clientDistribute MQTT.Client
config *Config
)
func Init(c *Config) {
config = c
if config == nil {
config = &Config{
Host: "tcp://127.0.0.1:1883",
}
}
}
func Options() *MQTT.ClientOptions {
opts := MQTT.NewClientOptions()
opts.AddBroker(config.Host)
opts.Username = config.Username
opts.Password = config.Password
return opts
}
func New() (MQTT.Client, error) {
if clientDistribute != nil {
return clientDistribute, nil
}
opts := Options()
clientDistribute = MQTT.NewClient(opts)
token := clientDistribute.Connect()
if token.Wait() && token.Error() != nil {
clientDistribute = nil
return nil, token.Error()
}
return clientDistribute, nil
}
//订阅当事件为nil时可获取client
func Subscribe(connHandler MQTT.OnConnectHandler, lostHandler MQTT.ConnectionLostHandler) error {
if connHandler == nil {
return errors.New("handler is nil")
}
opts := Options()
opts.OnConnect = connHandler //当连接成功后调用注册
if lostHandler != nil {
opts.OnConnectionLost = lostHandler
}
if clientSubscribe == nil {
clientSubscribe = MQTT.NewClient(opts)
}
token := clientSubscribe.Connect()
if token.Wait() && token.Error() != nil {
clientSubscribe = nil
return token.Error()
}
return nil
}
//取消订阅
func UnSubscribe(topic string) {
if clientSubscribe == nil {
return
}
c := clientSubscribe
c.Unsubscribe(topic)
c.Disconnect(250)
}
//分发
func Distribute(topic *string, qos int, retained bool, payload *[]byte) error {
c, err := New()
if err != nil {
return err
}
token := c.Publish(*topic, byte(qos), retained, *payload)
if token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
//分发Object
func DistributeObject(topic *string, qos int, retained bool, obj interface{}) error {
payload, err := json.Marshal(obj)
if err != nil {
return err
}
return Distribute(topic, qos, retained, &payload)
}

37
test/mosquitto_test.go Normal file
View File

@ -0,0 +1,37 @@
package test
import (
"fmt"
"testing"
MQTT "github.com/eclipse/paho.mqtt.golang"
"myschools.me/suguo/snippet/mosquitto"
)
func TestSubscribe(t *testing.T) {
mosquitto.Init(&mosquitto.Config{})
if err := mosquitto.Subscribe(connHandler, connLostHandler); err != nil {
t.Fatal(err)
}
topic := "xtj/aaaa"
payload := []byte("sadfdsaf")
mosquitto.Distribute(&topic, 0, false, &payload)
select {}
}
var connHandler MQTT.OnConnectHandler = func(client MQTT.Client) {
token := client.Subscribe("#", 0, serviceHandler)
if token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
}
}
var connLostHandler MQTT.ConnectionLostHandler = func(client MQTT.Client, err error) {
fmt.Println(err.Error())
}
//具体业务订阅的处理,此处为示例
var serviceHandler MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Println(msg.Topic())
}