Compare commits

...

10 Commits

Author SHA1 Message Date
caiyuzheng 84d1cd38be no message 2021-05-31 00:34:28 +08:00
caiyuzheng 368e49ff71 no message 2020-12-04 22:06:28 +08:00
zcy 129c86e54f 文档更新 2020-08-27 16:49:38 +08:00
caiyuzheng d4a3ca3f8d 修改文档 2020-08-26 22:52:36 +08:00
caiyuzheng 1d463d5988 no message 2020-08-26 20:53:01 +08:00
caiyuzheng 048c3dcc07 no message 2020-08-26 20:52:26 +08:00
caiyuzheng 1af0420ebc no message 2020-08-24 02:07:50 +08:00
caiyuzheng 8a086a77b3 no message 2020-08-20 01:09:18 +08:00
caiyuzheng 801bdd69bd 添加安全tcp的实现 2020-08-19 20:37:47 +08:00
a7458969 47b72da442 添加secure tcp的使用说明 2020-08-18 15:10:55 +08:00
15 changed files with 213 additions and 227 deletions

17
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,17 @@
{
// 使 IntelliSense
//
// 访: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": []
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"go.inferGopath": false
}

View File

@ -1,5 +1,4 @@
FROM alpine:latest
MAINTAINER caiyu "a7458969@gmail.com"
WORKDIR /home/ubuntu/api/bin
@ -9,4 +8,4 @@ RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
EXPOSE 9850
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]

View File

