Compare commits

...

10 Commits

Author SHA1 Message Date
caiyuzheng 84d1cd38be no message 2021-05-31 00:34:28 +08:00
caiyuzheng 368e49ff71 no message 2020-12-04 22:06:28 +08:00
zcy 129c86e54f 文档更新 2020-08-27 16:49:38 +08:00
caiyuzheng d4a3ca3f8d 修改文档 2020-08-26 22:52:36 +08:00
caiyuzheng 1d463d5988 no message 2020-08-26 20:53:01 +08:00
caiyuzheng 048c3dcc07 no message 2020-08-26 20:52:26 +08:00
caiyuzheng 1af0420ebc no message 2020-08-24 02:07:50 +08:00
caiyuzheng 8a086a77b3 no message 2020-08-20 01:09:18 +08:00
caiyuzheng 801bdd69bd 添加安全tcp的实现 2020-08-19 20:37:47 +08:00
a7458969 47b72da442 添加secure tcp的使用说明 2020-08-18 15:10:55 +08:00
15 changed files with 213 additions and 227 deletions

17
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,17 @@
{
// 使 IntelliSense
//
// 访: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceRoot}/main.go",
"env": {},
"args": []
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"go.inferGopath": false
}

View File

@ -1,5 +1,4 @@
FROM alpine:latest FROM alpine:latest
MAINTAINER caiyu "a7458969@gmail.com"
WORKDIR /home/ubuntu/api/bin WORKDIR /home/ubuntu/api/bin
@ -9,4 +8,4 @@ RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
EXPOSE 9850 EXPOSE 9850
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"] ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]

View File

@ -1,24 +1,26 @@
## go-tcpFramework-rudy
#### 介绍 #### 介绍
基于golang的tcp长连接框架--rudy。 tcp流转包服务端框架。。</br>
this repo is desperated...looks like websocket can do it natually.
#### 软件架构 #### 特性
功能: 功能:
提供连接数管理,支持熔断,热重启 提供连接数管理,支持熔断,热重启。
提供路由管理,支持context化timeout和cancel 支持 context 化回调机制。
支持分布式,和主节点选举 ##### 支持分布式,和主节点选举。
支持apolo配置 主节点的作用
心跳包支持 心跳包支持。
服务端推送支持 服务端推送支持。
##### 可选并行回调或者同步回调方式。
并行回调就是异步回调,回调不保证执行得有序性。
同步回调一定会在上个回调函数执行完毕后执行。
#### How to use #### How to use
```go ```go
package main package main
import ( import (
"flag" "flag"
"go-tcpFramework-rudy/config" "tcptemplate/config"
"go-tcpFramework-rudy/logger" "tcptemplate/logger"
"go-tcpFramework-rudy/network" "tcptemplate/network"
"log" "log"
) )
@ -38,17 +40,30 @@ func main() {
} }
} }
``` ```
server 服务实例 server 服务实例
server.SetHandler 设置处理句柄 server.SetHandler 设置处理句柄
server.Run() 服务启动 server.Run() 服务启动
### 包头格式文档(Package format) ### 包头格式文档(Package format)
```text ```text
0 1 2 3 0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x40 | 0x41 | length[3] | length[2] | 0x40 | 0x41 | length[3] | length[2]
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| length[1] | length[0] | verify | | length[1] | length[0] | verify |
``` ```
包头格式7个字节首个字节标识符40 41 ,然后包长度最后是verify是包头长度的校验和
包头格式 7 个字节,首个字节标识符 40 41 ,然后包长度,最后是 verify 是包头长度的校验和
### secure tcp
secure tcp 提供了类似于 tls,ssl 的机制的安全 tcp 连接,流程如下
</br>
</br>
![image.png](https://www.testingcloud.club/sapi/api/image_download/692bf256-e121-11ea-bc55-525400dc6cec.png)
</br>

View File

@ -13,6 +13,7 @@ type Config struct {
ID string `json:"id"` ID string `json:"id"`
Ipadress string `json:"ipadress"` //format like ip:port Ipadress string `json:"ipadress"` //format like ip:port
Zkserver []string `json:"zkserver"` Zkserver []string `json:"zkserver"`
Secure bool `json:"secure"` // 是否是安全tcp
} }
var gConf *Config var gConf *Config
@ -39,7 +40,7 @@ func InitConfig(path string) {
if e != nil { if e != nil {
log.Println(e.Error()) log.Println(e.Error())
} }
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm) e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
if nil != e { if nil != e {
logger.LogDebugError(e.Error()) logger.LogDebugError(e.Error())
return return
@ -52,7 +53,11 @@ func InitConfig(path string) {
return return
} }
e := file.Close() e := file.Close()
if nil != e{ if nil != e {
logger.LogDebugError(e.Error()) logger.LogDebugError(e.Error())
} }
} }
func IfSecure() bool {
return gConf.Secure
}

