mq工具添加mqtt的支持

master
a7458969 2020-04-07 18:24:54 +08:00
parent e1fc6f0c0b
commit 049064483f
2 changed files with 37 additions and 0 deletions

View File

@ -19,6 +19,10 @@ func TestMq(t *testing.T) {
}
cli.Subscribe("msg",Callback)
cli.Public("msg","shit")
cli.Public("msg","shit2")
cli.Public("msg","shit3")
for true{
time.Sleep(100)
}

View File

@ -8,6 +8,8 @@ import (
"io"
"qiniupkg.com/x/log.v7"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
@ -26,6 +28,24 @@ type RedisMqClient struct {
Address string
Password string
}
type MqttClient struct {
client MQTT.Client
Address string
ClientId string
Options *MQTT.ClientOptions
}
func (this *MqttClient) Connect(ip string,port int64,id string) error {
opts := MQTT.NewClientOptions().AddBroker(ip)
opts.SetClientID(id)
this.client = MQTT.NewClient(opts)
if token := this.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
option := &redis.Options{
@ -48,7 +68,20 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
}
return nil
}
func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
handler := func(c MQTT.Client, msg MQTT.Message) {
msg.Payload()
msg.Topic()
callback(msg.Topic(),string(msg.Payload()))
}
opts := MQTT.NewClientOptions().AddBroker(this.Address)
opts.SetClientID(this.ClientId)
opts.SetDefaultPublishHandler(handler)
this.Options = opts
this.client = MQTT.NewClient(opts)
return nil
}
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error {
if nil == this{
return errors.New(util.ERR_NULL_POINTER)