Compare commits
10 Commits
ef03c1e8e4
...
84d1cd38be
Author | SHA1 | Date |
---|---|---|
caiyuzheng | 84d1cd38be | |
caiyuzheng | 368e49ff71 | |
zcy | 129c86e54f | |
caiyuzheng | d4a3ca3f8d | |
caiyuzheng | 1d463d5988 | |
caiyuzheng | 048c3dcc07 | |
caiyuzheng | 1af0420ebc | |
caiyuzheng | 8a086a77b3 | |
caiyuzheng | 801bdd69bd | |
a7458969 | 47b72da442 |
|
@ -0,0 +1,17 @@
|
||||||
|
{
|
||||||
|
// 使用 IntelliSense 了解相关属性。
|
||||||
|
// 悬停以查看现有属性的描述。
|
||||||
|
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Launch",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "auto",
|
||||||
|
"program": "${workspaceRoot}/main.go",
|
||||||
|
"env": {},
|
||||||
|
"args": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,3 @@
|
||||||
|
{
|
||||||
|
"go.inferGopath": false
|
||||||
|
}
|
|
@ -1,5 +1,4 @@
|
||||||
FROM alpine:latest
|
FROM alpine:latest
|
||||||
MAINTAINER caiyu "a7458969@gmail.com"
|
|
||||||
|
|
||||||
WORKDIR /home/ubuntu/api/bin
|
WORKDIR /home/ubuntu/api/bin
|
||||||
|
|
||||||
|
@ -9,4 +8,4 @@ RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
|
||||||
|
|
||||||
EXPOSE 9850
|
EXPOSE 9850
|
||||||
|
|
||||||
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]
|
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]
|
||||||
|
|
51
README.md
51
README.md
|
@ -1,24 +1,26 @@
|
||||||
## go-tcpFramework-rudy
|
|
||||||
#### 介绍
|
#### 介绍
|
||||||
基于golang的tcp长连接框架--rudy。
|
tcp流转包服务端框架。。</br>
|
||||||
|
this repo is desperated...looks like websocket can do it natually.
|
||||||
#### 软件架构
|
#### 特性
|
||||||
功能:
|
功能:
|
||||||
提供连接数管理,支持熔断,热重启
|
提供连接数管理,支持熔断,热重启。
|
||||||
提供路由管理,支持context化timeout和cancel
|
支持 context 化回调机制。
|
||||||
支持分布式,和主节点选举
|
##### 支持分布式,和主节点选举。
|
||||||
支持apolo配置
|
主节点的作用
|
||||||
心跳包支持
|
心跳包支持。
|
||||||
服务端推送支持
|
服务端推送支持。
|
||||||
|
##### 可选并行回调或者同步回调方式。
|
||||||
|
并行回调就是异步回调,回调不保证执行得有序性。
|
||||||
|
同步回调一定会在上个回调函数执行完毕后执行。
|
||||||
#### How to use
|
#### How to use
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"go-tcpFramework-rudy/config"
|
"tcptemplate/config"
|
||||||
"go-tcpFramework-rudy/logger"
|
"tcptemplate/logger"
|
||||||
"go-tcpFramework-rudy/network"
|
"tcptemplate/network"
|
||||||
"log"
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,17 +40,30 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
server 服务实例
|
server 服务实例
|
||||||
server.SetHandler 设置处理句柄
|
server.SetHandler 设置处理句柄
|
||||||
server.Run() 服务启动
|
server.Run() 服务启动
|
||||||
|
|
||||||
### 包头格式文档(Package format)
|
### 包头格式文档(Package format)
|
||||||
|
|
||||||
```text
|
```text
|
||||||
0 1 2 3
|
0 1 2 3
|
||||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||||
| 0x40 | 0x41 | length[3] | length[2]
|
| 0x40 | 0x41 | length[3] | length[2]
|
||||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||||
| length[1] | length[0] | verify |
|
| length[1] | length[0] | verify |
|
||||||
|
|
||||||
```
|
```
|
||||||
包头格式7个字节,首个字节标识符40 41 ,然后包长度,最后是verify是包头长度的校验和
|
|
||||||
|
包头格式 7 个字节,首个字节标识符 40 41 ,然后包长度,最后是 verify 是包头长度的校验和
|
||||||
|
|
||||||
|
### secure tcp
|
||||||
|
secure tcp 提供了类似于 tls,ssl 的机制的安全 tcp 连接,流程如下
|
||||||
|
</br>
|
||||||
|
</br>
|
||||||
|
|
||||||
|
![image.png](https://www.testingcloud.club/sapi/api/image_download/692bf256-e121-11ea-bc55-525400dc6cec.png)
|
||||||
|
</br>
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ type Config struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
Ipadress string `json:"ipadress"` //format like ip:port
|
Ipadress string `json:"ipadress"` //format like ip:port
|
||||||
Zkserver []string `json:"zkserver"`
|
Zkserver []string `json:"zkserver"`
|
||||||
|
Secure bool `json:"secure"` // 是否是安全tcp
|
||||||
}
|
}
|
||||||
|
|
||||||
var gConf *Config
|
var gConf *Config
|
||||||
|
@ -39,7 +40,7 @@ func InitConfig(path string) {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Println(e.Error())
|
log.Println(e.Error())
|
||||||
}
|
}
|
||||||
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
|
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
|
||||||
if nil != e {
|
if nil != e {
|
||||||
logger.LogDebugError(e.Error())
|
logger.LogDebugError(e.Error())
|
||||||
return
|
return
|
||||||
|
@ -52,7 +53,11 @@ func InitConfig(path string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
e := file.Close()
|
e := file.Close()
|
||||||
if nil != e{
|
if nil != e {
|
||||||
logger.LogDebugError(e.Error())
|
logger.LogDebugError(e.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IfSecure() bool {
|
||||||
|
return gConf.Secure
|
||||||
|
}
|
||||||
|
|
|
@ -10,4 +10,5 @@ ssh -t -i ./id_rsa ubuntu@118.24.238.198 'cp /home/ubuntu/api/bin/go-tcpFramew
|
||||||
|
|
||||||
ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"'
|
ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"'
|
||||||
ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy'
|
ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy'
|
||||||
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'
|
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'
|
||||||
|
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -1,5 +1,7 @@
|
||||||
module go-tcpFramework-rudy
|
module go-tcpFramework-rudy
|
||||||
|
|
||||||
|
go 1.14
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/davecgh/go-spew v1.1.1
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect
|
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect
|
||||||
|
|
10
main.go
10
main.go
|
@ -2,20 +2,20 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"go-tcpFramework-rudy/config"
|
|
||||||
"go-tcpFramework-rudy/logger"
|
|
||||||
"go-tcpFramework-rudy/network"
|
|
||||||
"log"
|
"log"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"tcptemplate/config"
|
||||||
|
"tcptemplate/logger"
|
||||||
|
"tcptemplate/network"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if runtime.GOOS != `linux`{
|
if runtime.GOOS != `linux` {
|
||||||
log.Print("do not support " + runtime.GOOS + " platform")
|
log.Print("do not support " + runtime.GOOS + " platform")
|
||||||
//return
|
//return
|
||||||
}
|
}
|
||||||
var confPath string
|
var confPath string
|
||||||
flag.StringVar(&confPath,"conf_path","TcpServConfig.json","the path of configuration file")
|
flag.StringVar(&confPath, "conf_path", "TcpServConfig.json", "the path of configuration file")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
config.InitConfig(confPath)
|
config.InitConfig(confPath)
|
||||||
|
|
||||||
|
|
|
@ -4,30 +4,35 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"go-tcpFramework-rudy/logger"
|
|
||||||
"go-tcpFramework-rudy/util"
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"tcptemplate/config"
|
||||||
|
"tcptemplate/logger"
|
||||||
|
"tcptemplate/util"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ConnectionContext struct {
|
type ConnectionContext struct {
|
||||||
context.Context
|
context.Context
|
||||||
sockId string
|
sockId string
|
||||||
Type string //连接类型,有websocket,普通的socket
|
Type string //连接类型,有websocket,普通的socket
|
||||||
Cnntime int64 //连接时间
|
Cnntime int64 //连接时间
|
||||||
IpAdress string //客户端ip
|
IpAdress string //客户端ip
|
||||||
Port int32 //端口号
|
Port int32 //端口号
|
||||||
Cnn net.Conn //连接主体代码
|
Cnn net.Conn //连接主体代码
|
||||||
wsCnn *websocket.Conn //
|
wsCnn *websocket.Conn //
|
||||||
Business interface{} //业务代码,最好是一个结构体的形式.
|
Business interface{} //业务代码,最好是一个结构体的形式.
|
||||||
h HandleFunction
|
h HandleFunction
|
||||||
|
SecureAuth bool // secure tcp 是否已经协商秘钥
|
||||||
|
Pri []byte // secure tcp协商的私钥
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectionManager struct {
|
type ConnectionManager struct {
|
||||||
|
@ -40,13 +45,16 @@ type ConnectionManager struct {
|
||||||
|
|
||||||
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
||||||
ctx, _ := context.WithTimeout(parent, timeOut)
|
ctx, _ := context.WithTimeout(parent, timeOut)
|
||||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
|
if config.IfSecure() { // secure tcp
|
||||||
|
|
||||||
|
}
|
||||||
|
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
||||||
ctx, _ := context.WithCancel(parent)
|
ctx, _ := context.WithCancel(parent)
|
||||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
|
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +62,7 @@ var gConnMgr *ConnectionManager
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
gConnMgr = new(ConnectionManager)
|
gConnMgr = new(ConnectionManager)
|
||||||
gConnMgr.connSet = make(map[string]ConnectionContext,1000)
|
gConnMgr.connSet = make(map[string]ConnectionContext, 1000)
|
||||||
gConnMgr.mutex = sync.Mutex{}
|
gConnMgr.mutex = sync.Mutex{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
136
network/proto.go
136
network/proto.go
|
@ -8,17 +8,18 @@ import (
|
||||||
_ "encoding/binary"
|
_ "encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
_ "fmt"
|
_ "fmt"
|
||||||
"github.com/davecgh/go-spew/spew"
|
|
||||||
_ "github.com/golang/protobuf/proto"
|
|
||||||
"github.com/hprose/hprose-golang/io"
|
|
||||||
"go-tcpFramework-rudy/logger"
|
|
||||||
"go-tcpFramework-rudy/util"
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"tcptemplate/logger"
|
||||||
|
"tcptemplate/util"
|
||||||
"time"
|
"time"
|
||||||
_ "time"
|
_ "time"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
_ "github.com/golang/protobuf/proto"
|
||||||
|
"github.com/hprose/hprose-golang/io"
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -122,19 +123,18 @@ func (this *UserError) Error() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
||||||
|
|
||||||
if inputlen == 0 {
|
if inputlen == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// 有缓存中的数据
|
// 有缓存中的数据
|
||||||
if this.pack_len > 0{
|
if this.pack_len > 0 {
|
||||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
||||||
this.pack_len += inputlen
|
this.pack_len += inputlen
|
||||||
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41{
|
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41 {
|
||||||
return ErrPackFormat
|
return ErrPackFormat
|
||||||
}
|
}
|
||||||
// verify
|
// verify
|
||||||
bpacklen := bytes.NewReader(this.pack_data[2 : 6])
|
bpacklen := bytes.NewReader(this.pack_data[2:6])
|
||||||
var packlen int32
|
var packlen int32
|
||||||
var verify byte
|
var verify byte
|
||||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||||
|
@ -148,13 +148,13 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
||||||
return ErrorPackVerify
|
return ErrorPackVerify
|
||||||
}
|
}
|
||||||
// 还没完全收到一整包数据
|
// 还没完全收到一整包数据
|
||||||
if packlen < int32(inputlen) - 7{
|
if packlen < int32(inputlen)-7 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// 刚好等于一包数据
|
// 刚好等于一包数据
|
||||||
if packlen == int32(inputlen) - 7{
|
if packlen == int32(inputlen)-7 {
|
||||||
_,ok := <- this.cache_package
|
_, ok := <-this.cache_package
|
||||||
if !ok{
|
if !ok {
|
||||||
logger.LogDebugError("channel is closed")
|
logger.LogDebugError("channel is closed")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -162,24 +162,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
||||||
this.pack_len = 0
|
this.pack_len = 0
|
||||||
}
|
}
|
||||||
// 有两个包
|
// 有两个包
|
||||||
if packlen < int32(inputlen) - 7 {
|
if packlen < int32(inputlen)-7 {
|
||||||
_,ok := <- this.cache_package
|
_, ok := <-this.cache_package
|
||||||
if !ok{
|
if !ok {
|
||||||
logger.LogDebugError("channel is closed")
|
logger.LogDebugError("channel is closed")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
this.cache_package <- this.pack_data[0:inputlen]
|
this.cache_package <- this.pack_data[0:inputlen]
|
||||||
this.pack_len = 0
|
this.pack_len = 0
|
||||||
this.Write(this.pack_data[inputlen:],this.pack_len - int(packlen)) //下个包再去重组包
|
this.Write(this.pack_data[inputlen:], this.pack_len-int(packlen)) //下个包再去重组包
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 没有未完成的组包
|
// 没有未完成的组包
|
||||||
if this.pack_len == 0{
|
if this.pack_len == 0 {
|
||||||
if input[0] != 0x40 || input[1] != 0x41{
|
if input[0] != 0x40 || input[1] != 0x41 {
|
||||||
return ErrPackFormat
|
return ErrPackFormat
|
||||||
}
|
}
|
||||||
// verify
|
// verify
|
||||||
bpacklen := bytes.NewReader(input[2 : 6])
|
bpacklen := bytes.NewReader(input[2:6])
|
||||||
var packlen int32
|
var packlen int32
|
||||||
var verify byte
|
var verify byte
|
||||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||||
|
@ -193,90 +193,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
||||||
return ErrorPackVerify
|
return ErrorPackVerify
|
||||||
}
|
}
|
||||||
// 还没完全收到一整包数据
|
// 还没完全收到一整包数据
|
||||||
if packlen < int32(inputlen) - 7{
|
if packlen < int32(inputlen)-7 {
|
||||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
||||||
this.pack_len += inputlen
|
this.pack_len += inputlen
|
||||||
return nil;
|
return nil
|
||||||
}
|
}
|
||||||
// 刚好等于一包数据
|
// 刚好等于一包数据
|
||||||
if packlen == int32(inputlen) - 7{
|
if packlen == int32(inputlen)-7 {
|
||||||
this.cache_package <- input[0:inputlen]
|
this.cache_package <- input[0:inputlen]
|
||||||
this.pack_len = 0
|
this.pack_len = 0
|
||||||
}
|
}
|
||||||
// 有两个包
|
// 有两个包
|
||||||
if packlen < int32(inputlen) - 7 {
|
if packlen < int32(inputlen)-7 {
|
||||||
this.cache_package <- input[0:inputlen]
|
this.cache_package <- input[0:inputlen]
|
||||||
this.pack_len = 0
|
this.pack_len = 0
|
||||||
this.Write(input[inputlen:],inputlen - int(packlen)) //下个包再去重组包
|
this.Write(input[inputlen:], inputlen-int(packlen)) //下个包再去重组包
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
log.Println("recv data len:",inputlen)
|
|
||||||
if this.pack_len > 0 || (this.pack_len + inputlen < 7) {
|
|
||||||
//log.Println("unstorage data",this.pack_len)
|
|
||||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
|
||||||
//log.Println(this.pack_data)
|
|
||||||
this.pack_len += inputlen
|
|
||||||
unsort = this.pack_data
|
|
||||||
unsort_len = this.pack_len
|
|
||||||
} else {
|
|
||||||
unsort = input
|
|
||||||
unsort_len = inputlen
|
|
||||||
}
|
|
||||||
log.Print("this.pack_data length:",this.pack_len)
|
|
||||||
|
|
||||||
log.Print("unsort length:",unsort_len)
|
|
||||||
//只有当负载数据包大于包头的时候才进行解包
|
|
||||||
if unsort_len > 6 && unsort_len < util.MAX_PACKAGE_BUFFER {
|
|
||||||
//获取各个包头地址
|
|
||||||
for i := (0); i < unsort_len; {
|
|
||||||
//包头的时候刚好出现粘包的情况
|
|
||||||
if unsort_len - i < 7{
|
|
||||||
//直接拷贝到未解包缓冲区
|
|
||||||
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
|
|
||||||
this.pack_len = unsort_len - pack_start
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if unsort[0+i] != 0x40 || unsort[1+i] != 0x41 {
|
|
||||||
//log.Println("unsort",unsort[0:inputlen],i)
|
|
||||||
return ErrPackFormat
|
|
||||||
}
|
|
||||||
pack_start = i
|
|
||||||
bpacklen := bytes.NewReader(unsort[i+2 : i+6])
|
|
||||||
var packlen int32
|
|
||||||
var verify byte
|
|
||||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
|
||||||
if nil != e {
|
|
||||||
logger.LogDebugError(e.Error())
|
|
||||||
}
|
|
||||||
for _, v := range unsort[i+2 : i+6] {
|
|
||||||
verify += v
|
|
||||||
}
|
|
||||||
if verify != unsort[6] {
|
|
||||||
return ErrorPackVerify
|
|
||||||
}
|
|
||||||
//下个包头的地址
|
|
||||||
//log.Println("packlen:",packlen)
|
|
||||||
i += int(packlen) + 7
|
|
||||||
//如果i小于packlen证明出现了沾包的情况
|
|
||||||
if i <= unsort_len {
|
|
||||||
this.cache_package <- unsort[pack_start:i]
|
|
||||||
pack_start = i
|
|
||||||
this.pack_len = 0
|
|
||||||
} else {
|
|
||||||
//超过了当前缓冲区的话拷贝到缓存以便下次组包
|
|
||||||
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
|
|
||||||
this.pack_len = unsort_len - pack_start
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if unsort_len == 0 {
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return ErrorPackTooLong
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -301,7 +235,7 @@ func DefaultPackageHandler(ctx ProcessContext) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
|
func CloseConnection(c ConnectionContext, receiver *PackageReceiever) {
|
||||||
e := c.Cnn.Close()
|
e := c.Cnn.Close()
|
||||||
if nil != e {
|
if nil != e {
|
||||||
logger.LogDebugError(e.Error())
|
logger.LogDebugError(e.Error())
|
||||||
|
@ -317,7 +251,8 @@ func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
|
||||||
func HandleConnection(c ConnectionContext) {
|
func HandleConnection(c ConnectionContext) {
|
||||||
buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置
|
buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置
|
||||||
packReceiever := PackageReceieverFactory()
|
packReceiever := PackageReceieverFactory()
|
||||||
pCtx := ProcessContext{c, packReceiever, &c.Cnn,c.h}
|
pCtx := ProcessContext{c, packReceiever, &c.Cnn, c.h}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
for !packReceiever.PackageCleared() {
|
for !packReceiever.PackageCleared() {
|
||||||
//修改handler 机制
|
//修改handler 机制
|
||||||
|
@ -331,29 +266,24 @@ func HandleConnection(c ConnectionContext) {
|
||||||
var err error
|
var err error
|
||||||
recvd, err = c.Cnn.Read(buf)
|
recvd, err = c.Cnn.Read(buf)
|
||||||
log.Print(recvd)
|
log.Print(recvd)
|
||||||
for recvd < 7{
|
for recvd < 7 {
|
||||||
smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置
|
smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置
|
||||||
var n int
|
var n int
|
||||||
c.Cnn.SetReadDeadline(time.Now().Add(time.Second))
|
c.Cnn.SetReadDeadline(time.Now().Add(time.Second))
|
||||||
n, err = c.Cnn.Read(smallbuf)
|
n, err = c.Cnn.Read(smallbuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
|
if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
|
||||||
CloseConnection(c,packReceiever)
|
CloseConnection(c, packReceiever)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (n > 0){
|
if n > 0 {
|
||||||
log.Print(n)
|
|
||||||
logger.LogRealeaseInfo(smallbuf)
|
logger.LogRealeaseInfo(smallbuf)
|
||||||
copy(buf[recvd:],smallbuf[0:n])
|
copy(buf[recvd:], smallbuf[0:n])
|
||||||
recvd += n
|
recvd += n
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
//var req protocol.Request
|
|
||||||
|
|
||||||
logger.LogRealeaseInfo(buf)
|
logger.LogRealeaseInfo(buf)
|
||||||
|
|
||||||
err = packReceiever.Write(buf, recvd)
|
err = packReceiever.Write(buf, recvd)
|
||||||
//注意,用了chanel不要在同一个携程里面进行插入和取出,以免造成死锁。
|
//注意,用了chanel不要在同一个携程里面进行插入和取出,以免造成死锁。
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -4,19 +4,21 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
zkdriver "github.com/samuel/go-zookeeper/zk"
|
"gobase/utils"
|
||||||
"github.com/satori/go.uuid"
|
|
||||||
"go-tcpFramework-rudy/config"
|
|
||||||
"go-tcpFramework-rudy/logger"
|
|
||||||
"go-tcpFramework-rudy/util"
|
|
||||||
"go-tcpFramework-rudy/zk"
|
|
||||||
"golang.org/x/net/context"
|
|
||||||
"golang.org/x/net/websocket"
|
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"tcptemplate/config"
|
||||||
|
"tcptemplate/logger"
|
||||||
|
"tcptemplate/util"
|
||||||
|
"tcptemplate/zk"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
zkdriver "github.com/samuel/go-zookeeper/zk"
|
||||||
|
uuid "github.com/satori/go.uuid"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
const TYPE_TCP_SERVER = 1
|
const TYPE_TCP_SERVER = 1
|
||||||
|
@ -26,7 +28,7 @@ type Server struct {
|
||||||
listener *net.TCPListener
|
listener *net.TCPListener
|
||||||
servrType int32
|
servrType int32
|
||||||
cMgr ConnectionManager
|
cMgr ConnectionManager
|
||||||
h HandleFunction
|
h HandleFunction
|
||||||
}
|
}
|
||||||
|
|
||||||
func badCall() {
|
func badCall() {
|
||||||
|
@ -34,7 +36,7 @@ func badCall() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func ServerFactory(adress string) Server {
|
func ServerFactory(adress string) Server {
|
||||||
ret := Server{Address:adress,listener:nil,servrType:1,cMgr:*GetConnMgr(),h:DefaultPackageHandler}
|
ret := Server{Address: adress, listener: nil, servrType: 1, cMgr: *GetConnMgr(), h: DefaultPackageHandler}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,14 +55,19 @@ func websocketHandler(ws *websocket.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String())
|
logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String())
|
||||||
|
if config.IfSecure() {
|
||||||
|
// 首先协商秘钥
|
||||||
|
pub, pri := utils.GenRsaKey(1024)
|
||||||
|
clicnn.Cnn.Write(pub)
|
||||||
|
clicnn.Pri = pri
|
||||||
|
}
|
||||||
go HandleConnection(clicnn)
|
go HandleConnection(clicnn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Server) SetIpAddress(Address string) {
|
func (this *Server) SetIpAddress(Address string) {
|
||||||
this.Address = Address
|
this.Address = Address
|
||||||
}
|
}
|
||||||
func (this *Server)SetHandler(h HandleFunction) {
|
func (this *Server) SetHandler(h HandleFunction) {
|
||||||
this.h = h
|
this.h = h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +107,7 @@ func (this *Server) Run() error {
|
||||||
logger.LogDebugError(err.Error())
|
logger.LogDebugError(err.Error())
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
c,err := this.listener.AcceptTCP()
|
c, err := this.listener.AcceptTCP()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
||||||
fmt.Println("Stop accepting connections")
|
fmt.Println("Stop accepting connections")
|
||||||
|
@ -113,13 +120,13 @@ func (this *Server) Run() error {
|
||||||
ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
|
ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
|
||||||
ctxCnn.Cnntime = time.Now().Unix()
|
ctxCnn.Cnntime = time.Now().Unix()
|
||||||
ctxCnn.Cnn = c
|
ctxCnn.Cnn = c
|
||||||
sockid,_ := uuid.NewV4()
|
sockid, _ := uuid.NewV4()
|
||||||
ctxCnn.sockId = sockid.String()
|
ctxCnn.sockId = sockid.String()
|
||||||
e := GetConnMgr().AddConn(ctxCnn)
|
e := GetConnMgr().AddConn(ctxCnn)
|
||||||
if nil != e {
|
if nil != e {
|
||||||
logger.LogDebugError(e.Error())
|
logger.LogDebugError(e.Error())
|
||||||
}
|
}
|
||||||
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String()," socket id:",ctxCnn.sockId)
|
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String(), " socket id:", ctxCnn.sockId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogDebugError(err.Error())
|
logger.LogDebugError(err.Error())
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -3,19 +3,19 @@ package test
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
_ "github.com/golang/protobuf/proto"
|
|
||||||
_ "github.com/hprose/hprose-golang/io"
|
|
||||||
_ "github.com/trysh/ttb"
|
|
||||||
_ "go-tcpFramework-rudy/protocol/pb"
|
|
||||||
_ "io"
|
_ "io"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
_ "runtime"
|
_ "runtime"
|
||||||
|
_ "tcptemplate/protocol/pb"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
|
||||||
|
|
||||||
|
_ "github.com/golang/protobuf/proto"
|
||||||
|
_ "github.com/hprose/hprose-golang/io"
|
||||||
|
_ "github.com/trysh/ttb"
|
||||||
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
tcp 测试程序,测试以下场景
|
tcp 测试程序,测试以下场景
|
||||||
|
@ -61,7 +61,7 @@ func SendPackage(writer *net.TCPConn, payload []byte, length int) error {
|
||||||
//log.Println("Send length",wb)
|
//log.Println("Send length",wb)
|
||||||
b = append(b, packbuf.Bytes()...)
|
b = append(b, packbuf.Bytes()...)
|
||||||
b = append(b, payload[0:length]...)
|
b = append(b, payload[0:length]...)
|
||||||
log.Println(b[0:5],b[5:])
|
log.Println(b[0:5], b[5:])
|
||||||
_, e = writer.Write(b[0:5])
|
_, e = writer.Write(b[0:5])
|
||||||
if e != nil {
|
if e != nil {
|
||||||
log.Println(e.Error())
|
log.Println(e.Error())
|
||||||
|
|
|
@ -1,34 +1,35 @@
|
||||||
package test
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"go-tcpFramework-rudy/tools/mq"
|
"tcptemplate/tools/mq"
|
||||||
"qiniupkg.com/x/log.v7"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"qiniupkg.com/x/log.v7"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Callback(key string,msg string) {
|
func Callback(key string, msg string) {
|
||||||
log.Print(key,msg)
|
log.Print(key, msg)
|
||||||
}
|
}
|
||||||
func TestMq(t *testing.T) {
|
func TestMq(t *testing.T) {
|
||||||
cli := mq.RedisMqClient{}
|
cli := mq.RedisMqClient{}
|
||||||
e := cli.Connect("49.235.25.67:16379",0,"")
|
e := cli.Connect("49.235.25.67:16379", 0, "")
|
||||||
if nil != e{
|
if nil != e {
|
||||||
log.Print(e.Error())
|
log.Print(e.Error())
|
||||||
t.Error(e)
|
t.Error(e)
|
||||||
}
|
}
|
||||||
cli.Subscribe("msg",Callback)
|
cli.Subscribe("msg", Callback)
|
||||||
cli.Public("msg","shit")
|
cli.Public("msg", "shit")
|
||||||
cli.Public("msg","shit2")
|
cli.Public("msg", "shit2")
|
||||||
cli.Public("msg","shit3")
|
cli.Public("msg", "shit3")
|
||||||
|
|
||||||
mqcli := mq.MqttClient{}
|
mqcli := mq.MqttClient{}
|
||||||
e = mqcli.Connect("tcp://" + "49.235.25.67:1883",0,"122")
|
e = mqcli.Connect("tcp://"+"49.235.25.67:1883", 0, "122")
|
||||||
log.Print(e)
|
log.Print(e)
|
||||||
e = mqcli.Subscribe("/dev",Callback)
|
e = mqcli.Subscribe("/dev", Callback)
|
||||||
log.Print(e)
|
log.Print(e)
|
||||||
mqcli.Public("/dev","shit")
|
mqcli.Public("/dev", "shit")
|
||||||
for true{
|
for true {
|
||||||
time.Sleep(100)
|
time.Sleep(100)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,40 +2,37 @@ package mq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go-tcpFramework-rudy/util"
|
|
||||||
"gopkg.in/redis.v3"
|
|
||||||
"io"
|
"io"
|
||||||
|
"tcptemplate/util"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gopkg.in/redis.v3"
|
||||||
"qiniupkg.com/x/log.v7"
|
"qiniupkg.com/x/log.v7"
|
||||||
|
|
||||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type CallBack func(string, string)
|
||||||
|
|
||||||
type CallBack func(string,string)
|
|
||||||
|
|
||||||
//通用mq库,支持发布,订阅
|
//通用mq库,支持发布,订阅
|
||||||
type MqClient interface {
|
type MqClient interface {
|
||||||
Subscribe(topic string,callback *CallBack) error
|
Subscribe(topic string, callback *CallBack) error
|
||||||
Public(topic string,data string) error
|
Public(topic string, data string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
type RedisMqClient struct {
|
type RedisMqClient struct {
|
||||||
client *redis.Client
|
client *redis.Client
|
||||||
Address string
|
Address string
|
||||||
Password string
|
Password string
|
||||||
}
|
}
|
||||||
type MqttClient struct {
|
type MqttClient struct {
|
||||||
client MQTT.Client
|
client MQTT.Client
|
||||||
Address string
|
Address string
|
||||||
ClientId string
|
ClientId string
|
||||||
Options *MQTT.ClientOptions
|
Options *MQTT.ClientOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MqttClient) Connect(ip string,port int64,id string) error {
|
func (this *MqttClient) Connect(ip string, port int64, id string) error {
|
||||||
this.Address = ip
|
this.Address = ip
|
||||||
|
|
||||||
opts := MQTT.NewClientOptions().AddBroker(ip)
|
opts := MQTT.NewClientOptions().AddBroker(ip)
|
||||||
|
@ -49,7 +46,7 @@ func (this *MqttClient) Connect(ip string,port int64,id string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
|
func (this *RedisMqClient) Connect(ip string, port int64, passwd string) error {
|
||||||
option := &redis.Options{
|
option := &redis.Options{
|
||||||
Addr: ip,
|
Addr: ip,
|
||||||
Password: passwd,
|
Password: passwd,
|
||||||
|
@ -70,11 +67,11 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
|
func (this *MqttClient) Subscribe(topic string, callback CallBack) error {
|
||||||
handler := func(c MQTT.Client, msg MQTT.Message) {
|
handler := func(c MQTT.Client, msg MQTT.Message) {
|
||||||
msg.Payload()
|
msg.Payload()
|
||||||
msg.Topic()
|
msg.Topic()
|
||||||
callback(msg.Topic(),string(msg.Payload()))
|
callback(msg.Topic(), string(msg.Payload()))
|
||||||
}
|
}
|
||||||
opts := MQTT.NewClientOptions().AddBroker(this.Address)
|
opts := MQTT.NewClientOptions().AddBroker(this.Address)
|
||||||
opts.SetClientID(this.ClientId)
|
opts.SetClientID(this.ClientId)
|
||||||
|
@ -86,33 +83,33 @@ func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
|
||||||
if token := this.client.Connect(); token.Wait() && token.Error() != nil {
|
if token := this.client.Connect(); token.Wait() && token.Error() != nil {
|
||||||
return token.Error()
|
return token.Error()
|
||||||
}
|
}
|
||||||
this.client.Subscribe(topic,0,handler)
|
this.client.Subscribe(topic, 0, handler)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error {
|
func (this *RedisMqClient) Subscribe(topic string, callback CallBack) error {
|
||||||
if nil == this{
|
if nil == this {
|
||||||
return errors.New(util.ERR_NULL_POINTER)
|
return errors.New(util.ERR_NULL_POINTER)
|
||||||
}
|
}
|
||||||
pubsub, err := this.client.Subscribe(topic)
|
pubsub, err := this.client.Subscribe(topic)
|
||||||
if nil != err{
|
if nil != err {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
x := make(chan bool,1)
|
x := make(chan bool, 1)
|
||||||
go func() {
|
go func() {
|
||||||
x <- true
|
x <- true
|
||||||
defer pubsub.Close()
|
defer pubsub.Close()
|
||||||
for true{
|
for true {
|
||||||
msg, err := pubsub.ReceiveMessage()
|
msg, err := pubsub.ReceiveMessage()
|
||||||
if nil != err{
|
if nil != err {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
if err == io.EOF{
|
if err == io.EOF {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
callback(topic,msg.Payload)
|
callback(topic, msg.Payload)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
<- x
|
<-x
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,45 +130,45 @@ func creatRedisClient(option *redis.Options) (*redis.Client, error) {
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *RedisMqClient) Public(topic string,data string) error {
|
func (this *RedisMqClient) Public(topic string, data string) error {
|
||||||
if nil == this{
|
if nil == this {
|
||||||
return errors.New(util.ERR_NULL_POINTER)
|
return errors.New(util.ERR_NULL_POINTER)
|
||||||
}
|
}
|
||||||
// 检测client有效性
|
// 检测client有效性
|
||||||
if nil != this.client {
|
if nil != this.client {
|
||||||
_, err := this.client.Ping().Result()
|
_, err := this.client.Ping().Result()
|
||||||
if nil != err {
|
if nil != err {
|
||||||
this.client.Close()
|
this.client.Close()
|
||||||
// 尝试3次重连
|
// 尝试3次重连
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
this.client, err = creatRedisClient(&redis.Options{
|
this.client, err = creatRedisClient(&redis.Options{
|
||||||
Addr:this.Address,
|
Addr: this.Address,
|
||||||
Password:this.Password,
|
Password: this.Password,
|
||||||
})
|
})
|
||||||
if this.client != nil {
|
if this.client != nil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err := this.client.Publish(topic, data).Result()
|
_, err := this.client.Publish(topic, data).Result()
|
||||||
if nil != err{
|
if nil != err {
|
||||||
log.Print(err)
|
log.Print(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MqttClient) Public(topic string,data string) error {
|
func (this *MqttClient) Public(topic string, data string) error {
|
||||||
if nil == this{
|
if nil == this {
|
||||||
return errors.New(util.ERR_NULL_POINTER)
|
return errors.New(util.ERR_NULL_POINTER)
|
||||||
}
|
}
|
||||||
// 检测client有效性
|
// 检测client有效性
|
||||||
if true != this.client.IsConnected() {
|
if true != this.client.IsConnected() {
|
||||||
|
|
||||||
}
|
}
|
||||||
token := this.client.Publish(topic,0, false,data)
|
token := this.client.Publish(topic, 0, false, data)
|
||||||
token.Wait()
|
token.Wait()
|
||||||
if nil != token.Error(){
|
if nil != token.Error() {
|
||||||
log.Print(token.Error())
|
log.Print(token.Error())
|
||||||
return token.Error()
|
return token.Error()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,12 +2,13 @@ package zk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "bytes"
|
_ "bytes"
|
||||||
zk "github.com/samuel/go-zookeeper/zk"
|
|
||||||
"go-tcpFramework-rudy/config"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
_ "strings"
|
_ "strings"
|
||||||
|
"tcptemplate/config"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
zk "github.com/samuel/go-zookeeper/zk"
|
||||||
)
|
)
|
||||||
|
|
||||||
var g_zkCnn *zk.Conn
|
var g_zkCnn *zk.Conn
|
||||||
|
|
Loading…
Reference in New Issue