@ -1,24 +1,26 @@
## go-tcpFramework-rudy
#### 介绍
基于golang的tcp长连接框架--rudy。
#### 软件架构
tcp流转包服务端框架。。</br>
this repo is desperated...looks like websocket can do it natually.
#### 特性
功能:
提供连接数管理,支持熔断,热重启
提供路由管理,支持context化timeout和cancel
支持分布式,和主节点选举
支持apolo配置
心跳包支持
服务端推送支持
提供连接数管理,支持熔断,热重启。
支持 context 化回调机制。
##### 支持分布式,和主节点选举。
主节点的作用
心跳包支持。
服务端推送支持。
##### 可选并行回调或者同步回调方式。
并行回调就是异步回调,回调不保证执行得有序性。
同步回调一定会在上个回调函数执行完毕后执行。
#### How to use
```go
package main
import (
"flag"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/network"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/network"
"log"
)
@ -38,17 +40,30 @@ func main() {
}
}
```
server 服务实例
server.SetHandler 设置处理句柄
server.Run() 服务启动
server.Run() 服务启动
### 包头格式文档(Package format)
```text
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x40 | 0x41 | length[3] | length[2]
| 0x40 | 0x41 | length[3] | length[2]
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| length[1] | length[0] | verify |
| length[1] | length[0] | verify |
```
包头格式7个字节首个字节标识符40 41 ,然后包长度最后是verify是包头长度的校验和
包头格式 7 个字节,首个字节标识符 40 41 ,然后包长度,最后是 verify 是包头长度的校验和
### secure tcp
secure tcp 提供了类似于 tls,ssl 的机制的安全 tcp 连接,流程如下
</br>
</br>
![image.png](https://www.testingcloud.club/sapi/api/image_download/692bf256-e121-11ea-bc55-525400dc6cec.png)
</br>

View File

@ -13,6 +13,7 @@ type Config struct {
ID string `json:"id"`
Ipadress string `json:"ipadress"` //format like ip:port
Zkserver []string `json:"zkserver"`
Secure bool `json:"secure"` // 是否是安全tcp
}
var gConf *Config
@ -39,7 +40,7 @@ func InitConfig(path string) {
if e != nil {
log.Println(e.Error())
}
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
if nil != e {
logger.LogDebugError(e.Error())
return
@ -52,7 +53,11 @@ func InitConfig(path string) {
return
}
e := file.Close()
if nil != e{
if nil != e {
logger.LogDebugError(e.Error())
}
}
func IfSecure() bool {
return gConf.Secure
}

View File

@ -10,4 +10,5 @@ ssh -t -i ./id_rsa ubuntu@118.24.238.198 'cp /home/ubuntu/api/bin/go-tcpFramew
ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"'
ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy'
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'

2
go.mod
View File

@ -1,5 +1,7 @@
module go-tcpFramework-rudy
go 1.14
require (
github.com/davecgh/go-spew v1.1.1
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect

10
main.go
View File

@ -2,20 +2,20 @@ package main
import (
"flag"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/network"
"log"
"runtime"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/network"
)
func main() {
if runtime.GOOS != `linux`{
if runtime.GOOS != `linux` {
log.Print("do not support " + runtime.GOOS + " platform")
//return
}
var confPath string
flag.StringVar(&confPath,"conf_path","TcpServConfig.json","the path of configuration file")
flag.StringVar(&confPath, "conf_path", "TcpServConfig.json", "the path of configuration file")
flag.Parse()
config.InitConfig(confPath)

View File

@ -4,30 +4,35 @@ import (
"bytes"
"encoding/binary"
"errors"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/util"
"time"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
)
type ConnectionContext struct {
context.Context
sockId string
Type string //连接类型有websocket普通的socket
Cnntime int64 //连接时间
IpAdress string //客户端ip
Port int32 //端口号
Cnn net.Conn //连接主体代码
wsCnn *websocket.Conn //
Business interface{} //业务代码,最好是一个结构体的形式.
h HandleFunction
sockId string
Type string //连接类型有websocket普通的socket
Cnntime int64 //连接时间
IpAdress string //客户端ip
Port int32 //端口号
Cnn net.Conn //连接主体代码
wsCnn *websocket.Conn //
Business interface{} //业务代码,最好是一个结构体的形式.
h HandleFunction
SecureAuth bool // secure tcp 是否已经协商秘钥
Pri []byte // secure tcp协商的私钥
}
type ConnectionManager struct {
@ -40,13 +45,16 @@ type ConnectionManager struct {
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithTimeout(parent, timeOut)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
if config.IfSecure() { // secure tcp
}
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret
}
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithCancel(parent)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret
}
@ -54,7 +62,7 @@ var gConnMgr *ConnectionManager
func init() {
gConnMgr = new(ConnectionManager)
gConnMgr.connSet = make(map[string]ConnectionContext,1000)
gConnMgr.connSet = make(map[string]ConnectionContext, 1000)
gConnMgr.mutex = sync.Mutex{}
}

View File

@ -8,17 +8,18 @@ import (
_ "encoding/binary"
"errors"
_ "fmt"
"github.com/davecgh/go-spew/spew"
_ "github.com/golang/protobuf/proto"
"github.com/hprose/hprose-golang/io"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"golang.org/x/net/context"
"log"
"net"
"sync"
"tcptemplate/logger"
"tcptemplate/util"
"time"
_ "time"
"github.com/davecgh/go-spew/spew"
_ "github.com/golang/protobuf/proto"
"github.com/hprose/hprose-golang/io"
"golang.org/x/net/context"
)
const (
@ -122,19 +123,18 @@ func (this *UserError) Error() string {
}
func (this *PackageReceiever) Write(input []byte, inputlen int) error {
if inputlen == 0 {
return nil
}
// 有缓存中的数据
if this.pack_len > 0{
if this.pack_len > 0 {
copy(this.pack_data[this.pack_len:], input[0:inputlen])
this.pack_len += inputlen
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41{
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41 {
return ErrPackFormat
}
// verify
bpacklen := bytes.NewReader(this.pack_data[2 : 6])
bpacklen := bytes.NewReader(this.pack_data[2:6])
var packlen int32
var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
@ -148,13 +148,13 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
return ErrorPackVerify
}
// 还没完全收到一整包数据
if packlen < int32(inputlen) - 7{
if packlen < int32(inputlen)-7 {
return nil
}
// 刚好等于一包数据
if packlen == int32(inputlen) - 7{
_,ok := <- this.cache_package
if !ok{
if packlen == int32(inputlen)-7 {
_, ok := <-this.cache_package
if !ok {
logger.LogDebugError("channel is closed")
return nil
}
@ -162,24 +162,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
this.pack_len = 0
}
// 有两个包
if packlen < int32(inputlen) - 7 {
_,ok := <- this.cache_package
if !ok{
if packlen < int32(inputlen)-7 {
_, ok := <-this.cache_package
if !ok {
logger.LogDebugError("channel is closed")
return nil
}
this.cache_package <- this.pack_data[0:inputlen]
this.pack_len = 0
this.Write(this.pack_data[inputlen:],this.pack_len - int(packlen)) //下个包再去重组包
this.Write(this.pack_data[inputlen:], this.pack_len-int(packlen)) //下个包再去重组包
}
}
// 没有未完成的组包
if this.pack_len == 0{
if input[0] != 0x40 || input[1] != 0x41{
if this.pack_len == 0 {
if input[0] != 0x40 || input[1] != 0x41 {
return ErrPackFormat
}
// verify
bpacklen := bytes.NewReader(input[2 : 6])
bpacklen := bytes.NewReader(input[2:6])
var packlen int32
var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
@ -193,90 +193,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
return ErrorPackVerify
}
// 还没完全收到一整包数据
if packlen < int32(inputlen) - 7{
if packlen < int32(inputlen)-7 {
copy(this.pack_data[this.pack_len:], input[0:inputlen])
this.pack_len += inputlen
return nil;
return nil
}
// 刚好等于一包数据
if packlen == int32(inputlen) - 7{
if packlen == int32(inputlen)-7 {
this.cache_package <- input[0:inputlen]
this.pack_len = 0
}
// 有两个包
if packlen < int32(inputlen) - 7 {
if packlen < int32(inputlen)-7 {
this.cache_package <- input[0:inputlen]
this.pack_len = 0
this.Write(input[inputlen:],inputlen - int(packlen)) //下个包再去重组包
this.Write(input[inputlen:], inputlen-int(packlen)) //下个包再去重组包
}
}
/*
log.Println("recv data len:",inputlen)
if this.pack_len > 0 || (this.pack_len + inputlen < 7) {
//log.Println("unstorage data",this.pack_len)
copy(this.pack_data[this.pack_len:], input[0:inputlen])
//log.Println(this.pack_data)
this.pack_len += inputlen
unsort = this.pack_data
unsort_len = this.pack_len
} else {
unsort = input
unsort_len = inputlen
}
log.Print("this.pack_data length:",this.pack_len)
log.Print("unsort length:",unsort_len)
//只有当负载数据包大于包头的时候才进行解包
if unsort_len > 6 && unsort_len < util.MAX_PACKAGE_BUFFER {
//获取各个包头地址
for i := (0); i < unsort_len; {
//包头的时候刚好出现粘包的情况
if unsort_len - i < 7{
//直接拷贝到未解包缓冲区
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
this.pack_len = unsort_len - pack_start
break
}
if unsort[0+i] != 0x40 || unsort[1+i] != 0x41 {
//log.Println("unsort",unsort[0:inputlen],i)
return ErrPackFormat
}
pack_start = i
bpacklen := bytes.NewReader(unsort[i+2 : i+6])
var packlen int32
var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
if nil != e {
logger.LogDebugError(e.Error())
}
for _, v := range unsort[i+2 : i+6] {
verify += v
}
if verify != unsort[6] {
return ErrorPackVerify
}
//下个包头的地址
//log.Println("packlen:",packlen)
i += int(packlen) + 7
//如果i小于packlen证明出现了沾包的情况
if i <= unsort_len {
this.cache_package <- unsort[pack_start:i]
pack_start = i
this.pack_len = 0
} else {
//超过了当前缓冲区的话拷贝到缓存以便下次组包
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
this.pack_len = unsort_len - pack_start
}
}
} else {
if unsort_len == 0 {
return nil
} else {
return ErrorPackTooLong
}
}*/
return nil
}
@ -301,7 +235,7 @@ func DefaultPackageHandler(ctx ProcessContext) {
}
}
func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
func CloseConnection(c ConnectionContext, receiver *PackageReceiever) {
e := c.Cnn.Close()
if nil != e {
logger.LogDebugError(e.Error())
@ -317,7 +251,8 @@ func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
func HandleConnection(c ConnectionContext) {
buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置
packReceiever := PackageReceieverFactory()
pCtx := ProcessContext{c, packReceiever, &c.Cnn,c.h}
pCtx := ProcessContext{c, packReceiever, &c.Cnn, c.h}
for {
for !packReceiever.PackageCleared() {
//修改handler 机制
@ -331,29 +266,24 @@ func HandleConnection(c ConnectionContext) {
var err error
recvd, err = c.Cnn.Read(buf)
log.Print(recvd)
for recvd < 7{
for recvd < 7 {
smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置
var n int
c.Cnn.SetReadDeadline(time.Now().Add(time.Second))
n, err = c.Cnn.Read(smallbuf)
if err != nil {
if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
CloseConnection(c,packReceiever)
CloseConnection(c, packReceiever)
return
}
}
if (n > 0){
log.Print(n)
if n > 0 {
logger.LogRealeaseInfo(smallbuf)
copy(buf[recvd:],smallbuf[0:n])
copy(buf[recvd:], smallbuf[0:n])
recvd += n
}
}
//var req protocol.Request
logger.LogRealeaseInfo(buf)
err = packReceiever.Write(buf, recvd)
//注意用了chanel不要在同一个携程里面进行插入和取出以免造成死锁。
if err != nil {

View File

@ -4,19 +4,21 @@ import (
"bytes"
"encoding/json"
"fmt"
zkdriver "github.com/samuel/go-zookeeper/zk"
"github.com/satori/go.uuid"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"go-tcpFramework-rudy/zk"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"gobase/utils"
"log"
"net"
"net/http"
"os"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/util"
"tcptemplate/zk"
"time"
zkdriver "github.com/samuel/go-zookeeper/zk"
uuid "github.com/satori/go.uuid"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
)
const TYPE_TCP_SERVER = 1
@ -26,7 +28,7 @@ type Server struct {
listener *net.TCPListener
servrType int32
cMgr ConnectionManager
h HandleFunction
h HandleFunction
}
func badCall() {
@ -34,7 +36,7 @@ func badCall() {
}
func ServerFactory(adress string) Server {
ret := Server{Address:adress,listener:nil,servrType:1,cMgr:*GetConnMgr(),h:DefaultPackageHandler}
ret := Server{Address: adress, listener: nil, servrType: 1, cMgr: *GetConnMgr(), h: DefaultPackageHandler}
return ret
}
@ -53,14 +55,19 @@ func websocketHandler(ws *websocket.Conn) {
}
logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String())
if config.IfSecure() {
// 首先协商秘钥
pub, pri := utils.GenRsaKey(1024)
clicnn.Cnn.Write(pub)
clicnn.Pri = pri
}
go HandleConnection(clicnn)
}
func (this *Server) SetIpAddress(Address string) {
this.Address = Address
}
func (this *Server)SetHandler(h HandleFunction) {
func (this *Server) SetHandler(h HandleFunction) {
this.h = h
}
@ -100,7 +107,7 @@ func (this *Server) Run() error {
logger.LogDebugError(err.Error())
}
for {
c,err := this.listener.AcceptTCP()
c, err := this.listener.AcceptTCP()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
fmt.Println("Stop accepting connections")
@ -113,13 +120,13 @@ func (this *Server) Run() error {
ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
ctxCnn.Cnntime = time.Now().Unix()
ctxCnn.Cnn = c
sockid,_ := uuid.NewV4()
sockid, _ := uuid.NewV4()
ctxCnn.sockId = sockid.String()
e := GetConnMgr().AddConn(ctxCnn)
if nil != e {
logger.LogDebugError(e.Error())
}
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String()," socket id:",ctxCnn.sockId)
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String(), " socket id:", ctxCnn.sockId)
if err != nil {
logger.LogDebugError(err.Error())
continue

View File

@ -3,19 +3,19 @@ package test
import (
"bytes"
"encoding/binary"
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
_ "go-tcpFramework-rudy/protocol/pb"
_ "io"
"log"
"net"
_ "net/http/pprof"
_ "runtime"
_ "tcptemplate/protocol/pb"
"testing"
"time"
)
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
)
/*
tcp
@ -61,7 +61,7 @@ func SendPackage(writer *net.TCPConn, payload []byte, length int) error {
//log.Println("Send length",wb)
b = append(b, packbuf.Bytes()...)
b = append(b, payload[0:length]...)
log.Println(b[0:5],b[5:])
log.Println(b[0:5], b[5:])
_, e = writer.Write(b[0:5])
if e != nil {
log.Println(e.Error())

View File

@ -1,34 +1,35 @@
package test
import (
"go-tcpFramework-rudy/tools/mq"
"qiniupkg.com/x/log.v7"
"tcptemplate/tools/mq"
"testing"
"time"
"qiniupkg.com/x/log.v7"
)
func Callback(key string,msg string) {
log.Print(key,msg)
func Callback(key string, msg string) {
log.Print(key, msg)
}
func TestMq(t *testing.T) {
func TestMq(t *testing.T) {
cli := mq.RedisMqClient{}
e := cli.Connect("49.235.25.67:16379",0,"")
if nil != e{
e := cli.Connect("49.235.25.67:16379", 0, "")
if nil != e {
log.Print(e.Error())
t.Error(e)
}
cli.Subscribe("msg",Callback)
cli.Public("msg","shit")
cli.Public("msg","shit2")
cli.Public("msg","shit3")
cli.Subscribe("msg", Callback)
cli.Public("msg", "shit")
cli.Public("msg", "shit2")
cli.Public("msg", "shit3")
mqcli := mq.MqttClient{}
e = mqcli.Connect("tcp://" + "49.235.25.67:1883",0,"122")
e = mqcli.Connect("tcp://"+"49.235.25.67:1883", 0, "122")
log.Print(e)
e = mqcli.Subscribe("/dev",Callback)
e = mqcli.Subscribe("/dev", Callback)
log.Print(e)
mqcli.Public("/dev","shit")
for true{
mqcli.Public("/dev", "shit")
for true {
time.Sleep(100)
}
}
}

View File

@ -2,40 +2,37 @@ package mq
import (
"fmt"
"github.com/pkg/errors"
"go-tcpFramework-rudy/util"
"gopkg.in/redis.v3"
"io"
"tcptemplate/util"
"github.com/pkg/errors"
"gopkg.in/redis.v3"
"qiniupkg.com/x/log.v7"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type CallBack func(string,string)
type CallBack func(string, string)
//通用mq库支持发布订阅
type MqClient interface {
Subscribe(topic string,callback *CallBack) error
Public(topic string,data string) error
Subscribe(topic string, callback *CallBack) error
Public(topic string, data string) error
}
type RedisMqClient struct {
client *redis.Client
Address string
client *redis.Client
Address string
Password string
}
type MqttClient struct {
client MQTT.Client
Address string
type MqttClient struct {
client MQTT.Client
Address string
ClientId string
Options *MQTT.ClientOptions
Options *MQTT.ClientOptions
}
func (this *MqttClient) Connect(ip string,port int64,id string) error {
func (this *MqttClient) Connect(ip string, port int64, id string) error {
this.Address = ip
opts := MQTT.NewClientOptions().AddBroker(ip)
@ -49,7 +46,7 @@ func (this *MqttClient) Connect(ip string,port int64,id string) error {
return nil
}
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
func (this *RedisMqClient) Connect(ip string, port int64, passwd string) error {
option := &redis.Options{
Addr: ip,
Password: passwd,
@ -70,11 +67,11 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
}
return nil
}
func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
func (this *MqttClient) Subscribe(topic string, callback CallBack) error {
handler := func(c MQTT.Client, msg MQTT.Message) {
msg.Payload()
msg.Topic()
callback(msg.Topic(),string(msg.Payload()))
callback(msg.Topic(), string(msg.Payload()))
}
opts := MQTT.NewClientOptions().AddBroker(this.Address)
opts.SetClientID(this.ClientId)
@ -86,33 +83,33 @@ func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
if token := this.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
this.client.Subscribe(topic,0,handler)
this.client.Subscribe(topic, 0, handler)
return nil
}
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error {
if nil == this{
func (this *RedisMqClient) Subscribe(topic string, callback CallBack) error {
if nil == this {
return errors.New(util.ERR_NULL_POINTER)
}
pubsub, err := this.client.Subscribe(topic)
if nil != err{
if nil != err {
return err
}
x := make(chan bool,1)
x := make(chan bool, 1)
go func() {
x <- true
defer pubsub.Close()
for true{
for true {
msg, err := pubsub.ReceiveMessage()
if nil != err{
if nil != err {
log.Print(err)
if err == io.EOF{
if err == io.EOF {
return
}
}
callback(topic,msg.Payload)
callback(topic, msg.Payload)
}
}()
<- x
<-x
return nil
}
@ -133,45 +130,45 @@ func creatRedisClient(option *redis.Options) (*redis.Client, error) {
return client, nil
}
func (this *RedisMqClient) Public(topic string,data string) error {
if nil == this{
func (this *RedisMqClient) Public(topic string, data string) error {
if nil == this {
return errors.New(util.ERR_NULL_POINTER)
}
// 检测client有效性
if nil != this.client {
_, err := this.client.Ping().Result()
_, err := this.client.Ping().Result()
if nil != err {
this.client.Close()
// 尝试3次重连
for i := 0; i < 3; i++ {
this.client, err = creatRedisClient(&redis.Options{
Addr:this.Address,
Password:this.Password,
Addr: this.Address,
Password: this.Password,
})
if this.client != nil {
if this.client != nil {
}
}
}
}
_, err := this.client.Publish(topic, data).Result()
if nil != err{
if nil != err {
log.Print(err)
return err
}
return nil
}
func (this *MqttClient) Public(topic string,data string) error {
if nil == this{
func (this *MqttClient) Public(topic string, data string) error {
if nil == this {
return errors.New(util.ERR_NULL_POINTER)
}
// 检测client有效性
if true != this.client.IsConnected() {
}
token := this.client.Publish(topic,0, false,data)
token := this.client.Publish(topic, 0, false, data)
token.Wait()
if nil != token.Error(){
if nil != token.Error() {
log.Print(token.Error())
return token.Error()
}

View File

@ -2,12 +2,13 @@ package zk
import (
_ "bytes"
zk "github.com/samuel/go-zookeeper/zk"
"go-tcpFramework-rudy/config"
"log"
"os"
_ "strings"
"tcptemplate/config"
"time"
zk "github.com/samuel/go-zookeeper/zk"
)
var g_zkCnn *zk.Conn