Compare commits
10 Commits
ef03c1e8e4
...
84d1cd38be
Author | SHA1 | Date |
---|---|---|
caiyuzheng | 84d1cd38be | |
caiyuzheng | 368e49ff71 | |
zcy | 129c86e54f | |
caiyuzheng | d4a3ca3f8d | |
caiyuzheng | 1d463d5988 | |
caiyuzheng | 048c3dcc07 | |
caiyuzheng | 1af0420ebc | |
caiyuzheng | 8a086a77b3 | |
caiyuzheng | 801bdd69bd | |
a7458969 | 47b72da442 |
|
@ -0,0 +1,17 @@
|
|||
{
|
||||
// 使用 IntelliSense 了解相关属性。
|
||||
// 悬停以查看现有属性的描述。
|
||||
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Launch",
|
||||
"type": "go",
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"program": "${workspaceRoot}/main.go",
|
||||
"env": {},
|
||||
"args": []
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
{
|
||||
"go.inferGopath": false
|
||||
}
|
|
@ -1,5 +1,4 @@
|
|||
FROM alpine:latest
|
||||
MAINTAINER caiyu "a7458969@gmail.com"
|
||||
|
||||
WORKDIR /home/ubuntu/api/bin
|
||||
|
||||
|
@ -9,4 +8,4 @@ RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
|
|||
|
||||
EXPOSE 9850
|
||||
|
||||
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]
|
||||
ENTRYPOINT ["/bin/go-tcpFramework-rudy","--conf_path=/bin/TcpServConfig.json"]
|
||||
|
|
51
README.md
51
README.md
|
@ -1,24 +1,26 @@
|
|||
## go-tcpFramework-rudy
|
||||
#### 介绍
|
||||
基于golang的tcp长连接框架--rudy。
|
||||
|
||||
#### 软件架构
|
||||
tcp流转包服务端框架。。</br>
|
||||
this repo is desperated...looks like websocket can do it natually.
|
||||
#### 特性
|
||||
功能:
|
||||
提供连接数管理,支持熔断,热重启
|
||||
提供路由管理,支持context化timeout和cancel
|
||||
支持分布式,和主节点选举
|
||||
支持apolo配置
|
||||
心跳包支持
|
||||
服务端推送支持
|
||||
提供连接数管理,支持熔断,热重启。
|
||||
支持 context 化回调机制。
|
||||
##### 支持分布式,和主节点选举。
|
||||
主节点的作用
|
||||
心跳包支持。
|
||||
服务端推送支持。
|
||||
##### 可选并行回调或者同步回调方式。
|
||||
并行回调就是异步回调,回调不保证执行得有序性。
|
||||
同步回调一定会在上个回调函数执行完毕后执行。
|
||||
#### How to use
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"go-tcpFramework-rudy/config"
|
||||
"go-tcpFramework-rudy/logger"
|
||||
"go-tcpFramework-rudy/network"
|
||||
"tcptemplate/config"
|
||||
"tcptemplate/logger"
|
||||
"tcptemplate/network"
|
||||
"log"
|
||||
)
|
||||
|
||||
|
@ -38,17 +40,30 @@ func main() {
|
|||
}
|
||||
}
|
||||
```
|
||||
|
||||
server 服务实例
|
||||
server.SetHandler 设置处理句柄
|
||||
server.Run() 服务启动
|
||||
server.Run() 服务启动
|
||||
|
||||
### 包头格式文档(Package format)
|
||||
|
||||
```text
|
||||
0 1 2 3
|
||||
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
| 0x40 | 0x41 | length[3] | length[2]
|
||||
| 0x40 | 0x41 | length[3] | length[2]
|
||||
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
| length[1] | length[0] | verify |
|
||||
|
||||
| length[1] | length[0] | verify |
|
||||
|
||||
```
|
||||
包头格式7个字节,首个字节标识符40 41 ,然后包长度,最后是verify是包头长度的校验和
|
||||
|
||||
包头格式 7 个字节,首个字节标识符 40 41 ,然后包长度,最后是 verify 是包头长度的校验和
|
||||
|
||||
### secure tcp
|
||||
secure tcp 提供了类似于 tls,ssl 的机制的安全 tcp 连接,流程如下
|
||||
</br>
|
||||
</br>
|
||||
|
||||
![image.png](https://www.testingcloud.club/sapi/api/image_download/692bf256-e121-11ea-bc55-525400dc6cec.png)
|
||||
</br>
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ type Config struct {
|
|||
ID string `json:"id"`
|
||||
Ipadress string `json:"ipadress"` //format like ip:port
|
||||
Zkserver []string `json:"zkserver"`
|
||||
Secure bool `json:"secure"` // 是否是安全tcp
|
||||
}
|
||||
|
||||
var gConf *Config
|
||||
|
@ -39,7 +40,7 @@ func InitConfig(path string) {
|
|||
if e != nil {
|
||||
log.Println(e.Error())
|
||||
}
|
||||
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
|
||||
e = ioutil.WriteFile("TcpServConfig.json", buf, os.ModePerm)
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
return
|
||||
|
@ -52,7 +53,11 @@ func InitConfig(path string) {
|
|||
return
|
||||
}
|
||||
e := file.Close()
|
||||
if nil != e{
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func IfSecure() bool {
|
||||
return gConf.Secure
|
||||
}
|
||||
|
|
|
@ -10,4 +10,5 @@ ssh -t -i ./id_rsa ubuntu@118.24.238.198 'cp /home/ubuntu/api/bin/go-tcpFramew
|
|||
|
||||
ssh -t -i ./id_rsa root@118.24.238.198 'cd /home/ubuntu/api/bin/;docker build ./ -t "caiyuzheng\rudy"'
|
||||
ssh -t -i ./id_rsa ubuntu@118.24.238.198 'chmod 777 /home/ubuntu/api/bin/rudy'
|
||||
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'
|
||||
ssh -t -i ./id_rsa ubuntu@118.24.238.198 '/home/ubuntu/api/bin/restart.sh rudy'
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -1,5 +1,7 @@
|
|||
module go-tcpFramework-rudy
|
||||
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect
|
||||
|
|
10
main.go
10
main.go
|
@ -2,20 +2,20 @@ package main
|
|||
|
||||
import (
|
||||
"flag"
|
||||
"go-tcpFramework-rudy/config"
|
||||
"go-tcpFramework-rudy/logger"
|
||||
"go-tcpFramework-rudy/network"
|
||||
"log"
|
||||
"runtime"
|
||||
"tcptemplate/config"
|
||||
"tcptemplate/logger"
|
||||
"tcptemplate/network"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if runtime.GOOS != `linux`{
|
||||
if runtime.GOOS != `linux` {
|
||||
log.Print("do not support " + runtime.GOOS + " platform")
|
||||
//return
|
||||
}
|
||||
var confPath string
|
||||
flag.StringVar(&confPath,"conf_path","TcpServConfig.json","the path of configuration file")
|
||||
flag.StringVar(&confPath, "conf_path", "TcpServConfig.json", "the path of configuration file")
|
||||
flag.Parse()
|
||||
config.InitConfig(confPath)
|
||||
|
||||
|
|
|
@ -4,30 +4,35 @@ import (
|
|||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"go-tcpFramework-rudy/logger"
|
||||
"go-tcpFramework-rudy/util"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/websocket"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"tcptemplate/config"
|
||||
"tcptemplate/logger"
|
||||
"tcptemplate/util"
|
||||
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
type ConnectionContext struct {
|
||||
context.Context
|
||||
sockId string
|
||||
Type string //连接类型,有websocket,普通的socket
|
||||
Cnntime int64 //连接时间
|
||||
IpAdress string //客户端ip
|
||||
Port int32 //端口号
|
||||
Cnn net.Conn //连接主体代码
|
||||
wsCnn *websocket.Conn //
|
||||
Business interface{} //业务代码,最好是一个结构体的形式.
|
||||
h HandleFunction
|
||||
sockId string
|
||||
Type string //连接类型,有websocket,普通的socket
|
||||
Cnntime int64 //连接时间
|
||||
IpAdress string //客户端ip
|
||||
Port int32 //端口号
|
||||
Cnn net.Conn //连接主体代码
|
||||
wsCnn *websocket.Conn //
|
||||
Business interface{} //业务代码,最好是一个结构体的形式.
|
||||
h HandleFunction
|
||||
SecureAuth bool // secure tcp 是否已经协商秘钥
|
||||
Pri []byte // secure tcp协商的私钥
|
||||
}
|
||||
|
||||
type ConnectionManager struct {
|
||||
|
@ -40,13 +45,16 @@ type ConnectionManager struct {
|
|||
|
||||
func withTimeoutConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
||||
ctx, _ := context.WithTimeout(parent, timeOut)
|
||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
|
||||
if config.IfSecure() { // secure tcp
|
||||
|
||||
}
|
||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
|
||||
return ret
|
||||
}
|
||||
|
||||
func withCalcelConnectContext(parent context.Context, timeOut time.Duration) ConnectionContext {
|
||||
ctx, _ := context.WithCancel(parent)
|
||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil,DefaultPackageHandler}
|
||||
ret := ConnectionContext{ctx, "0", "socket", 0, "", 0, nil, nil, nil, DefaultPackageHandler, false, []byte{}}
|
||||
return ret
|
||||
}
|
||||
|
||||
|
@ -54,7 +62,7 @@ var gConnMgr *ConnectionManager
|
|||
|
||||
func init() {
|
||||
gConnMgr = new(ConnectionManager)
|
||||
gConnMgr.connSet = make(map[string]ConnectionContext,1000)
|
||||
gConnMgr.connSet = make(map[string]ConnectionContext, 1000)
|
||||
gConnMgr.mutex = sync.Mutex{}
|
||||
}
|
||||
|
||||
|
|
136
network/proto.go
136
network/proto.go
|
@ -8,17 +8,18 @@ import (
|
|||
_ "encoding/binary"
|
||||
"errors"
|
||||
_ "fmt"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
"github.com/hprose/hprose-golang/io"
|
||||
"go-tcpFramework-rudy/logger"
|
||||
"go-tcpFramework-rudy/util"
|
||||
"golang.org/x/net/context"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"tcptemplate/logger"
|
||||
"tcptemplate/util"
|
||||
"time"
|
||||
_ "time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
"github.com/hprose/hprose-golang/io"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -122,19 +123,18 @@ func (this *UserError) Error() string {
|
|||
}
|
||||
|
||||
func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
||||
|
||||
if inputlen == 0 {
|
||||
return nil
|
||||
}
|
||||
// 有缓存中的数据
|
||||
if this.pack_len > 0{
|
||||
if this.pack_len > 0 {
|
||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
||||
this.pack_len += inputlen
|
||||
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41{
|
||||
if this.pack_data[0] != 0x40 || this.pack_data[1] != 0x41 {
|
||||
return ErrPackFormat
|
||||
}
|
||||
// verify
|
||||
bpacklen := bytes.NewReader(this.pack_data[2 : 6])
|
||||
bpacklen := bytes.NewReader(this.pack_data[2:6])
|
||||
var packlen int32
|
||||
var verify byte
|
||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||
|
@ -148,13 +148,13 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
|||
return ErrorPackVerify
|
||||
}
|
||||
// 还没完全收到一整包数据
|
||||
if packlen < int32(inputlen) - 7{
|
||||
if packlen < int32(inputlen)-7 {
|
||||
return nil
|
||||
}
|
||||
// 刚好等于一包数据
|
||||
if packlen == int32(inputlen) - 7{
|
||||
_,ok := <- this.cache_package
|
||||
if !ok{
|
||||
if packlen == int32(inputlen)-7 {
|
||||
_, ok := <-this.cache_package
|
||||
if !ok {
|
||||
logger.LogDebugError("channel is closed")
|
||||
return nil
|
||||
}
|
||||
|
@ -162,24 +162,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
|||
this.pack_len = 0
|
||||
}
|
||||
// 有两个包
|
||||
if packlen < int32(inputlen) - 7 {
|
||||
_,ok := <- this.cache_package
|
||||
if !ok{
|
||||
if packlen < int32(inputlen)-7 {
|
||||
_, ok := <-this.cache_package
|
||||
if !ok {
|
||||
logger.LogDebugError("channel is closed")
|
||||
return nil
|
||||
}
|
||||
this.cache_package <- this.pack_data[0:inputlen]
|
||||
this.pack_len = 0
|
||||
this.Write(this.pack_data[inputlen:],this.pack_len - int(packlen)) //下个包再去重组包
|
||||
this.Write(this.pack_data[inputlen:], this.pack_len-int(packlen)) //下个包再去重组包
|
||||
}
|
||||
}
|
||||
// 没有未完成的组包
|
||||
if this.pack_len == 0{
|
||||
if input[0] != 0x40 || input[1] != 0x41{
|
||||
if this.pack_len == 0 {
|
||||
if input[0] != 0x40 || input[1] != 0x41 {
|
||||
return ErrPackFormat
|
||||
}
|
||||
// verify
|
||||
bpacklen := bytes.NewReader(input[2 : 6])
|
||||
bpacklen := bytes.NewReader(input[2:6])
|
||||
var packlen int32
|
||||
var verify byte
|
||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||
|
@ -193,90 +193,24 @@ func (this *PackageReceiever) Write(input []byte, inputlen int) error {
|
|||
return ErrorPackVerify
|
||||
}
|
||||
// 还没完全收到一整包数据
|
||||
if packlen < int32(inputlen) - 7{
|
||||
if packlen < int32(inputlen)-7 {
|
||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
||||
this.pack_len += inputlen
|
||||
return nil;
|
||||
return nil
|
||||
}
|
||||
// 刚好等于一包数据
|
||||
if packlen == int32(inputlen) - 7{
|
||||
if packlen == int32(inputlen)-7 {
|
||||
this.cache_package <- input[0:inputlen]
|
||||
this.pack_len = 0
|
||||
}
|
||||
// 有两个包
|
||||
if packlen < int32(inputlen) - 7 {
|
||||
if packlen < int32(inputlen)-7 {
|
||||
this.cache_package <- input[0:inputlen]
|
||||
this.pack_len = 0
|
||||
this.Write(input[inputlen:],inputlen - int(packlen)) //下个包再去重组包
|
||||
this.Write(input[inputlen:], inputlen-int(packlen)) //下个包再去重组包
|
||||
}
|
||||
}
|
||||
/*
|
||||
log.Println("recv data len:",inputlen)
|
||||
if this.pack_len > 0 || (this.pack_len + inputlen < 7) {
|
||||
//log.Println("unstorage data",this.pack_len)
|
||||
copy(this.pack_data[this.pack_len:], input[0:inputlen])
|
||||
//log.Println(this.pack_data)
|
||||
this.pack_len += inputlen
|
||||
unsort = this.pack_data
|
||||
unsort_len = this.pack_len
|
||||
} else {
|
||||
unsort = input
|
||||
unsort_len = inputlen
|
||||
}
|
||||
log.Print("this.pack_data length:",this.pack_len)
|
||||
|
||||
log.Print("unsort length:",unsort_len)
|
||||
//只有当负载数据包大于包头的时候才进行解包
|
||||
if unsort_len > 6 && unsort_len < util.MAX_PACKAGE_BUFFER {
|
||||
//获取各个包头地址
|
||||
for i := (0); i < unsort_len; {
|
||||
//包头的时候刚好出现粘包的情况
|
||||
if unsort_len - i < 7{
|
||||
//直接拷贝到未解包缓冲区
|
||||
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
|
||||
this.pack_len = unsort_len - pack_start
|
||||
break
|
||||
}
|
||||
|
||||
if unsort[0+i] != 0x40 || unsort[1+i] != 0x41 {
|
||||
//log.Println("unsort",unsort[0:inputlen],i)
|
||||
return ErrPackFormat
|
||||
}
|
||||
pack_start = i
|
||||
bpacklen := bytes.NewReader(unsort[i+2 : i+6])
|
||||
var packlen int32
|
||||
var verify byte
|
||||
e := binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
}
|
||||
for _, v := range unsort[i+2 : i+6] {
|
||||
verify += v
|
||||
}
|
||||
if verify != unsort[6] {
|
||||
return ErrorPackVerify
|
||||
}
|
||||
//下个包头的地址
|
||||
//log.Println("packlen:",packlen)
|
||||
i += int(packlen) + 7
|
||||
//如果i小于packlen证明出现了沾包的情况
|
||||
if i <= unsort_len {
|
||||
this.cache_package <- unsort[pack_start:i]
|
||||
pack_start = i
|
||||
this.pack_len = 0
|
||||
} else {
|
||||
//超过了当前缓冲区的话拷贝到缓存以便下次组包
|
||||
copy(this.pack_data[0:], unsort[pack_start:unsort_len])
|
||||
this.pack_len = unsort_len - pack_start
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if unsort_len == 0 {
|
||||
return nil
|
||||
} else {
|
||||
return ErrorPackTooLong
|
||||
}
|
||||
}*/
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -301,7 +235,7 @@ func DefaultPackageHandler(ctx ProcessContext) {
|
|||
}
|
||||
}
|
||||
|
||||
func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
|
||||
func CloseConnection(c ConnectionContext, receiver *PackageReceiever) {
|
||||
e := c.Cnn.Close()
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
|
@ -317,7 +251,8 @@ func CloseConnection(c ConnectionContext,receiver *PackageReceiever) {
|
|||
func HandleConnection(c ConnectionContext) {
|
||||
buf := make([]byte, PKG_MAX_LENGTH) // 这个1024可以根据你的消息长度来设置
|
||||
packReceiever := PackageReceieverFactory()
|
||||
pCtx := ProcessContext{c, packReceiever, &c.Cnn,c.h}
|
||||
pCtx := ProcessContext{c, packReceiever, &c.Cnn, c.h}
|
||||
|
||||
for {
|
||||
for !packReceiever.PackageCleared() {
|
||||
//修改handler 机制
|
||||
|
@ -331,29 +266,24 @@ func HandleConnection(c ConnectionContext) {
|
|||
var err error
|
||||
recvd, err = c.Cnn.Read(buf)
|
||||
log.Print(recvd)
|
||||
for recvd < 7{
|
||||
for recvd < 7 {
|
||||
smallbuf := make([]byte, 100) // 这个1024可以根据你的消息长度来设置
|
||||
var n int
|
||||
c.Cnn.SetReadDeadline(time.Now().Add(time.Second))
|
||||
n, err = c.Cnn.Read(smallbuf)
|
||||
if err != nil {
|
||||
if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
|
||||
CloseConnection(c,packReceiever)
|
||||
CloseConnection(c, packReceiever)
|
||||
return
|
||||
}
|
||||
}
|
||||
if (n > 0){
|
||||
log.Print(n)
|
||||
if n > 0 {
|
||||
logger.LogRealeaseInfo(smallbuf)
|
||||
copy(buf[recvd:],smallbuf[0:n])
|
||||
copy(buf[recvd:], smallbuf[0:n])
|
||||
recvd += n
|
||||
}
|
||||
|
||||
}
|
||||
//var req protocol.Request
|
||||
|
||||
logger.LogRealeaseInfo(buf)
|
||||
|
||||
err = packReceiever.Write(buf, recvd)
|
||||
//注意,用了chanel不要在同一个携程里面进行插入和取出,以免造成死锁。
|
||||
if err != nil {
|
||||
|
|
|
@ -4,19 +4,21 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
zkdriver "github.com/samuel/go-zookeeper/zk"
|
||||
"github.com/satori/go.uuid"
|
||||
"go-tcpFramework-rudy/config"
|
||||
"go-tcpFramework-rudy/logger"
|
||||
"go-tcpFramework-rudy/util"
|
||||
"go-tcpFramework-rudy/zk"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/websocket"
|
||||
"gobase/utils"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"tcptemplate/config"
|
||||
"tcptemplate/logger"
|
||||
"tcptemplate/util"
|
||||
"tcptemplate/zk"
|
||||
"time"
|
||||
|
||||
zkdriver "github.com/samuel/go-zookeeper/zk"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
const TYPE_TCP_SERVER = 1
|
||||
|
@ -26,7 +28,7 @@ type Server struct {
|
|||
listener *net.TCPListener
|
||||
servrType int32
|
||||
cMgr ConnectionManager
|
||||
h HandleFunction
|
||||
h HandleFunction
|
||||
}
|
||||
|
||||
func badCall() {
|
||||
|
@ -34,7 +36,7 @@ func badCall() {
|
|||
}
|
||||
|
||||
func ServerFactory(adress string) Server {
|
||||
ret := Server{Address:adress,listener:nil,servrType:1,cMgr:*GetConnMgr(),h:DefaultPackageHandler}
|
||||
ret := Server{Address: adress, listener: nil, servrType: 1, cMgr: *GetConnMgr(), h: DefaultPackageHandler}
|
||||
return ret
|
||||
}
|
||||
|
||||
|
@ -53,14 +55,19 @@ func websocketHandler(ws *websocket.Conn) {
|
|||
}
|
||||
|
||||
logger.LogRealeaseInfo("accept conn", ws.RemoteAddr().String(), ws.LocalAddr().String())
|
||||
|
||||
if config.IfSecure() {
|
||||
// 首先协商秘钥
|
||||
pub, pri := utils.GenRsaKey(1024)
|
||||
clicnn.Cnn.Write(pub)
|
||||
clicnn.Pri = pri
|
||||
}
|
||||
go HandleConnection(clicnn)
|
||||
}
|
||||
|
||||
func (this *Server) SetIpAddress(Address string) {
|
||||
this.Address = Address
|
||||
}
|
||||
func (this *Server)SetHandler(h HandleFunction) {
|
||||
func (this *Server) SetHandler(h HandleFunction) {
|
||||
this.h = h
|
||||
}
|
||||
|
||||
|
@ -100,7 +107,7 @@ func (this *Server) Run() error {
|
|||
logger.LogDebugError(err.Error())
|
||||
}
|
||||
for {
|
||||
c,err := this.listener.AcceptTCP()
|
||||
c, err := this.listener.AcceptTCP()
|
||||
if err != nil {
|
||||
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
||||
fmt.Println("Stop accepting connections")
|
||||
|
@ -113,13 +120,13 @@ func (this *Server) Run() error {
|
|||
ctxCnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
|
||||
ctxCnn.Cnntime = time.Now().Unix()
|
||||
ctxCnn.Cnn = c
|
||||
sockid,_ := uuid.NewV4()
|
||||
sockid, _ := uuid.NewV4()
|
||||
ctxCnn.sockId = sockid.String()
|
||||
e := GetConnMgr().AddConn(ctxCnn)
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
}
|
||||
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String()," socket id:",ctxCnn.sockId)
|
||||
logger.LogRealeaseInfo("accept conn", c.RemoteAddr().String(), c.LocalAddr().String(), " socket id:", ctxCnn.sockId)
|
||||
if err != nil {
|
||||
logger.LogDebugError(err.Error())
|
||||
continue
|
||||
|
|
|
@ -3,19 +3,19 @@ package test
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
_ "github.com/hprose/hprose-golang/io"
|
||||
_ "github.com/trysh/ttb"
|
||||
_ "go-tcpFramework-rudy/protocol/pb"
|
||||
_ "io"
|
||||
"log"
|
||||
"net"
|
||||
_ "net/http/pprof"
|
||||
_ "runtime"
|
||||
_ "tcptemplate/protocol/pb"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
_ "github.com/hprose/hprose-golang/io"
|
||||
_ "github.com/trysh/ttb"
|
||||
)
|
||||
|
||||
/*
|
||||
tcp 测试程序,测试以下场景
|
||||
|
@ -61,7 +61,7 @@ func SendPackage(writer *net.TCPConn, payload []byte, length int) error {
|
|||
//log.Println("Send length",wb)
|
||||
b = append(b, packbuf.Bytes()...)
|
||||
b = append(b, payload[0:length]...)
|
||||
log.Println(b[0:5],b[5:])
|
||||
log.Println(b[0:5], b[5:])
|
||||
_, e = writer.Write(b[0:5])
|
||||
if e != nil {
|
||||
log.Println(e.Error())
|
||||
|
|
|
@ -1,34 +1,35 @@
|
|||
package test
|
||||
|
||||
import (
|
||||
"go-tcpFramework-rudy/tools/mq"
|
||||
"qiniupkg.com/x/log.v7"
|
||||
"tcptemplate/tools/mq"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"qiniupkg.com/x/log.v7"
|
||||
)
|
||||
|
||||
func Callback(key string,msg string) {
|
||||
log.Print(key,msg)
|
||||
func Callback(key string, msg string) {
|
||||
log.Print(key, msg)
|
||||
}
|
||||
func TestMq(t *testing.T) {
|
||||
func TestMq(t *testing.T) {
|
||||
cli := mq.RedisMqClient{}
|
||||
e := cli.Connect("49.235.25.67:16379",0,"")
|
||||
if nil != e{
|
||||
e := cli.Connect("49.235.25.67:16379", 0, "")
|
||||
if nil != e {
|
||||
log.Print(e.Error())
|
||||
t.Error(e)
|
||||
}
|
||||
cli.Subscribe("msg",Callback)
|
||||
cli.Public("msg","shit")
|
||||
cli.Public("msg","shit2")
|
||||
cli.Public("msg","shit3")
|
||||
cli.Subscribe("msg", Callback)
|
||||
cli.Public("msg", "shit")
|
||||
cli.Public("msg", "shit2")
|
||||
cli.Public("msg", "shit3")
|
||||
|
||||
mqcli := mq.MqttClient{}
|
||||
e = mqcli.Connect("tcp://" + "49.235.25.67:1883",0,"122")
|
||||
e = mqcli.Connect("tcp://"+"49.235.25.67:1883", 0, "122")
|
||||
log.Print(e)
|
||||
e = mqcli.Subscribe("/dev",Callback)
|
||||
e = mqcli.Subscribe("/dev", Callback)
|
||||
log.Print(e)
|
||||
mqcli.Public("/dev","shit")
|
||||
for true{
|
||||
mqcli.Public("/dev", "shit")
|
||||
for true {
|
||||
time.Sleep(100)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,40 +2,37 @@ package mq
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"go-tcpFramework-rudy/util"
|
||||
"gopkg.in/redis.v3"
|
||||
"io"
|
||||
"tcptemplate/util"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/redis.v3"
|
||||
"qiniupkg.com/x/log.v7"
|
||||
|
||||
MQTT "github.com/eclipse/paho.mqtt.golang"
|
||||
|
||||
)
|
||||
|
||||
|
||||
|
||||
type CallBack func(string,string)
|
||||
type CallBack func(string, string)
|
||||
|
||||
//通用mq库,支持发布,订阅
|
||||
type MqClient interface {
|
||||
Subscribe(topic string,callback *CallBack) error
|
||||
Public(topic string,data string) error
|
||||
Subscribe(topic string, callback *CallBack) error
|
||||
Public(topic string, data string) error
|
||||
}
|
||||
|
||||
|
||||
type RedisMqClient struct {
|
||||
client *redis.Client
|
||||
Address string
|
||||
client *redis.Client
|
||||
Address string
|
||||
Password string
|
||||
}
|
||||
type MqttClient struct {
|
||||
client MQTT.Client
|
||||
Address string
|
||||
type MqttClient struct {
|
||||
client MQTT.Client
|
||||
Address string
|
||||
ClientId string
|
||||
Options *MQTT.ClientOptions
|
||||
Options *MQTT.ClientOptions
|
||||
}
|
||||
|
||||
func (this *MqttClient) Connect(ip string,port int64,id string) error {
|
||||
func (this *MqttClient) Connect(ip string, port int64, id string) error {
|
||||
this.Address = ip
|
||||
|
||||
opts := MQTT.NewClientOptions().AddBroker(ip)
|
||||
|
@ -49,7 +46,7 @@ func (this *MqttClient) Connect(ip string,port int64,id string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
|
||||
func (this *RedisMqClient) Connect(ip string, port int64, passwd string) error {
|
||||
option := &redis.Options{
|
||||
Addr: ip,
|
||||
Password: passwd,
|
||||
|
@ -70,11 +67,11 @@ func (this *RedisMqClient) Connect(ip string,port int64,passwd string) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
|
||||
func (this *MqttClient) Subscribe(topic string, callback CallBack) error {
|
||||
handler := func(c MQTT.Client, msg MQTT.Message) {
|
||||
msg.Payload()
|
||||
msg.Topic()
|
||||
callback(msg.Topic(),string(msg.Payload()))
|
||||
callback(msg.Topic(), string(msg.Payload()))
|
||||
}
|
||||
opts := MQTT.NewClientOptions().AddBroker(this.Address)
|
||||
opts.SetClientID(this.ClientId)
|
||||
|
@ -86,33 +83,33 @@ func (this *MqttClient) Subscribe(topic string,callback CallBack) error {
|
|||
if token := this.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
this.client.Subscribe(topic,0,handler)
|
||||
this.client.Subscribe(topic, 0, handler)
|
||||
return nil
|
||||
}
|
||||
func (this *RedisMqClient) Subscribe(topic string,callback CallBack) error {
|
||||
if nil == this{
|
||||
func (this *RedisMqClient) Subscribe(topic string, callback CallBack) error {
|
||||
if nil == this {
|
||||
return errors.New(util.ERR_NULL_POINTER)
|
||||
}
|
||||
pubsub, err := this.client.Subscribe(topic)
|
||||
if nil != err{
|
||||
if nil != err {
|
||||
return err
|
||||
}
|
||||
x := make(chan bool,1)
|
||||
x := make(chan bool, 1)
|
||||
go func() {
|
||||
x <- true
|
||||
defer pubsub.Close()
|
||||
for true{
|
||||
for true {
|
||||
msg, err := pubsub.ReceiveMessage()
|
||||
if nil != err{
|
||||
if nil != err {
|
||||
log.Print(err)
|
||||
if err == io.EOF{
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
}
|
||||
callback(topic,msg.Payload)
|
||||
callback(topic, msg.Payload)
|
||||
}
|
||||
}()
|
||||
<- x
|
||||
<-x
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -133,45 +130,45 @@ func creatRedisClient(option *redis.Options) (*redis.Client, error) {
|
|||
return client, nil
|
||||
}
|
||||
|
||||
func (this *RedisMqClient) Public(topic string,data string) error {
|
||||
if nil == this{
|
||||
func (this *RedisMqClient) Public(topic string, data string) error {
|
||||
if nil == this {
|
||||
return errors.New(util.ERR_NULL_POINTER)
|
||||
}
|
||||
// 检测client有效性
|
||||
if nil != this.client {
|
||||
_, err := this.client.Ping().Result()
|
||||
_, err := this.client.Ping().Result()
|
||||
if nil != err {
|
||||
this.client.Close()
|
||||
// 尝试3次重连
|
||||
for i := 0; i < 3; i++ {
|
||||
this.client, err = creatRedisClient(&redis.Options{
|
||||
Addr:this.Address,
|
||||
Password:this.Password,
|
||||
Addr: this.Address,
|
||||
Password: this.Password,
|
||||
})
|
||||
if this.client != nil {
|
||||
if this.client != nil {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_, err := this.client.Publish(topic, data).Result()
|
||||
if nil != err{
|
||||
if nil != err {
|
||||
log.Print(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MqttClient) Public(topic string,data string) error {
|
||||
if nil == this{
|
||||
func (this *MqttClient) Public(topic string, data string) error {
|
||||
if nil == this {
|
||||
return errors.New(util.ERR_NULL_POINTER)
|
||||
}
|
||||
// 检测client有效性
|
||||
if true != this.client.IsConnected() {
|
||||
|
||||
}
|
||||
token := this.client.Publish(topic,0, false,data)
|
||||
token := this.client.Publish(topic, 0, false, data)
|
||||
token.Wait()
|
||||
if nil != token.Error(){
|
||||
if nil != token.Error() {
|
||||
log.Print(token.Error())
|
||||
return token.Error()
|
||||
}
|
||||
|
|
|
@ -2,12 +2,13 @@ package zk
|
|||
|
||||
import (
|
||||
_ "bytes"
|
||||
zk "github.com/samuel/go-zookeeper/zk"
|
||||
"go-tcpFramework-rudy/config"
|
||||
"log"
|
||||
"os"
|
||||
_ "strings"
|
||||
"tcptemplate/config"
|
||||
"time"
|
||||
|
||||
zk "github.com/samuel/go-zookeeper/zk"
|
||||
)
|
||||
|
||||
var g_zkCnn *zk.Conn
|
||||
|
|
Loading…
Reference in New Issue