View File

@ -10,4 +10,5 @@ ssh -t -i ./id_rsa ubuntu@118.24.238.198 'cp /home/ubuntu/api/bin/go-tcpFramew
ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"' ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"'
ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy' ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy'
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy' ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'

2
go.mod
View File

@ -1,5 +1,7 @@
module go-tcpFramework-rudy module go-tcpFramework-rudy
go 1.14
require ( require (
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect

10
main.go
View File

@ -2,20 +2,20 @@ package main
import ( import (
"flag" "flag"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/network"
"log" "log"
"runtime" "runtime"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/network"
) )
func main() { func main() {
if runtime.GOOS != `linux`{ if runtime.GOOS != `linux` {
log.Print("do not support " + runtime.GOOS + " platform") log.Print("do not support " + runtime.GOOS + " platform")
//return //return
} }
var confPath string var confPath string
flag.StringVar(&confPath,"conf_path","TcpServConfig.json","the path of configuration file") flag.StringVar(&confPath, "conf_path", "TcpServConfig.json", "the path of configuration file")
flag.Parse() flag.Parse()
config.InitConfig(confPath) config.InitConfig(confPath)

View File

@ -4,30 +4,35 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"log" "log"
"net" "net"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/util"
"time" "time"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
) )
type ConnectionContext struct { type ConnectionContext struct {
context.Context context.Context
sockId string sockId string
Type string //连接类型有websocket普通的socket Type string //连接类型有websocket普通的socket
Cnntime int64 //连接时间 Cnntime int64 //连接时间
IpAdress string //客户端ip IpAdress string //客户端ip
Port int32 //端口号 Port int32 //端口号
Cnn net.Conn //连接主体代码 Cnn net.Conn //连接主体代码
wsCnn *websocket.Conn // wsCnn *websocket.Conn //
Business interface{} //业务代码,最好是一个结构体的形式. Business interface{} //业务代码,最好是一个结构体的形式.
h HandleFunction h HandleFunction
SecureAuth bool // secure tcp 是否已经协商秘钥
Pri []byte // secure tcp协商的私钥
} }
type ConnectionManager struct { type ConnectionManager struct {
@ -40,13 +45,16 @@ type ConnectionManager struct {
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext { func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithTimeout(parent, timeOut) ctx, _ := context.WithTimeout(parent, timeOut)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler} if config.IfSecure() { // secure tcp
}
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret return ret
} }
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext { func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
ctx, _ := context.WithCancel(parent) ctx, _ := context.WithCancel(parent)
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler} ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
return ret return ret
} }
@ -54,7 +62,7 @@ var gConnMgr *ConnectionManager
func init() { func init() {
gConnMgr = new(ConnectionManager) gConnMgr = new(ConnectionManager)
gConnMgr.connSet = make(map[string]ConnectionContext,1000) gConnMgr.connSet = make(map[string]ConnectionContext, 1000)
gConnMgr.mutex = sync.Mutex{} gConnMgr.mutex = sync.Mutex{}
} }

View File

@ -8,17 +8,18 @@ import (
_ "encoding/binary" _ "encoding/binary"
"errors" "errors"
_ "fmt" _ "fmt"
"github.com/davecgh/go-spew/spew"
_ "github.com/golang/protobuf/proto"
"github.com/hprose/hprose-golang/io"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"golang.org/x/net/context"
"log" "log"
"net" "net"
"sync" "sync"
"tcptemplate/logger"
"tcptemplate/util"
"time" "time"
_ "time" _ "time"
"github.com/davecgh/go-spew/spew"
_ "github.com/golang/protobuf/proto"
"github.com/hprose/hprose-golang/io"
"golang.org/x/net/context"
) )
const ( const (
@ -122,19 +123,18 @@ func (this *UserError) Error() string {
} }
func (this *PackageReceiever) Write(input []byte, inputlen int) error { func (this *PackageReceiever) Write(input []byte, inputlen int) error {
if inputlen == 0 { if inputlen == 0 {
return nil return nil
} }
// 有缓存中的数据 // 有缓存中的数据
if this.pack_len > 0{ if this.pack_len > 0 {
copy(this.pack_data[this.pack_len:], input[0:inputlen]) copy(this.pack_data[this.pack_len:], input[0:inputlen])
this.pack_len += inputlen this.pack_len += inputlen
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41{ if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41 {
return ErrPackFormat return ErrPackFormat
} }
// verify // verify
bpacklen := bytes.NewReader(this.pack_data[2 : 6]) bpacklen := bytes.NewReader(this.pack_data[2:6])
var packlen int32 var packlen int32
var verify byte var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen) e := binary.Read(bpacklen, binary.BigEndian, &packlen)
@ -148,13 +148,13 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
return ErrorPackVerify return ErrorPackVerify
} }
// 还没完全收到一整包数据 // 还没完全收到一整包数据
if packlen < int32(inputlen) - 7{ if packlen < int32(inputlen)-7 {
return nil return nil
} }
// 刚好等于一包数据 // 刚好等于一包数据
if packlen == int32(inputlen) - 7{ if packlen == int32(inputlen)-7 {
_,ok := <- this.cache_package _, ok := <-this.cache_package
if !ok{ if !ok {
logger.LogDebugError("channel is closed") logger.LogDebugError("channel is closed")
return nil return nil
} }
@ -162,24 +162,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
this.pack_len = 0 this.pack_len = 0
} }
// 有两个包 // 有两个包
if packlen < int32(inputlen) - 7 { if packlen < int32(inputlen)-7 {
_,ok := <- this.cache_package _, ok := <-this.cache_package
if !ok{ if !ok {
logger.LogDebugError("channel is closed") logger.LogDebugError("channel is closed")
return nil return nil
} }
this.cache_package <- this.pack_data[0:inputlen] this.cache_package <- this.pack_data[0:inputlen]
this.pack_len = 0 this.pack_len = 0
this.Write(this.pack_data[inputlen:],this.pack_len - int(packlen)) //下个包再去重组包 this.Write(this.pack_data[inputlen:], this.pack_len-int(packlen)) //下个包再去重组包
} }
} }
// 没有未完成的组包 // 没有未完成的组包
if this.pack_len == 0{ if this.pack_len == 0 {
if input[0] != 0x40 || input[1] != 0x41{ if input[0] != 0x40 || input[1] != 0x41 {
return ErrPackFormat return ErrPackFormat
} }
// verify // verify
bpacklen := bytes.NewReader(input[2 : 6]) bpacklen := bytes.NewReader(input[2:6])
var packlen int32 var packlen int32
var verify byte var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen) e := binary.Read(bpacklen, binary.BigEndian, &packlen)
@ -193,90 +193,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
return ErrorPackVerify return ErrorPackVerify
} }
// 还没完全收到一整包数据 // 还没完全收到一整包数据
if packlen < int32(inputlen) - 7{ if packlen < int32(inputlen)-7 {
copy(this.pack_data[this.pack_len:], input[0:inputlen]) copy(this.pack_data[this.pack_len:], input[0:inputlen])
this.pack_len += inputlen this.pack_len += inputlen
return nil; return nil
} }
// 刚好等于一包数据 // 刚好等于一包数据
if packlen == int32(inputlen) - 7{ if packlen == int32(inputlen)-7 {
this.cache_package <- input[0:inputlen] this.cache_package <- input[0:inputlen]
this.pack_len = 0 this.pack_len = 0
} }
// 有两个包 // 有两个包
if packlen < int32(inputlen) - 7 { if packlen < int32(inputlen)-7 {
this.cache_package <- input[0:inputlen] this.cache_package <- input[0:inputlen]
this.pack_len = 0 this.pack_len = 0
this.Write(input[inputlen:],inputlen - int(packlen)) //下个包再去重组包 this.Write(input[inputlen:], inputlen-int(packlen)) //下个包再去重组包
} }
} }
/*
log.Println("recv data len:",inputlen)
if this.pack_len > 0 || (this.pack_len + inputlen < 7) {
//log.Println("unstorage data",this.pack_len)
copy(this.pack_data[this.pack_len:], input[0:inputlen])
//log.Println(this.pack_data)
this.pack_len += inputlen
unsort = this.pack_data
unsort_len = this.pack_len
} else {
unsort = input
unsort_len = inputlen
}
log.Print("this.pack_data length:",this.pack_len)
log.Print("unsort length:",unsort_len)
//只有当负载数据包大于包头的时候才进行解包
if unsort_len > 6 && unsort_len < util.MAX_PACKAGE_BUFFER {
//获取各个包头地址
for i := (0); i < unsort_len; {
//包头的时候刚好出现粘包的情况
if unsort_len - i < 7{
//直接拷贝到未解包缓冲区
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
this.pack_len = unsort_len - pack_start
break
}
if unsort[0+i] != 0x40 || unsort[1+i] != 0x41 {
//log.Println("unsort",unsort[0:inputlen],i)
return ErrPackFormat
}
pack_start = i
bpacklen := bytes.NewReader(unsort[i+2 : i+6])
var packlen int32
var verify byte
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
if nil != e {
logger.LogDebugError(e.Error())
}
for _, v := range unsort[i+2 : i+6] {
verify += v
}
if verify != unsort[6] {
return ErrorPackVerify
}
//下个包头的地址
//log.Println("packlen:",packlen)
i += int(packlen) + 7
//如果i小于packlen证明出现了沾包的情况
if i <= unsort_len {
this.cache_package <- unsort[pack_start:i]
pack_start = i
this.pack_len = 0
} else {
//超过了当前缓冲区的话拷贝到缓存以便下次组包
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
this.pack_len = unsort_len - pack_start
}
}
} else {
if unsort_len == 0 {
return nil
} else {
return ErrorPackTooLong
}
}*/
return nil return nil
} }
@ -301,7 +235,7 @@ func DefaultPackageHandler(ctx ProcessContext) {
} }
} }
func CloseConnection(c ConnectionContext,receiver *PackageReceiever) { func CloseConnection(c ConnectionContext, receiver *PackageReceiever) {
e := c.Cnn.Close() e := c.Cnn.Close()
if nil != e { if nil != e {
logger.LogDebugError(e.Error()) logger.LogDebugError(e.Error())
@ -317,7 +251,8 @@ func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
func HandleConnection(c ConnectionContext) { func HandleConnection(c ConnectionContext) {
buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置 buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置
packReceiever := PackageReceieverFactory() packReceiever := PackageReceieverFactory()
pCtx := ProcessContext{c, packReceiever, &c.Cnn,c.h} pCtx := ProcessContext{c, packReceiever, &c.Cnn, c.h}
for { for {
for !packReceiever.PackageCleared() { for !packReceiever.PackageCleared() {
//修改handler 机制 //修改handler 机制
@ -331,29 +266,24 @@ func HandleConnection(c ConnectionContext) {
var err error var err error
recvd, err = c.Cnn.Read(buf) recvd, err = c.Cnn.Read(buf)
log.Print(recvd) log.Print(recvd)
for recvd < 7{ for recvd < 7 {
smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置 smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置
var n int var n int
c.Cnn.SetReadDeadline(time.Now().Add(time.Second)) c.Cnn.SetReadDeadline(time.Now().Add(time.Second))
n, err = c.Cnn.Read(smallbuf) n, err = c.Cnn.Read(smallbuf)
if err != nil { if err != nil {
if nerr, ok := err.(net.Error); ok && !nerr.Temporary() { if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
CloseConnection(c,packReceiever) CloseConnection(c, packReceiever)
return return
} }
} }
if (n > 0){ if n > 0 {
log.Print(n)
logger.LogRealeaseInfo(smallbuf) logger.LogRealeaseInfo(smallbuf)
copy(buf[recvd:],smallbuf[0:n]) copy(buf[recvd:], smallbuf[0:n])
recvd += n recvd += n
} }
} }
//var req protocol.Request
logger.LogRealeaseInfo(buf) logger.LogRealeaseInfo(buf)
err = packReceiever.Write(buf, recvd) err = packReceiever.Write(buf, recvd)
//注意用了chanel不要在同一个携程里面进行插入和取出以免造成死锁。 //注意用了chanel不要在同一个携程里面进行插入和取出以免造成死锁。
if err != nil { if err != nil {

View File

@ -4,19 +4,21 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
zkdriver "github.com/samuel/go-zookeeper/zk" "gobase/utils"
"github.com/satori/go.uuid"
"go-tcpFramework-rudy/config"
"go-tcpFramework-rudy/logger"
"go-tcpFramework-rudy/util"
"go-tcpFramework-rudy/zk"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"log" "log"
"net" "net"
"net/http" "net/http"
"os" "os"
"tcptemplate/config"
"tcptemplate/logger"
"tcptemplate/util"
"tcptemplate/zk"
"time" "time"
zkdriver "github.com/samuel/go-zookeeper/zk"
uuid "github.com/satori/go.uuid"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
) )
const TYPE_TCP_SERVER = 1 const TYPE_TCP_SERVER = 1
@ -26,7 +28,7 @@ type Server struct {
listener *net.TCPListener listener *net.TCPListener
servrType int32 servrType int32
cMgr ConnectionManager cMgr ConnectionManager
h HandleFunction h HandleFunction
} }
func badCall() { func badCall() {
@ -34,7 +36,7 @@ func badCall() {
} }
func ServerFactory(adress string) Server { func ServerFactory(adress string) Server {
ret := Server{Address:adress,listener:nil,servrType:1,cMgr:*GetConnMgr(),h:DefaultPackageHandler} ret := Server{Address: adress, listener: nil, servrType: 1, cMgr: *GetConnMgr(), h: DefaultPackageHandler}
return ret return ret
} }
@ -53,14 +55,19 @@ func websocketHandler(ws *websocket.Conn) {
} }
logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String()) logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String())
if config.IfSecure() {
// 首先协商秘钥
pub, pri := utils.GenRsaKey(1024)
clicnn.Cnn.Write(pub)
clicnn.Pri = pri
}
go HandleConnection(clicnn) go HandleConnection(clicnn)
} }
func (this *Server) SetIpAddress(Address string) { func (this *Server) SetIpAddress(Address string) {
this.Address = Address this.Address = Address
} }
func (this *Server)SetHandler(h HandleFunction) { func (this *Server) SetHandler(h HandleFunction) {
this.h = h this.h = h
} }
@ -100,7 +107,7 @@ func (this *Server) Run() error {
logger.LogDebugError(err.Error()) logger.LogDebugError(err.Error())
} }
for { for {
c,err := this.listener.AcceptTCP() c, err := this.listener.AcceptTCP()
if err != nil { if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() { if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
fmt.Println("Stop accepting connections") fmt.Println("Stop accepting connections")
@ -113,13 +120,13 @@ func (this *Server) Run() error {
ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String()) ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
ctxCnn.Cnntime = time.Now().Unix() ctxCnn.Cnntime = time.Now().Unix()
ctxCnn.Cnn = c ctxCnn.Cnn = c
sockid,_ := uuid.NewV4() sockid, _ := uuid.NewV4()
ctxCnn.sockId = sockid.String() ctxCnn.sockId = sockid.String()
e := GetConnMgr().AddConn(ctxCnn) e := GetConnMgr().AddConn(ctxCnn)
if nil != e { if nil != e {
logger.LogDebugError(e.Error()) logger.LogDebugError(e.Error())
} }
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String()," socket id:",ctxCnn.sockId) logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String(), " socket id:", ctxCnn.sockId)
if err != nil { if err != nil {
logger.LogDebugError(err.Error()) logger.LogDebugError(err.Error())
continue continue

View File

@ -3,19 +3,19 @@ package test
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
_ "go-tcpFramework-rudy/protocol/pb"
_ "io" _ "io"
"log" "log"
"net" "net"
_ "net/http/pprof" _ "net/http/pprof"
_ "runtime" _ "runtime"
_ "tcptemplate/protocol/pb"
"testing" "testing"
"time" "time"
)
_ "github.com/golang/protobuf/proto"
_ "github.com/hprose/hprose-golang/io"
_ "github.com/trysh/ttb"
)
/* /*
tcp tcp
@ -61,7 +61,7 @@ func SendPackage(writer *net.TCPConn, payload []byte, length int) error {
//log.Println("Send length",wb) //log.Println("Send length",wb)
b = append(b, packbuf.Bytes()...) b = append(b, packbuf.Bytes()...)
b = append(b, payload[0:length]...) b = append(b, payload[0:length]...)
log.Println(b[0:5],b[5:]) log.Println(b[0:5], b[5:])
_, e = writer.Write(b[0:5]) _, e = writer.Write(b[0:5])
if e != nil { if e != nil {
log.Println(e.Error()) log.Println(e.Error())

View File

@ -1,34 +1,35 @@
package test package test
import ( import (
"go-tcpFramework-rudy/tools/mq" "tcptemplate/tools/mq"
"qiniupkg.com/x/log.v7"
"testing" "testing"
"time" "time"
"qiniupkg.com/x/log.v7"
) )
func Callback(key string,msg string) { func Callback(key string, msg string) {
log.Print(key,msg) log.Print(key, msg)
} }
func TestMq(t *testing.T) { func TestMq(t *testing.T) {
cli := mq.RedisMqClient{} cli := mq.RedisMqClient{}
e := cli.Connect("49.235.25.67:16379",0,"") e := cli.Connect("49.235.25.67:16379", 0, "")
if nil != e{ if nil != e {
log.Print(e.Error()) log.Print(e.Error())
t.Error(e) t.Error(e)
} }
cli.Subscribe("msg",Callback) cli.Subscribe("msg", Callback)
cli.Public("msg","shit") cli.Public("msg", "shit")
cli.Public("msg","shit2") cli.Public("msg", "shit2")
cli.Public("msg","shit3") cli.Public("msg", "shit3")
mqcli := mq.MqttClient{} mqcli := mq.MqttClient{}
e = mqcli.Connect("tcp://" + "49.235.25.67:1883",0,"122") e = mqcli.Connect("tcp://"+"49.235.25.67:1883", 0, "122")
log.Print(e) log.Print(e)
e = mqcli.Subscribe("/dev",Callback) e = mqcli.Subscribe("/dev", Callback)
log.Print(e) log.Print(e)
mqcli.Public("/dev","shit") mqcli.Public("/dev", "shit")
for true{ for true {
time.Sleep(100) time.Sleep(100)
} }
} }

View File

@ -2,40 +2,37 @@ package mq
import ( import (
"fmt" "fmt"
"github.com/pkg/errors"
"go-tcpFramework-rudy/util"
"gopkg.in/redis.v3"
"io" "io"
"tcptemplate/util"
"github.com/pkg/errors"
"gopkg.in/redis.v3"
"qiniupkg.com/x/log.v7" "qiniupkg.com/x/log.v7"
MQTT "github.com/eclipse/paho.mqtt.golang" MQTT "github.com/eclipse/paho.mqtt.golang"
) )
type CallBack func(string, string)
type CallBack func(string,string)
//通用mq库支持发布订阅 //通用mq库支持发布订阅
type MqClient interface { type MqClient interface {
Subscribe(topic string,callback *CallBack) error Subscribe(topic string, callback *CallBack) error
Public(topic string,data string) error Public(topic string, data string) error
} }
type RedisMqClient struct { type RedisMqClient struct {
client *redis.Client client *redis.Client
Address string Address string
Password string Password string
} }
type MqttClient struct { type MqttClient struct {
client MQTT.Client client MQTT.Client
Address string Address string
ClientId string ClientId string
Options *MQTT.ClientOptions Options *MQTT.ClientOptions
} }
func (this *MqttClient) Connect(ip string,port int64,id string) error { func (this *MqttClient) Connect(ip string, port int64, id string) error {
this.Address = ip this.Address = ip
opts := MQTT.NewClientOptions().AddBroker(ip) opts := MQTT.NewClientOptions().AddBroker(ip)
@ -49,7 +46,7 @@ func (this *MqttClient) Connect(ip string,port int64,id string) error {
return nil return nil
} }
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error { func (this *RedisMqClient) Connect(ip string, port int64, passwd string) error {
option := &redis.Options{ option := &redis.Options{
Addr: ip, Addr: ip,
Password: passwd, Password: passwd,
@ -70,11 +67,11 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
} }
return nil return nil
} }
func (this *MqttClient) Subscribe(topic string,callback CallBack) error { func (this *MqttClient) Subscribe(topic string, callback CallBack) error {
handler := func(c MQTT.Client, msg MQTT.Message) { handler := func(c MQTT.Client, msg MQTT.Message) {
msg.Payload() msg.Payload()
msg.Topic() msg.Topic()
callback(msg.Topic(),string(msg.Payload())) callback(msg.Topic(), string(msg.Payload()))
} }
opts := MQTT.NewClientOptions().AddBroker(this.Address) opts := MQTT.NewClientOptions().AddBroker(this.Address)
opts.SetClientID(this.ClientId) opts.SetClientID(this.ClientId)
@ -86,33 +83,33 @@ func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
if token := this.client.Connect(); token.Wait() && token.Error() != nil { if token := this.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error() return token.Error()
} }
this.client.Subscribe(topic,0,handler) this.client.Subscribe(topic, 0, handler)
return nil return nil
} }
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error { func (this *RedisMqClient) Subscribe(topic string, callback CallBack) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
pubsub, err := this.client.Subscribe(topic) pubsub, err := this.client.Subscribe(topic)
if nil != err{ if nil != err {
return err return err
} }
x := make(chan bool,1) x := make(chan bool, 1)
go func() { go func() {
x <- true x <- true
defer pubsub.Close() defer pubsub.Close()
for true{ for true {
msg, err := pubsub.ReceiveMessage() msg, err := pubsub.ReceiveMessage()
if nil != err{ if nil != err {
log.Print(err) log.Print(err)
if err == io.EOF{ if err == io.EOF {
return return
} }
} }
callback(topic,msg.Payload) callback(topic, msg.Payload)
} }
}() }()
<- x <-x
return nil return nil
} }
@ -133,45 +130,45 @@ func creatRedisClient(option *redis.Options) (*redis.Client, error) {
return client, nil return client, nil
} }
func (this *RedisMqClient) Public(topic string,data string) error { func (this *RedisMqClient) Public(topic string, data string) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
// 检测client有效性 // 检测client有效性
if nil != this.client { if nil != this.client {
_, err := this.client.Ping().Result() _, err := this.client.Ping().Result()
if nil != err { if nil != err {
this.client.Close() this.client.Close()
// 尝试3次重连 // 尝试3次重连
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
this.client, err = creatRedisClient(&redis.Options{ this.client, err = creatRedisClient(&redis.Options{
Addr:this.Address, Addr: this.Address,
Password:this.Password, Password: this.Password,
}) })
if this.client != nil { if this.client != nil {
} }
} }
} }
} }
_, err := this.client.Publish(topic, data).Result() _, err := this.client.Publish(topic, data).Result()
if nil != err{ if nil != err {
log.Print(err) log.Print(err)
return err return err
} }
return nil return nil
} }
func (this *MqttClient) Public(topic string,data string) error { func (this *MqttClient) Public(topic string, data string) error {
if nil == this{ if nil == this {
return errors.New(util.ERR_NULL_POINTER) return errors.New(util.ERR_NULL_POINTER)
} }
// 检测client有效性 // 检测client有效性
if true != this.client.IsConnected() { if true != this.client.IsConnected() {
} }
token := this.client.Publish(topic,0, false,data) token := this.client.Publish(topic, 0, false, data)
token.Wait() token.Wait()
if nil != token.Error(){ if nil != token.Error() {
log.Print(token.Error()) log.Print(token.Error())
return token.Error() return token.Error()
} }

View File

@ -2,12 +2,13 @@ package zk
import ( import (
_ "bytes" _ "bytes"
zk "github.com/samuel/go-zookeeper/zk"
"go-tcpFramework-rudy/config"
"log" "log"
"os" "os"
_ "strings" _ "strings"
"tcptemplate/config"
"time" "time"
zk "github.com/samuel/go-zookeeper/zk"
) )
var g_zkCnn *zk.Conn var g_zkCnn *zk.Conn