no message

master
caiyuzheng 2020-08-20 01:09:18 +08:00
parent 801bdd69bd
commit 8a086a77b3
9 changed files with 82 additions and 81 deletions

BIN
__debug_bin Normal file

Binary file not shown.

View File

@ -2,11 +2,11 @@ package main
import ( import (
"flag" "flag"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/network"
"log" "log"
"runtime" "runtime"
"tcpteamplate/config"
"tcpteamplate/logger"
"tcpteamplate/network"
) )
func main() { func main() {

View File

@ -4,14 +4,16 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"log" "log"
"net" "net"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"tcpteamplate/config"
"tcpteamplate/logger"
"tcpteamplate/util"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -43,13 +45,16 @@ type ConnectionManager struct {
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext { func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithTimeout(parent, timeOut) ctx, _ := context.WithTimeout(parent, timeOut)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler} if config.IfSecure() {
}
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret return ret
} }
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext { func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithCancel(parent) ctx, _ := context.WithCancel(parent)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler} ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret return ret
} }

View File

@ -8,17 +8,14 @@ import (
_ "encoding/binary" _ "encoding/binary"
"errors" "errors"
_ "fmt" _ "fmt"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"log" "log"
"net" "net"
"sync" "sync"
"tcpteamplate/config" "tcpteamplate/logger"
"tcpteamplate/util"
"time" "time"
_ "time" _ "time"
"gobase/utils"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
_ "github.com/golang/protobuf/proto" _ "github.com/golang/protobuf/proto"
"github.com/hprose/hprose-golang/io" "github.com/hprose/hprose-golang/io"

View File

@ -4,15 +4,15 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"go-tcpFramework-rudy/zk"
"gobase/utils" "gobase/utils"
"log" "log"
"net" "net"
"net/http" "net/http"
"os" "os"
"tcpteamplate/config"
"tcpteamplate/logger"
"tcpteamplate/util"
"tcpteamplate/zk"
"time" "time"
zkdriver "github.com/samuel/go-zookeeper/zk" zkdriver "github.com/samuel/go-zookeeper/zk"

View File

@ -3,19 +3,19 @@ package test
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
_ "go-tcpFramework-rudy/protocol/pb"
_ "io" _ "io"
"log" "log"
"net" "net"
_ "net/http/pprof" _ "net/http/pprof"
_ "runtime" _ "runtime"
_ "tcpteamplate/protocol/pb"
"testing" "testing"
"time" "time"
)
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
)
/* /*
tcp tcp
@ -61,7 +61,7 @@ func SendPackage(writer *net.TCPConn, payload []byte, length int) error {
//log.Println("Send length",wb) //log.Println("Send length",wb)
b = append(b, packbuf.Bytes()...) b = append(b, packbuf.Bytes()...)
b = append(b, payload[0:length]...) b = append(b, payload[0:length]...)
log.Println(b[0:5],b[5:]) log.Println(b[0:5], b[5:])
_, e = writer.Write(b[0:5]) _, e = writer.Write(b[0:5])
if e != nil { if e != nil {
log.Println(e.Error()) log.Println(e.Error())

View File

@ -1,34 +1,35 @@
package test package test
import ( import (
"go-tcpFramework-rudy/tools/mq" "tcpteamplate/tools/mq"
"qiniupkg.com/x/log.v7"
"testing" "testing"
"time" "time"
"qiniupkg.com/x/log.v7"
) )
func Callback(key string,msg string) { func Callback(key string, msg string) {
log.Print(key,msg) log.Print(key, msg)
} }
func TestMq(t *testing.T) { func TestMq(t *testing.T) {
cli := mq.RedisMqClient{} cli := mq.RedisMqClient{}
e := cli.Connect("49.235.25.67:16379",0,"") e := cli.Connect("49.235.25.67:16379", 0, "")
if nil != e{ if nil != e {
log.Print(e.Error()) log.Print(e.Error())
t.Error(e) t.Error(e)
} }
cli.Subscribe("msg",Callback) cli.Subscribe("msg", Callback)
cli.Public("msg","shit") cli.Public("msg", "shit")
cli.Public("msg","shit2") cli.Public("msg", "shit2")
cli.Public("msg","shit3") cli.Public("msg", "shit3")
mqcli := mq.MqttClient{} mqcli := mq.MqttClient{}
e = mqcli.Connect("tcp://" + "49.235.25.67:1883",0,"122") e = mqcli.Connect("tcp://"+"49.235.25.67:1883", 0, "122")
log.Print(e) log.Print(e)
e = mqcli.Subscribe("/dev",Callback) e = mqcli.Subscribe("/dev", Callback)
log.Print(e) log.Print(e)
mqcli.Public("/dev","shit") mqcli.Public("/dev", "shit")
for true{ for true {
time.Sleep(100) time.Sleep(100)
} }
} }

View File

@ -2,40 +2,37 @@ package mq
import ( import (
"fmt" "fmt"
"github.com/pkg/errors"
"go-tcpFramework-rudy/util"
"gopkg.in/redis.v3"
"io" "io"
"tcpteamplate/util"
"github.com/pkg/errors"
"gopkg.in/redis.v3"
"qiniupkg.com/x/log.v7" "qiniupkg.com/x/log.v7"
MQTT "github.com/eclipse/paho.mqtt.golang" MQTT "github.com/eclipse/paho.mqtt.golang"
) )
type CallBack func(string, string)
type CallBack func(string,string)
//通用mq库支持发布订阅 //通用mq库支持发布订阅
type MqClient interface { type MqClient interface {
Subscribe(topic string,callback *CallBack) error Subscribe(topic string, callback *CallBack) error
Public(topic string,data string) error Public(topic string, data string) error
} }
type RedisMqClient struct { type RedisMqClient struct {
client *redis.Client client *redis.Client
Address string Address string
Password string Password string
} }
type MqttClient struct { type MqttClient struct {
client MQTT.Client client MQTT.Client
Address string Address string
ClientId string ClientId string
Options *MQTT.ClientOptions Options *MQTT.ClientOptions
} }
func (this *MqttClient) Connect(ip string,port int64,id string) error { func (this *MqttClient) Connect(ip string, port int64, id string) error {
this.Address = ip this.Address = ip
opts := MQTT.NewClientOptions().AddBroker(ip) opts := MQTT.NewClientOptions().AddBroker(ip)
@ -49,7 +46,7 @@ func (this *MqttClient) Connect(ip string,port int64,id string) error {
return nil return nil
} }
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error { func (this *RedisMqClient) Connect(ip string, port int64, passwd string) error {
option := &redis.Options{ option := &redis.Options{
Addr: ip, Addr: ip,
Password: passwd, Password: passwd,
@ -70,11 +67,11 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
} }
return nil return nil
} }
func (this *MqttClient) Subscribe(topic string,callback CallBack) error { func (this *MqttClient) Subscribe(topic string, callback CallBack) error {
handler := func(c MQTT.Client, msg MQTT.Message) { handler := func(c MQTT.Client, msg MQTT.Message) {
msg.Payload() msg.Payload()
msg.Topic() msg.Topic()
callback(msg.Topic(),string(msg.Payload())) callback(msg.Topic(), string(msg.Payload()))
} }
opts := MQTT.NewClientOptions().AddBroker(this.Address) opts := MQTT.NewClientOptions().AddBroker(this.Address)
opts.SetClientID(this.ClientId) opts.SetClientID(this.ClientId)
@ -86,33 +83,33 @@ func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
if token := this.client.Connect(); token.Wait() && token.Error() != nil { if token := this.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error() return token.Error()
} }
this.client.Subscribe(topic,0,handler) this.client.Subscribe(topic, 0, handler)
return nil return nil
} }
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error { func (this *RedisMqClient) Subscribe(topic string, callback CallBack) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
pubsub, err := this.client.Subscribe(topic) pubsub, err := this.client.Subscribe(topic)
if nil != err{ if nil != err {
return err return err
} }
x := make(chan bool,1) x := make(chan bool, 1)
go func() { go func() {
x <- true x <- true
defer pubsub.Close() defer pubsub.Close()
for true{ for true {
msg, err := pubsub.ReceiveMessage() msg, err := pubsub.ReceiveMessage()
if nil != err{ if nil != err {
log.Print(err) log.Print(err)
if err == io.EOF{ if err == io.EOF {
return return
} }
} }
callback(topic,msg.Payload) callback(topic, msg.Payload)
} }
}() }()
<- x <-x
return nil return nil
} }
@ -133,45 +130,45 @@ func creatRedisClient(option *redis.Options) (*redis.Client, error) {
return client, nil return client, nil
} }
func (this *RedisMqClient) Public(topic string,data string) error { func (this *RedisMqClient) Public(topic string, data string) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
// 检测client有效性 // 检测client有效性
if nil != this.client { if nil != this.client {
_, err := this.client.Ping().Result() _, err := this.client.Ping().Result()
if nil != err { if nil != err {
this.client.Close() this.client.Close()
// 尝试3次重连 // 尝试3次重连
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
this.client, err = creatRedisClient(&redis.Options{ this.client, err = creatRedisClient(&redis.Options{
Addr:this.Address, Addr: this.Address,
Password:this.Password, Password: this.Password,
}) })
if this.client != nil { if this.client != nil {
} }
} }
} }
} }
_, err := this.client.Publish(topic, data).Result() _, err := this.client.Publish(topic, data).Result()
if nil != err{ if nil != err {
log.Print(err) log.Print(err)
return err return err
} }
return nil return nil
} }
func (this *MqttClient) Public(topic string,data string) error { func (this *MqttClient) Public(topic string, data string) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
// 检测client有效性 // 检测client有效性
if true != this.client.IsConnected() { if true != this.client.IsConnected() {
} }
token := this.client.Publish(topic,0, false,data) token := this.client.Publish(topic, 0, false, data)
token.Wait() token.Wait()
if nil != token.Error(){ if nil != token.Error() {
log.Print(token.Error()) log.Print(token.Error())
return token.Error() return token.Error()
} }

View File

@ -2,12 +2,13 @@ package zk
import ( import (
_ "bytes" _ "bytes"
zk "github.com/samuel/go-zookeeper/zk"
"go-tcpFramework-rudy/config"
"log" "log"
"os" "os"
_ "strings" _ "strings"
"tcpteamplate/config"
"time" "time"
zk "github.com/samuel/go-zookeeper/zk"
) )
var g_zkCnn *zk.Conn var g_zkCnn *zk.Conn