parent
491d589775
commit
a08da42c9b
|
@ -0,0 +1,104 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"log"
|
||||
_ "io"
|
||||
_ "tcpserv/protocol/pb"
|
||||
"time"
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
_ "github.com/trysh/ttb"
|
||||
"encoding/binary"
|
||||
"bytes"
|
||||
_ "github.com/hprose/hprose-golang/io"
|
||||
_ "runtime"
|
||||
_ "net/http/pprof"
|
||||
_ "runtime"
|
||||
)
|
||||
|
||||
type PackHeader struct {
|
||||
Identify [2]byte
|
||||
Length int32
|
||||
Verify byte
|
||||
}
|
||||
|
||||
var rBuf []byte
|
||||
|
||||
func init() {
|
||||
rBuf = make([]byte,1024)
|
||||
}
|
||||
func SendPackage(writer *net.TCPConn,payload []byte,length int) error{
|
||||
var header PackHeader
|
||||
header.Identify[0] = 0x40
|
||||
header.Identify[1] = 0x41
|
||||
header.Length = 4
|
||||
packbuf := bytes.Buffer{}
|
||||
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
e := binary.Write(buf,binary.BigEndian,header.Length)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
}
|
||||
|
||||
for _,v := range buf.Bytes(){
|
||||
header.Verify += v
|
||||
}
|
||||
err := binary.Write(&packbuf,binary.BigEndian, header.Identify)
|
||||
err = binary.Write(&packbuf,binary.BigEndian, header.Length)
|
||||
err = binary.Write(&packbuf,binary.BigEndian, header.Verify)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
}
|
||||
b := []byte{}
|
||||
|
||||
//log.Println("Send length",wb)
|
||||
b = append(b,packbuf.Bytes()...)
|
||||
b = append(b,payload[0:length]...)
|
||||
log.Println(b)
|
||||
|
||||
_,e = writer.Write(b)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return e
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
for i:=0;i<5000;i++{
|
||||
go func() {
|
||||
seq := int32(0)
|
||||
payload := bytes.NewBuffer([]byte{})
|
||||
server := "127.0.0.1:8052"
|
||||
servAddr, err := net.ResolveTCPAddr("tcp", server)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
}
|
||||
conn,err := net.DialTCP("tcp",nil,servAddr)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
return
|
||||
}
|
||||
for {
|
||||
seq ++
|
||||
payload.Reset()
|
||||
binary.Write(payload,binary.BigEndian,seq)
|
||||
log.Println("start send",seq)
|
||||
e :=SendPackage(conn,payload.Bytes(),payload.Len())
|
||||
//n ,e := conn.Write([]byte{'1','2','3','4'})
|
||||
|
||||
if e != nil{
|
||||
log.Println("client exit")
|
||||
log.Println(e)
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
log.Println("client exit")
|
||||
}()
|
||||
}
|
||||
for{
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"log"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Type string `json:"type"`
|
||||
ID string `json:"id"`
|
||||
Ipadress string `json:"ipadress"` //format like ip:port
|
||||
Zkserver []string `json:"zkserver"`
|
||||
}
|
||||
var gConf *Config
|
||||
|
||||
func init() {
|
||||
gConf = new(Config)
|
||||
}
|
||||
func Configs() Config {
|
||||
if gConf != nil{
|
||||
return *gConf
|
||||
}else {
|
||||
return Config{
|
||||
ID:"s0",
|
||||
Ipadress:"127.0.0.1",
|
||||
Zkserver:[]string{
|
||||
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
func InitConfig() {
|
||||
file,err := os.Open("TcpServConfig.json")
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
buf,e := json.Marshal(gConf)
|
||||
if e!=nil{
|
||||
log.Println(e.Error())
|
||||
}
|
||||
ioutil.WriteFile("TcpServConfig.json",buf,os.ModePerm)
|
||||
}
|
||||
ret,err := ioutil.ReadAll(file)
|
||||
err = json.Unmarshal(ret,gConf)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
}
|
||||
file.Close()
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"tcpserv/protocol/pb"
|
||||
"log"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"errors"
|
||||
"tcpserv/logger"
|
||||
"net"
|
||||
"tcpserv/network"
|
||||
)
|
||||
|
||||
var (
|
||||
SESSION_LOGIN int32 = 1001
|
||||
SESSION_OFFLINE int32 = 1002
|
||||
ERROR_PB_FORMAT = errors.New("pb payload format error")
|
||||
ERROR_CNN_SEND = errors.New("pb payload format error")
|
||||
)
|
||||
|
||||
func init() {
|
||||
gCallBacks = make(map[int32] func(r protocol.Request,c net.Conn) (protocol.Response,error),100)
|
||||
Register_SeqenceAll()
|
||||
}
|
||||
|
||||
func OnLogin(req protocol.Request,c net.Conn) (protocol.Response,error){
|
||||
var resp protocol.Response
|
||||
resp.LoginResp = &protocol.LoginResp{}
|
||||
log.Println(*req.LoginReq.Channel,*req.LoginReq.Username,*req.LoginReq.Passwd)
|
||||
//r := req.Login
|
||||
return resp,nil
|
||||
}
|
||||
|
||||
func OnOffline( req protocol.Request,c net.Conn) (protocol.Response,error){
|
||||
retok := "ok"
|
||||
var status int32 = 100
|
||||
var resp protocol.Response
|
||||
resp.OfflineResp = &protocol.OfflineResp{}
|
||||
resp.OfflineResp.Message = &retok
|
||||
resp.OfflineResp.Code = &status
|
||||
|
||||
return resp,nil
|
||||
}
|
||||
|
||||
|
||||
var gCallBacks map[int32] func(r protocol.Request,c net.Conn) (protocol.Response,error)
|
||||
|
||||
func DisPatch(r protocol.Request,c net.Conn) error {
|
||||
var sequence int32 = *r.Sequence + 1
|
||||
var session int32 = *r.Session
|
||||
if handle,ok := gCallBacks[*r.Session];ok{
|
||||
resp,e := handle(r,c)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return e
|
||||
}
|
||||
resp.Sequence = &sequence
|
||||
resp.Session = &session
|
||||
b,e := proto.Marshal(&resp)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return ERROR_PB_FORMAT
|
||||
}
|
||||
_,e = c.Write(b)
|
||||
if e != nil{
|
||||
return e;
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandlePack(receiever *network.PackageReceiever,c net.Conn) {
|
||||
for{
|
||||
pack,e := receiever.ReadNonBlock()
|
||||
if e !=nil{
|
||||
log.Println(e.Error())
|
||||
return
|
||||
}
|
||||
if pack != nil{
|
||||
log.Println(c.RemoteAddr()," [send] ",pack)
|
||||
var req protocol.Request
|
||||
e := proto.Unmarshal(pack,&req)
|
||||
if e !=nil{
|
||||
logger.LogRealeaseErrorS(e.Error())
|
||||
continue
|
||||
}
|
||||
logger.LogRealeaseInfo(*req.Sequence)
|
||||
logger.LogRealeaseInfo(*req.Session)
|
||||
e = DisPatch(req,c)
|
||||
if e != nil{
|
||||
//LogDebugError([]interface{}{e.Error()})
|
||||
}
|
||||
log.Println("Packet Receiever receieve data ",pack)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Register_SeqenceAll() {
|
||||
gCallBacks[SESSION_LOGIN] = OnLogin
|
||||
gCallBacks[SESSION_OFFLINE] = OnOffline
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
package logger
|
||||
|
||||
import (
|
||||
"log"
|
||||
"runtime"
|
||||
_ "os"
|
||||
)
|
||||
|
||||
func PanicHandler() {
|
||||
if e := recover(); e != nil {
|
||||
for i := 3;i < 5;i++{
|
||||
_,file,line,ok := runtime.Caller(i+1)
|
||||
if ok {
|
||||
log.Println("[DEBUG:PANIC]"," at",file," ",line)
|
||||
}
|
||||
}
|
||||
log.Println("[DEBUG:PANIC] ", e)
|
||||
}
|
||||
}
|
||||
func LogDebugError(c interface{}) {
|
||||
for i:=0;i<1;i++{
|
||||
_,file,line,ok := runtime.Caller(i+1)
|
||||
if ok {
|
||||
log.Println("[DEBUG:ERROR]",file," ",line)
|
||||
}
|
||||
}
|
||||
log.Println("[DEBUG:ERROR]",c)
|
||||
}
|
||||
func LogRealeaseError(c ...[]interface{}) {
|
||||
log.Println("[INFO:ERROR]",c)
|
||||
}
|
||||
func LogRealeaseErrorS(c string) {
|
||||
for i:=0;i<1;i++{
|
||||
_,file,line,ok := runtime.Caller(i+1)
|
||||
if ok {
|
||||
log.Println("[DEBUG:ERROR]",file," ",line)
|
||||
}
|
||||
}
|
||||
log.Println("[DEBUG:ERROR]",c)
|
||||
|
||||
}
|
||||
func LogRealeaseInfo(c ...interface{}) {
|
||||
for i:=0;i<1;i++{
|
||||
_,file,line,ok := runtime.Caller(i+1)
|
||||
if ok {
|
||||
log.Println("[Release:INFO]",file," ",line)
|
||||
}
|
||||
}
|
||||
log.Print("[Release:INFO]",c)
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"tcpserv/logger"
|
||||
"tcpserv/network"
|
||||
"runtime/trace"
|
||||
"os"
|
||||
"tcpserv/handler"
|
||||
)
|
||||
|
||||
func main() {
|
||||
f, err := os.Create("trace.txt")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = trace.Start(f)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer f.Close()
|
||||
defer trace.Stop()
|
||||
network.SetHandler(handler.HandlePack)
|
||||
e := network.ServerRun()
|
||||
|
||||
if nil != e {
|
||||
logger.LogDebugError(e.Error())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package model
|
||||
|
||||
|
||||
//desgined to adapte sql and nosql like mongo
|
||||
import (
|
||||
_ "github.com/jinzhu/gorm"
|
||||
)
|
||||
|
||||
type ModelAdapter interface {
|
||||
Create(c interface{}) string
|
||||
Update(interface{}) string
|
||||
Read(interface{}) string
|
||||
Delete(interface{}) string
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
package network
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"strconv"
|
||||
"log"
|
||||
"net"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
Type string //连接类型,有websocket,普通的socket
|
||||
Cnntime int64 //连接时间
|
||||
IpAdress string //客户端ip
|
||||
Port int32 //端口号
|
||||
Cnn net.Conn //连接主体代码
|
||||
Business interface{} //业务代码,最好是一个结构体的形式
|
||||
}
|
||||
|
||||
type ConnectionManager struct {
|
||||
allCount int32 //在线连接
|
||||
dayCount int32 //当天在线的用户量,累计的数据
|
||||
connSet map[string] []Connection
|
||||
}
|
||||
|
||||
var gConnMgr *ConnectionManager
|
||||
|
||||
func init() {
|
||||
gConnMgr = new(ConnectionManager)
|
||||
gConnMgr.connSet = make(map[string] []Connection,1)
|
||||
}
|
||||
|
||||
func (this *Connection)SendPackage(payload []byte,length int) error{
|
||||
var header PackHeader
|
||||
header.Identify[0] = 0x40
|
||||
header.Identify[1] = 0x41
|
||||
header.Length = 4
|
||||
packbuf := bytes.Buffer{}
|
||||
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
e := binary.Write(buf,binary.BigEndian,header.Length)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
}
|
||||
|
||||
for _,v := range buf.Bytes(){
|
||||
header.Verify += v
|
||||
}
|
||||
//log.Println(header.Verify)
|
||||
err := binary.Write(&packbuf,binary.BigEndian, header.Identify)
|
||||
err = binary.Write(&packbuf,binary.BigEndian, header.Length)
|
||||
err = binary.Write(&packbuf,binary.BigEndian, header.Verify)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
}
|
||||
b := []byte{}
|
||||
b = append(b,packbuf.Bytes()...)
|
||||
b = append(b,payload[0:length]...)
|
||||
log.Println(b)
|
||||
_,e = this.Cnn.Write(b)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetConnMgr() *ConnectionManager{
|
||||
return gConnMgr
|
||||
}
|
||||
//绑定业务数据代码
|
||||
func (this *Connection)BindBusiness(clientData interface{}) {
|
||||
this.Business = clientData
|
||||
}
|
||||
|
||||
func (this *ConnectionManager)GetCount() int32{
|
||||
return this.allCount
|
||||
}
|
||||
|
||||
func (this *ConnectionManager) AddConn(connection Connection) {
|
||||
this.allCount += 1;
|
||||
if _,ok := this.connSet[connection.IpAdress];ok{
|
||||
this.connSet[connection.IpAdress] = append(this.connSet[connection.IpAdress], connection)
|
||||
}else {
|
||||
this.connSet[connection.IpAdress] = make([]Connection,1)
|
||||
copy(this.connSet[connection.IpAdress],[]Connection{connection})
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ConnectionManager) CheckIfExisted(connection Connection) bool{
|
||||
//判断当前的ip地址段是否已经存在,用于判断例如网游中的多客户端的情况。
|
||||
if _,ok := this.connSet[connection.IpAdress];ok{
|
||||
return true
|
||||
}else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ConnectionManager) DeleteConn(IpAdress string) error{
|
||||
this.allCount -= 1
|
||||
if _,ok := this.connSet[IpAdress];ok{
|
||||
delete( this.connSet,IpAdress)
|
||||
}else {
|
||||
return errors.New("Not Connection")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetIpAddr(ip string) string{
|
||||
s := strings.Split(ip,":")
|
||||
if s!= nil{
|
||||
return s[0];
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func GetPort(ipaddr string) int32 {
|
||||
s := strings.Split(ipaddr,":")
|
||||
if s!= nil{
|
||||
ret,e := strconv.Atoi(s[1])
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return -1;
|
||||
}
|
||||
return int32(ret)
|
||||
}
|
||||
return -1
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package network
|
||||
|
||||
|
||||
type ClientMgr struct {
|
||||
clients map[string] *Client
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
Ip string //登陆Ip
|
||||
ID string //用户id
|
||||
UserName string //用户名
|
||||
Password string //密码
|
||||
Channel string //渠道 1 app 2 h5
|
||||
}
|
||||
|
||||
var gClientMgr *ClientMgr
|
||||
|
||||
func init() {
|
||||
gClientMgr = new(ClientMgr)
|
||||
gClientMgr.clients = make(map[string] * Client,100)
|
||||
|
||||
}
|
||||
|
||||
func (this * ClientMgr) GetInstance() *ClientMgr{
|
||||
return gClientMgr
|
||||
}
|
||||
|
||||
func (this * ClientMgr)Insert(c *Client) bool{
|
||||
if _,ok := this.clients[c.ID];ok{
|
||||
return false
|
||||
}
|
||||
this.clients[c.ID] = c
|
||||
return true
|
||||
}
|
||||
|
||||
func (this * ClientMgr) Delete(c *Client) bool{
|
||||
if _,ok := this.clients[c.ID];ok{
|
||||
delete(this.clients,c.ID)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (this * ClientMgr) GetClientById(id string) *Client{
|
||||
if v,ok := this.clients[id];ok{
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,195 @@
|
|||
package network
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
_ "bytes"
|
||||
_ "encoding"
|
||||
"encoding/binary"
|
||||
_ "encoding/binary"
|
||||
"errors"
|
||||
_ "fmt"
|
||||
_ "github.com/golang/protobuf/proto"
|
||||
"github.com/hprose/hprose-golang/io"
|
||||
"golang.org/x/net/context"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"tcpserv/logger"
|
||||
"tcpserv/util/const"
|
||||
_ "time"
|
||||
)
|
||||
|
||||
var(
|
||||
ERR_CHAN_CLOSE = errors.New("CHANEL CLOSED")
|
||||
)
|
||||
type PackHeader struct {
|
||||
Identify [2]byte
|
||||
Length int32
|
||||
Verify byte
|
||||
}
|
||||
|
||||
type PackageHandler interface{
|
||||
Read() []byte
|
||||
Write([]byte)
|
||||
}
|
||||
//todo 如果需要接收方多携程,还需要实现一个mutex
|
||||
type PackageReceiever struct {
|
||||
unpack_data []byte //还未组包的数据
|
||||
pack_data []byte
|
||||
pack_len int
|
||||
cache_package chan []byte
|
||||
reader io.Reader
|
||||
PackageHandler
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func (this *PackageReceiever)ReadNonBlock() ([]byte,error){
|
||||
ret,ok := <- this.cache_package
|
||||
if ok != true{
|
||||
return nil,ERR_CHAN_CLOSE
|
||||
}
|
||||
return ret[7:],nil
|
||||
}
|
||||
/*
|
||||
type FrameHeader struct {
|
||||
Segment [4]byte
|
||||
Length int64
|
||||
Verify byte
|
||||
}
|
||||
*/
|
||||
var(
|
||||
ErrPackFormat = errors.New("Error Package format error")
|
||||
ErrorPackTooLong = errors.New("Package too long")
|
||||
ErrorPackVerify = errors.New("Package verify fail")
|
||||
)
|
||||
|
||||
func PackageReceieverFactory() *PackageReceiever{
|
||||
obj := new(PackageReceiever)
|
||||
obj.unpack_data = make([]byte,_const.MAX_PACKAGE_BUFFER * 2)
|
||||
obj.pack_data = make([]byte,_const.MAX_PACKAGE_BUFFER)
|
||||
//最大缓存的包数据
|
||||
obj.cache_package = make(chan []byte,1024)
|
||||
return obj
|
||||
}
|
||||
|
||||
type UserError struct {
|
||||
Err string
|
||||
}
|
||||
func (this *UserError)Error() string{
|
||||
return this.Err
|
||||
}
|
||||
|
||||
func (this *PackageReceiever) Write(input []byte,inputlen int) error {
|
||||
var unsort []byte
|
||||
var unsort_len int
|
||||
pack_start := 0;
|
||||
//log.Println("recv data len:",inputlen)
|
||||
if this.pack_len > 0{
|
||||
//log.Println("unstorage data",this.pack_len)
|
||||
copy(this.pack_data[this.pack_len:],input[0:inputlen])
|
||||
//log.Println(this.pack_data)
|
||||
this.pack_len += inputlen
|
||||
unsort = this.pack_data
|
||||
unsort_len = this.pack_len
|
||||
}else {
|
||||
unsort = input
|
||||
unsort_len = inputlen
|
||||
}
|
||||
//只有当负载数据包大于包头的时候才进行解包
|
||||
if unsort_len > 6 && unsort_len < _const.MAX_PACKAGE_BUFFER{
|
||||
//获取各个包头地址
|
||||
for i := (0);i < unsort_len;{
|
||||
if unsort[0+i] != 0x40 || unsort[1+i]!=0x41{
|
||||
//log.Println("unsort",unsort[0:inputlen],i)
|
||||
return ErrPackFormat
|
||||
}
|
||||
pack_start = i;
|
||||
bpacklen := bytes.NewReader(unsort[i+2:i+6])
|
||||
var packlen int32
|
||||
var verify byte
|
||||
binary.Read(bpacklen, binary.BigEndian, &packlen)
|
||||
for _,v := range unsort[i+2:i+6]{
|
||||
verify += v
|
||||
}
|
||||
if verify != unsort[6]{
|
||||
return ErrorPackVerify
|
||||
}
|
||||
//下个包头的地址
|
||||
//log.Println("packlen:",packlen)
|
||||
i += int(packlen) + 7
|
||||
if i <= unsort_len{
|
||||
this.cache_package <- unsort[pack_start:i]
|
||||
pack_start = i
|
||||
this.pack_len = 0
|
||||
}else {
|
||||
//超过了当前缓冲区的话拷贝到缓存以便下次组包
|
||||
copy(this.pack_data[0:],unsort[pack_start:unsort_len])
|
||||
this.pack_len = unsort_len - pack_start
|
||||
}
|
||||
}
|
||||
}else {
|
||||
return ErrorPackTooLong
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Context struct {
|
||||
context context.Context
|
||||
receiever *PackageReceiever
|
||||
c net.Conn
|
||||
}
|
||||
|
||||
type HandleFunction func(receiever *PackageReceiever,c net.Conn)
|
||||
var gHandler HandleFunction = nil
|
||||
|
||||
func SetHandler(h HandleFunction) {
|
||||
gHandler = h
|
||||
}
|
||||
|
||||
func DefaultHandler(receiver *PackageReceiever,c net.Conn) {
|
||||
for true{
|
||||
pack,e := receiver.ReadNonBlock()
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
return
|
||||
}
|
||||
logger.LogDebugError(c.RemoteAddr().String() + "[send]" + string(pack))
|
||||
_,e = c.Write(pack)
|
||||
if e != nil{
|
||||
log.Println(e.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func HandleConnection(c Connection) {
|
||||
buf := make([]byte,1024) // 这个1024可以根据你的消息长度来设置
|
||||
pack_receiever := PackageReceieverFactory()
|
||||
pack_receiever.cache_package = make(chan []byte,512)
|
||||
if nil != gHandler{
|
||||
go gHandler(pack_receiever,c.Cnn)
|
||||
}else {
|
||||
go DefaultHandler(pack_receiever,c.Cnn)
|
||||
}
|
||||
for {
|
||||
//var req protocol.Request
|
||||
n, err := c.Cnn.Read(buf) // n为一次Read实际得到的消息长度
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
c.Cnn.Close()
|
||||
close(pack_receiever.cache_package)
|
||||
GetConnMgr().DeleteConn(c.IpAdress)
|
||||
break
|
||||
}
|
||||
err = pack_receiever.Write(buf,n)
|
||||
//注意,用了chanel不要在同一个携程里面进行插入和取出,以免造成死锁。
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
//log.Println("conection drop down")
|
||||
c.Cnn.Close()
|
||||
close(pack_receiever.cache_package)
|
||||
GetConnMgr().DeleteConn(c.IpAdress)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
package network
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
zkdriver "github.com/samuel/go-zookeeper/zk"
|
||||
"golang.org/x/net/websocket"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"tcpserv/config"
|
||||
"tcpserv/logger"
|
||||
"tcpserv/zk"
|
||||
"time"
|
||||
)
|
||||
|
||||
const TYPE_TCP_SERVER = 1
|
||||
|
||||
type Server struct {
|
||||
servrType int32
|
||||
cMgr ConnectionManager
|
||||
}
|
||||
|
||||
func badCall() {
|
||||
panic("bad end")
|
||||
}
|
||||
|
||||
func (this *Server)Run(c net.Conn){
|
||||
|
||||
}
|
||||
|
||||
func checkError(err error) {
|
||||
if err != nil {
|
||||
for i:=0;i<1;i++{
|
||||
_,file,line,ok := runtime.Caller(i+1)
|
||||
if ok {
|
||||
log.Println("[DEBUG:ERROR",file," ",line)
|
||||
}
|
||||
}
|
||||
log.Println(os.Stderr, "Fatal Error: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func websocketHandler(ws *websocket.Conn){
|
||||
defer logger.PanicHandler()
|
||||
|
||||
var clicnn Connection
|
||||
clicnn.Port = GetPort(ws.RemoteAddr().String())
|
||||
clicnn.IpAdress = GetIpAddr(ws.RemoteAddr().String())
|
||||
clicnn.Cnntime = time.Now().Unix()
|
||||
clicnn.Cnn = ws
|
||||
GetConnMgr().AddConn(clicnn)
|
||||
|
||||
logger.LogRealeaseInfo("accept conn",ws.RemoteAddr().String(),ws.LocalAddr().String())
|
||||
|
||||
go HandleConnection(clicnn)
|
||||
}
|
||||
|
||||
func ServerRun() error{
|
||||
defer logger.PanicHandler()
|
||||
|
||||
config.InitConfig()
|
||||
e := zk.ConnectZooKeeper()
|
||||
if nil != e{
|
||||
logger.LogDebugError(e.Error())
|
||||
}
|
||||
tcpAddr, err := net.ResolveTCPAddr("tcp4", config.Configs().Ipadress)
|
||||
checkError(err)
|
||||
listener, err := net.ListenTCP("tcp4", tcpAddr)
|
||||
checkError(err)
|
||||
go func() {
|
||||
for true{
|
||||
time.Sleep(time.Second * 10)
|
||||
SyncServerStatus()
|
||||
}
|
||||
}()
|
||||
if config.Configs().Type == "websocket"{
|
||||
for{
|
||||
http.Handle("/ws", websocket.Handler(websocketHandler))
|
||||
err := http.ListenAndServe("localhost:8080",nil)
|
||||
if err != nil{
|
||||
log.Println(err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
}else {
|
||||
for {
|
||||
c, err := listener.Accept()
|
||||
var clicnn Connection
|
||||
clicnn.Port = GetPort(c.RemoteAddr().String())
|
||||
clicnn.IpAdress = GetIpAddr(c.RemoteAddr().String())
|
||||
clicnn.Cnntime = time.Now().Unix()
|
||||
clicnn.Cnn = c
|
||||
GetConnMgr().AddConn(clicnn)
|
||||
|
||||
logger.LogRealeaseInfo("accept conn",c.RemoteAddr().String(),c.LocalAddr().String())
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
continue
|
||||
}
|
||||
go HandleConnection(clicnn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func SyncServerStatus() error{
|
||||
var rs zk.ServerStatus
|
||||
acl := zkdriver.WorldACL(zkdriver.PermAll)
|
||||
|
||||
defer logger.PanicHandler()
|
||||
rs.ServerID = config.Configs().ID
|
||||
rs.CnnCount = GetConnMgr().GetCount()
|
||||
bj,e := json.Marshal(rs)
|
||||
if e != nil{
|
||||
logger.LogDebugError([]interface{}{e.Error()})
|
||||
return e
|
||||
}
|
||||
//log.Println(Configs())
|
||||
ok,_,e := zk.ZkInstance().Exists("/logic/" + config.Configs().ID)
|
||||
if e != nil{
|
||||
logger.LogDebugError([]interface{}{e.Error()})
|
||||
}
|
||||
if !ok{
|
||||
ok1,_,err := zk.ZkInstance().Exists("/logic")
|
||||
if !ok1{
|
||||
path ,err:= zk.ZkInstance().Create("/logic",bytes.NewBufferString(time.Now().String()).Bytes(),0 ,acl)
|
||||
if nil != err{
|
||||
logger.LogDebugError([]interface{}{err.Error()})
|
||||
}
|
||||
logger.LogDebugError([]interface{}{path})
|
||||
}
|
||||
_ ,err = zk.ZkInstance().Create("/logic/" + config.Configs().ID,bj,zkdriver.FlagEphemeral,acl)
|
||||
if nil != err{
|
||||
logger.LogDebugError([]interface{}{err.Error()})
|
||||
}
|
||||
//logger.LogRealeaseInfo([]interface{}{})
|
||||
//log.Println(string(ret))
|
||||
}else {
|
||||
e := zk.ZkInstance().Delete("/logic/" + config.Configs().ID,-1)
|
||||
if e != nil{
|
||||
logger.LogDebugError([]interface{}{e.Error()})
|
||||
}
|
||||
_ ,err := zk.ZkInstance().Create("/logic/" + config.Configs().ID,bj,zkdriver.FlagEphemeral,acl)
|
||||
if nil != err{
|
||||
logger.LogDebugError([]interface{}{err.Error()})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: base.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package protocol is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
base.proto
|
||||
msgType.proto
|
||||
|
||||
It has these top-level messages:
|
||||
LoginReq
|
||||
OfflineReq
|
||||
LoginResp
|
||||
OfflineResp
|
||||
Request
|
||||
Response
|
||||
*/
|
||||
package protocol
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
type LoginReq struct {
|
||||
Timestamp *int64 `protobuf:"varint,1,opt,name=timestamp" json:"timestamp,omitempty"`
|
||||
Username *string `protobuf:"bytes,2,opt,name=username" json:"username,omitempty"`
|
||||
Passwd *string `protobuf:"bytes,3,opt,name=passwd" json:"passwd,omitempty"`
|
||||
Channel *string `protobuf:"bytes,4,opt,name=channel" json:"channel,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *LoginReq) Reset() { *m = LoginReq{} }
|
||||
func (m *LoginReq) String() string { return proto.CompactTextString(m) }
|
||||
func (*LoginReq) ProtoMessage() {}
|
||||
func (*LoginReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||
|
||||
func (m *LoginReq) GetTimestamp() int64 {
|
||||
if m != nil && m.Timestamp != nil {
|
||||
return *m.Timestamp
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetUsername() string {
|
||||
if m != nil && m.Username != nil {
|
||||
return *m.Username
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetPasswd() string {
|
||||
if m != nil && m.Passwd != nil {
|
||||
return *m.Passwd
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetChannel() string {
|
||||
if m != nil && m.Channel != nil {
|
||||
return *m.Channel
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type OfflineReq struct {
|
||||
Timestamp *int64 `protobuf:"varint,1,opt,name=timestamp" json:"timestamp,omitempty"`
|
||||
Username *string `protobuf:"bytes,2,opt,name=username" json:"username,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *OfflineReq) Reset() { *m = OfflineReq{} }
|
||||
func (m *OfflineReq) String() string { return proto.CompactTextString(m) }
|
||||
func (*OfflineReq) ProtoMessage() {}
|
||||
func (*OfflineReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
|
||||
|
||||
func (m *OfflineReq) GetTimestamp() int64 {
|
||||
if m != nil && m.Timestamp != nil {
|
||||
return *m.Timestamp
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *OfflineReq) GetUsername() string {
|
||||
if m != nil && m.Username != nil {
|
||||
return *m.Username
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type LoginResp struct {
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *LoginResp) Reset() { *m = LoginResp{} }
|
||||
func (m *LoginResp) String() string { return proto.CompactTextString(m) }
|
||||
func (*LoginResp) ProtoMessage() {}
|
||||
func (*LoginResp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
|
||||
|
||||
type OfflineResp struct {
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *OfflineResp) Reset() { *m = OfflineResp{} }
|
||||
func (m *OfflineResp) String() string { return proto.CompactTextString(m) }
|
||||
func (*OfflineResp) ProtoMessage() {}
|
||||
func (*OfflineResp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*LoginReq)(nil), "protocol.LoginReq")
|
||||
proto.RegisterType((*OfflineReq)(nil), "protocol.OfflineReq")
|
||||
proto.RegisterType((*LoginResp)(nil), "protocol.LoginResp")
|
||||
proto.RegisterType((*OfflineResp)(nil), "protocol.OfflineResp")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("base.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 158 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0x4a, 0x2c, 0x4e,
|
||||
0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x00, 0x53, 0xc9, 0xf9, 0x39, 0x4a, 0x65, 0x5c,
|
||||
0x1c, 0x3e, 0xf9, 0xe9, 0x99, 0x79, 0x41, 0xa9, 0x85, 0x42, 0x32, 0x5c, 0x9c, 0x25, 0x99, 0xb9,
|
||||
0xa9, 0xc5, 0x25, 0x89, 0xb9, 0x05, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x08, 0x01, 0x21,
|
||||
0x29, 0x2e, 0x8e, 0xd2, 0xe2, 0xd4, 0xa2, 0xbc, 0xc4, 0xdc, 0x54, 0x09, 0x26, 0xa0, 0x24, 0x67,
|
||||
0x10, 0x9c, 0x2f, 0x24, 0xc6, 0xc5, 0x56, 0x90, 0x58, 0x5c, 0x5c, 0x9e, 0x22, 0xc1, 0x0c, 0x96,
|
||||
0x81, 0xf2, 0x84, 0x24, 0xb8, 0xd8, 0x93, 0x33, 0x12, 0xf3, 0xf2, 0x52, 0x73, 0x24, 0x58, 0xc0,
|
||||
0x12, 0x30, 0xae, 0x92, 0x1b, 0x17, 0x97, 0x7f, 0x5a, 0x5a, 0x4e, 0x66, 0x5e, 0x2a, 0x45, 0x36,
|
||||
0x2b, 0x71, 0x73, 0x71, 0x42, 0xdd, 0x5f, 0x5c, 0xa0, 0xc4, 0xcb, 0xc5, 0x0d, 0x37, 0xb4, 0xb8,
|
||||
0x00, 0x10, 0x00, 0x00, 0xff, 0xff, 0x98, 0x8f, 0x49, 0x5a, 0xf2, 0x00, 0x00, 0x00,
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
package middle
|
|
@ -0,0 +1,129 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: msgType.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protocol
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
type Request struct {
|
||||
Session *int32 `protobuf:"varint,1,req,name=session" json:"session,omitempty"`
|
||||
Sequence *int32 `protobuf:"varint,2,req,name=sequence" json:"sequence,omitempty"`
|
||||
Login *LoginReq `protobuf:"bytes,1001,opt,name=login" json:"login,omitempty"`
|
||||
Offline *OfflineReq `protobuf:"bytes,1002,opt,name=offline" json:"offline,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Request) Reset() { *m = Request{} }
|
||||
func (m *Request) String() string { return proto.CompactTextString(m) }
|
||||
func (*Request) ProtoMessage() {}
|
||||
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
|
||||
|
||||
func (m *Request) GetSession() int32 {
|
||||
if m != nil && m.Session != nil {
|
||||
return *m.Session
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Request) GetSequence() int32 {
|
||||
if m != nil && m.Sequence != nil {
|
||||
return *m.Sequence
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Request) GetLogin() *LoginReq {
|
||||
if m != nil {
|
||||
return m.Login
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Request) GetOffline() *OfflineReq {
|
||||
if m != nil {
|
||||
return m.Offline
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Session *int32 `protobuf:"varint,1,req,name=session" json:"session,omitempty"`
|
||||
Sequence *int32 `protobuf:"varint,2,req,name=sequence" json:"sequence,omitempty"`
|
||||
Message *string `protobuf:"bytes,3,req,name=message" json:"message,omitempty"`
|
||||
RespLogin *LoginResp `protobuf:"bytes,1001,opt,name=respLogin" json:"respLogin,omitempty"`
|
||||
RespOffline *OfflineResp `protobuf:"bytes,1002,opt,name=respOffline" json:"respOffline,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Response) Reset() { *m = Response{} }
|
||||
func (m *Response) String() string { return proto.CompactTextString(m) }
|
||||
func (*Response) ProtoMessage() {}
|
||||
func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
|
||||
|
||||
func (m *Response) GetSession() int32 {
|
||||
if m != nil && m.Session != nil {
|
||||
return *m.Session
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Response) GetSequence() int32 {
|
||||
if m != nil && m.Sequence != nil {
|
||||
return *m.Sequence
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Response) GetMessage() string {
|
||||
if m != nil && m.Message != nil {
|
||||
return *m.Message
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *Response) GetRespLogin() *LoginResp {
|
||||
if m != nil {
|
||||
return m.RespLogin
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Response) GetRespOffline() *OfflineResp {
|
||||
if m != nil {
|
||||
return m.RespOffline
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Request)(nil), "protocol.Request")
|
||||
proto.RegisterType((*Response)(nil), "protocol.Response")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("msgType.proto", fileDescriptor1) }
|
||||
|
||||
var fileDescriptor1 = []byte{
|
||||
// 222 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x2d, 0x4e, 0x0f,
|
||||
0xa9, 0x2c, 0x48, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x00, 0x53, 0xc9, 0xf9, 0x39,
|
||||
0x52, 0x5c, 0x49, 0x89, 0xc5, 0x50, 0x51, 0xa5, 0xd9, 0x8c, 0x5c, 0xec, 0x41, 0xa9, 0x85, 0xa5,
|
||||
0xa9, 0xc5, 0x25, 0x42, 0x12, 0x5c, 0xec, 0xc5, 0xa9, 0xc5, 0xc5, 0x99, 0xf9, 0x79, 0x12, 0x8c,
|
||||
0x0a, 0x4c, 0x1a, 0xac, 0x41, 0x30, 0xae, 0x90, 0x14, 0x17, 0x47, 0x31, 0x48, 0x51, 0x5e, 0x72,
|
||||
0xaa, 0x04, 0x13, 0x58, 0x0a, 0xce, 0x17, 0xd2, 0xe4, 0x62, 0xcd, 0xc9, 0x4f, 0xcf, 0xcc, 0x93,
|
||||
0x78, 0xc9, 0xae, 0xc0, 0xa8, 0xc1, 0x6d, 0x24, 0xa4, 0x07, 0xb3, 0x48, 0xcf, 0x07, 0x24, 0x0e,
|
||||
0x34, 0x3d, 0x08, 0xa2, 0x42, 0x48, 0x9f, 0x8b, 0x3d, 0x3f, 0x2d, 0x2d, 0x27, 0x33, 0x2f, 0x55,
|
||||
0xe2, 0x15, 0x44, 0xb1, 0x08, 0x42, 0xb1, 0x3f, 0x44, 0x06, 0xa4, 0x1c, 0xa6, 0x4a, 0xe9, 0x04,
|
||||
0x23, 0x17, 0x47, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0x2a, 0x99, 0xce, 0x03, 0xea, 0xca,
|
||||
0x05, 0x2a, 0x4b, 0x4c, 0x4f, 0x95, 0x60, 0x06, 0x4a, 0x71, 0x06, 0xc1, 0xb8, 0x42, 0x46, 0x5c,
|
||||
0x9c, 0x45, 0x40, 0xb3, 0x7d, 0x90, 0x1d, 0x2f, 0x8c, 0xe1, 0xf8, 0xe2, 0x82, 0x20, 0x84, 0x32,
|
||||
0x21, 0x0b, 0x2e, 0x6e, 0x10, 0xc7, 0x1f, 0xd5, 0x17, 0xa2, 0x58, 0x7c, 0x01, 0xd4, 0x87, 0xac,
|
||||
0x14, 0x10, 0x00, 0x00, 0xff, 0xff, 0x15, 0x0d, 0x55, 0x5d, 0x8e, 0x01, 0x00, 0x00,
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: base.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package protocol is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
base.proto
|
||||
msgType.proto
|
||||
|
||||
It has these top-level messages:
|
||||
LoginReq
|
||||
LoginResp
|
||||
OfflineReq
|
||||
OfflineResp
|
||||
Request
|
||||
Response
|
||||
*/
|
||||
package protocol
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
type LoginReq struct {
|
||||
Timestamp *int64 `protobuf:"varint,1,opt,name=timestamp" json:"timestamp,omitempty"`
|
||||
Username *string `protobuf:"bytes,2,opt,name=username" json:"username,omitempty"`
|
||||
Passwd *string `protobuf:"bytes,3,opt,name=passwd" json:"passwd,omitempty"`
|
||||
Channel *string `protobuf:"bytes,4,opt,name=channel" json:"channel,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *LoginReq) Reset() { *m = LoginReq{} }
|
||||
func (m *LoginReq) String() string { return proto.CompactTextString(m) }
|
||||
func (*LoginReq) ProtoMessage() {}
|
||||
func (*LoginReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||
|
||||
func (m *LoginReq) GetTimestamp() int64 {
|
||||
if m != nil && m.Timestamp != nil {
|
||||
return *m.Timestamp
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetUsername() string {
|
||||
if m != nil && m.Username != nil {
|
||||
return *m.Username
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetPasswd() string {
|
||||
if m != nil && m.Passwd != nil {
|
||||
return *m.Passwd
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *LoginReq) GetChannel() string {
|
||||
if m != nil && m.Channel != nil {
|
||||
return *m.Channel
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type LoginResp struct {
|
||||
Message *string `protobuf:"bytes,1,req,name=Message" json:"Message,omitempty"`
|
||||
Code *int32 `protobuf:"varint,2,req,name=Code" json:"Code,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *LoginResp) Reset() { *m = LoginResp{} }
|
||||
func (m *LoginResp) String() string { return proto.CompactTextString(m) }
|
||||
func (*LoginResp) ProtoMessage() {}
|
||||
func (*LoginResp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
|
||||
|
||||
func (m *LoginResp) GetMessage() string {
|
||||
if m != nil && m.Message != nil {
|
||||
return *m.Message
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *LoginResp) GetCode() int32 {
|
||||
if m != nil && m.Code != nil {
|
||||
return *m.Code
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type OfflineReq struct {
|
||||
Username *string `protobuf:"bytes,2,req,name=username" json:"username,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *OfflineReq) Reset() { *m = OfflineReq{} }
|
||||
func (m *OfflineReq) String() string { return proto.CompactTextString(m) }
|
||||
func (*OfflineReq) ProtoMessage() {}
|
||||
func (*OfflineReq) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
|
||||
|
||||
func (m *OfflineReq) GetUsername() string {
|
||||
if m != nil && m.Username != nil {
|
||||
return *m.Username
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type OfflineResp struct {
|
||||
Message *string `protobuf:"bytes,1,req,name=Message" json:"Message,omitempty"`
|
||||
Code *int32 `protobuf:"varint,2,req,name=Code" json:"Code,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *OfflineResp) Reset() { *m = OfflineResp{} }
|
||||
func (m *OfflineResp) String() string { return proto.CompactTextString(m) }
|
||||
func (*OfflineResp) ProtoMessage() {}
|
||||
func (*OfflineResp) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
|
||||
|
||||
func (m *OfflineResp) GetMessage() string {
|
||||
if m != nil && m.Message != nil {
|
||||
return *m.Message
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *OfflineResp) GetCode() int32 {
|
||||
if m != nil && m.Code != nil {
|
||||
return *m.Code
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*LoginReq)(nil), "protocol.LoginReq")
|
||||
proto.RegisterType((*LoginResp)(nil), "protocol.LoginResp")
|
||||
proto.RegisterType((*OfflineReq)(nil), "protocol.OfflineReq")
|
||||
proto.RegisterType((*OfflineResp)(nil), "protocol.OfflineResp")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("base.proto", fileDescriptor0) }
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 192 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x8e, 0x31, 0xaf, 0x82, 0x30,
|
||||
0x14, 0x85, 0x43, 0xe1, 0xbd, 0x07, 0xf7, 0x6d, 0x1d, 0x5e, 0x9a, 0x17, 0x07, 0xd3, 0x89, 0xc9,
|
||||
0xdd, 0x38, 0xba, 0x6a, 0x4c, 0xfa, 0x0f, 0x2a, 0x5c, 0x90, 0x04, 0xda, 0xca, 0x45, 0xfd, 0xfb,
|
||||
0xd2, 0x06, 0x51, 0x47, 0xa7, 0xf6, 0xbb, 0xe7, 0x24, 0xdf, 0x01, 0x38, 0x6a, 0xc2, 0x95, 0xeb,
|
||||
0xed, 0x60, 0x79, 0x1a, 0x9e, 0xc2, 0xb6, 0xf2, 0x0a, 0xe9, 0xce, 0xd6, 0x8d, 0x51, 0x78, 0xe6,
|
||||
0x0b, 0xc8, 0x86, 0xa6, 0x43, 0x1a, 0x74, 0xe7, 0x44, 0xb4, 0x8c, 0xf2, 0x58, 0x3d, 0x0f, 0xfc,
|
||||
0x1f, 0xd2, 0x0b, 0x61, 0x6f, 0x74, 0x87, 0x82, 0x8d, 0x61, 0xa6, 0x66, 0xe6, 0x7f, 0xf0, 0xed,
|
||||
0x34, 0xd1, 0xad, 0x14, 0x71, 0x48, 0x26, 0xe2, 0x02, 0x7e, 0x8a, 0x93, 0x36, 0x06, 0x5b, 0x91,
|
||||
0x84, 0xe0, 0x81, 0x72, 0x0d, 0xd9, 0xe4, 0x25, 0xe7, 0x6b, 0x7b, 0x24, 0xd2, 0x35, 0x8e, 0x5a,
|
||||
0xe6, 0x6b, 0x13, 0x72, 0x0e, 0xc9, 0xd6, 0x96, 0x5e, 0xc8, 0xf2, 0x2f, 0x15, 0xfe, 0x32, 0x07,
|
||||
0x38, 0x54, 0x55, 0xdb, 0x18, 0xf4, 0xa3, 0xdf, 0x67, 0xb1, 0xd7, 0x59, 0x72, 0x03, 0xbf, 0x73,
|
||||
0xf3, 0x53, 0xcd, 0x3d, 0x00, 0x00, 0xff, 0xff, 0x29, 0x47, 0x03, 0x1e, 0x30, 0x01, 0x00, 0x00,
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package protocol;
|
||||
|
||||
message LoginReq{
|
||||
optional int64 timestamp =1 ;
|
||||
optional string username = 2;
|
||||
optional string passwd = 3;
|
||||
optional string channel = 4;
|
||||
}
|
||||
|
||||
message OfflineReq {
|
||||
optional int64 timestamp =1 ;
|
||||
optional string username = 2;
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
protoc.exe --plugin=protoc-gen-go=./protoc-gen-go.exe --go_out=./ *.proto
|
||||
pause
|
|
@ -0,0 +1,121 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: msgType.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protocol
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
type Request struct {
|
||||
Session *int32 `protobuf:"varint,1,req,name=session" json:"session,omitempty"`
|
||||
Sequence *int32 `protobuf:"varint,2,req,name=sequence" json:"sequence,omitempty"`
|
||||
LoginReq *LoginReq `protobuf:"bytes,1001,opt,name=loginReq" json:"loginReq,omitempty"`
|
||||
OfflineReq *OfflineReq `protobuf:"bytes,1002,opt,name=offlineReq" json:"offlineReq,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Request) Reset() { *m = Request{} }
|
||||
func (m *Request) String() string { return proto.CompactTextString(m) }
|
||||
func (*Request) ProtoMessage() {}
|
||||
func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{0} }
|
||||
|
||||
func (m *Request) GetSession() int32 {
|
||||
if m != nil && m.Session != nil {
|
||||
return *m.Session
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Request) GetSequence() int32 {
|
||||
if m != nil && m.Sequence != nil {
|
||||
return *m.Sequence
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Request) GetLoginReq() *LoginReq {
|
||||
if m != nil {
|
||||
return m.LoginReq
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Request) GetOfflineReq() *OfflineReq {
|
||||
if m != nil {
|
||||
return m.OfflineReq
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Session *int32 `protobuf:"varint,1,req,name=session" json:"session,omitempty"`
|
||||
Sequence *int32 `protobuf:"varint,2,req,name=sequence" json:"sequence,omitempty"`
|
||||
LoginResp *LoginResp `protobuf:"bytes,3,opt,name=loginResp" json:"loginResp,omitempty"`
|
||||
OfflineResp *OfflineResp `protobuf:"bytes,4,opt,name=offlineResp" json:"offlineResp,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Response) Reset() { *m = Response{} }
|
||||
func (m *Response) String() string { return proto.CompactTextString(m) }
|
||||
func (*Response) ProtoMessage() {}
|
||||
func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor1, []int{1} }
|
||||
|
||||
func (m *Response) GetSession() int32 {
|
||||
if m != nil && m.Session != nil {
|
||||
return *m.Session
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Response) GetSequence() int32 {
|
||||
if m != nil && m.Sequence != nil {
|
||||
return *m.Sequence
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Response) GetLoginResp() *LoginResp {
|
||||
if m != nil {
|
||||
return m.LoginResp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Response) GetOfflineResp() *OfflineResp {
|
||||
if m != nil {
|
||||
return m.OfflineResp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Request)(nil), "protocol.Request")
|
||||
proto.RegisterType((*Response)(nil), "protocol.Response")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("msgType.proto", fileDescriptor1) }
|
||||
|
||||
var fileDescriptor1 = []byte{
|
||||
// 210 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x2d, 0x4e, 0x0f,
|
||||
0xa9, 0x2c, 0x48, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x00, 0x53, 0xc9, 0xf9, 0x39,
|
||||
0x52, 0x5c, 0x49, 0x89, 0xc5, 0x50, 0x51, 0xa5, 0xe5, 0x8c, 0x5c, 0xec, 0x41, 0xa9, 0x85, 0xa5,
|
||||
0xa9, 0xc5, 0x25, 0x42, 0x12, 0x5c, 0xec, 0xc5, 0xa9, 0xc5, 0xc5, 0x99, 0xf9, 0x79, 0x12, 0x8c,
|
||||
0x0a, 0x4c, 0x1a, 0xac, 0x41, 0x30, 0xae, 0x90, 0x14, 0x17, 0x47, 0x31, 0x48, 0x51, 0x5e, 0x72,
|
||||
0xaa, 0x04, 0x13, 0x58, 0x0a, 0xce, 0x17, 0xd2, 0xe7, 0xe2, 0xc8, 0xc9, 0x4f, 0xcf, 0xcc, 0x03,
|
||||
0x9a, 0x22, 0xf1, 0x92, 0x5d, 0x81, 0x51, 0x83, 0xdb, 0x48, 0x48, 0x0f, 0x66, 0x97, 0x9e, 0x0f,
|
||||
0x54, 0x2a, 0x08, 0xae, 0x48, 0xc8, 0x94, 0x8b, 0x2b, 0x3f, 0x2d, 0x2d, 0x27, 0x33, 0x2f, 0x15,
|
||||
0xa4, 0xe5, 0x15, 0x44, 0x8b, 0x08, 0x42, 0x8b, 0x3f, 0x5c, 0x32, 0x08, 0x49, 0xa1, 0xd2, 0x1a,
|
||||
0x46, 0x2e, 0x8e, 0xa0, 0xd4, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x32, 0x9d, 0x6a, 0xc8, 0xc5,
|
||||
0x09, 0x75, 0x45, 0x71, 0x81, 0x04, 0x33, 0xd8, 0x5e, 0x61, 0x0c, 0xa7, 0x16, 0x17, 0x04, 0x21,
|
||||
0x54, 0x09, 0x99, 0x73, 0x71, 0xc3, 0xdd, 0x00, 0xd4, 0xc4, 0x02, 0xd6, 0x24, 0x8a, 0xc5, 0xb1,
|
||||
0x40, 0x6d, 0xc8, 0x2a, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe2, 0xe8, 0xbd, 0x08, 0x7e, 0x01,
|
||||
0x00, 0x00,
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package protocol;
|
||||
import "base.proto";
|
||||
|
||||
|
||||
message Request{//请求类型消息
|
||||
optional int32 session = 1; //消息序列
|
||||
optional int32 sequence = 2; //消息序号
|
||||
|
||||
optional LoginReq login = 1001;
|
||||
optional OfflineReq offline = 1002;
|
||||
}
|
||||
|
||||
message Response{//相应类型消息
|
||||
optional int32 session = 1; //消息序列
|
||||
optional int32 sequence = 2; //消息序号
|
||||
|
||||
}
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,13 @@
|
|||
package _const
|
||||
|
||||
type CommonErr struct {
|
||||
Err string
|
||||
}
|
||||
|
||||
func (this *CommonErr) Error() string {
|
||||
return this.Err
|
||||
}
|
||||
|
||||
const Seq_Login = 0x0001
|
||||
const Seq_UnLogin = 0x0002
|
||||
const MAX_PACKAGE_BUFFER = 4096
|
|
@ -0,0 +1,34 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
)
|
||||
var lock chan bool
|
||||
var ri int
|
||||
|
||||
func init() {
|
||||
lock = make(chan bool,1)
|
||||
}
|
||||
/*
|
||||
function:轮询所有的调度
|
||||
author:
|
||||
date:
|
||||
in para:
|
||||
out para:
|
||||
*/
|
||||
func Round_robin(path []string) int64{
|
||||
len := len(path)
|
||||
|
||||
lck := <- lock
|
||||
i := ri%len
|
||||
if(i ==0){
|
||||
ri = 0
|
||||
}
|
||||
ri++
|
||||
lock <- lck
|
||||
if len!= 0{
|
||||
rnd := rand.New(rand.NewSource(int64(i)))
|
||||
return int64(rnd.Int())
|
||||
}
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"golang.org/x/net/websocket"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func echoHandler(ws *websocket.Conn) {
|
||||
msg := make([]byte, 512)
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Println(err) //这里的err其实就是panic传入的内容,55
|
||||
}
|
||||
}()
|
||||
for {
|
||||
n, err := ws.Read(msg)
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
return
|
||||
}
|
||||
fmt.Printf("Receive: %s\n", msg[:n])
|
||||
|
||||
send_msg := "[" + string(msg[:n]) + "]"
|
||||
m, err := ws.Write([]byte(send_msg))
|
||||
if err != nil {
|
||||
log.Println(err.Error())
|
||||
return
|
||||
}
|
||||
fmt.Printf("Send: %s\n", msg[:m])
|
||||
}
|
||||
}
|
||||
|
||||
var staticfs = http.FileServer(http.Dir("E:\\"))
|
||||
|
||||
//这里可以自行定义安全策略
|
||||
func static(w http.ResponseWriter, r *http.Request) {
|
||||
//fmt.Printf("访问静态文件:%s\n", r.URL.Path)
|
||||
old := r.URL.Path
|
||||
r.URL.Path = strings.Replace(old, "/file/", "", 1)
|
||||
staticfs.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func main() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Println(err) //这里的err其实就是panic传入的内容,55
|
||||
}
|
||||
}()
|
||||
|
||||
http.Handle("/echo", websocket.Handler(echoHandler))
|
||||
http.HandleFunc("/file/", static)
|
||||
|
||||
err := http.ListenAndServe("localhost:8080",nil)
|
||||
|
||||
if err != nil {
|
||||
panic("ListenAndServe: " + err.Error())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,139 @@
|
|||
package zk
|
||||
|
||||
import (
|
||||
"github.com/samuel/go-zookeeper/zk"
|
||||
"errors"
|
||||
"time"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type ZookeeperConfig struct {
|
||||
Servers []string
|
||||
RootPath string
|
||||
MasterPath string
|
||||
}
|
||||
|
||||
type ElectionManager struct {
|
||||
ZKClientConn *zk.Conn
|
||||
ZKConfig *ZookeeperConfig
|
||||
IsMasterQ chan bool
|
||||
}
|
||||
|
||||
func NewElectionManager(zkConfig *ZookeeperConfig, isMasterQ chan bool) *ElectionManager {
|
||||
electionManager := &ElectionManager{
|
||||
nil,
|
||||
zkConfig,
|
||||
isMasterQ,
|
||||
}
|
||||
electionManager.initConnection()
|
||||
return electionManager
|
||||
}
|
||||
|
||||
func (electionManager *ElectionManager) Run() {
|
||||
err := electionManager.electMaster()
|
||||
if err != nil {
|
||||
fmt.Println("elect master error, ", err)
|
||||
}
|
||||
electionManager.watchMaster()
|
||||
}
|
||||
|
||||
// 判断是否成功连接到zookeeper
|
||||
func (electionManager *ElectionManager) isConnected() bool {
|
||||
if electionManager.ZKClientConn == nil {
|
||||
return false
|
||||
} else if electionManager.ZKClientConn.State() != zk.StateConnected {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// 初始化zookeeper连接
|
||||
func (electionManager *ElectionManager) initConnection() error {
|
||||
// 连接为空,或连接不成功,获取zookeeper服务器的连接
|
||||
if !electionManager.isConnected() {
|
||||
conn, connChan, err := zk.Connect(electionManager.ZKConfig.Servers, time.Second)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 等待连接成功
|
||||
for {
|
||||
isConnected := false
|
||||
select {
|
||||
case connEvent := <-connChan:
|
||||
if connEvent.State == zk.StateConnected {
|
||||
isConnected = true
|
||||
fmt.Println("connect to zookeeper server success!")
|
||||
}
|
||||
case _ = <-time.After(time.Second * 3): // 3秒仍未连接成功则返回连接超时
|
||||
return errors.New("connect to zookeeper server timeout!")
|
||||
}
|
||||
if isConnected {
|
||||
break
|
||||
}
|
||||
}
|
||||
electionManager.ZKClientConn = conn
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 选举master
|
||||
func (electionManager *ElectionManager) electMaster() error {
|
||||
err := electionManager.initConnection()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 判断zookeeper中是否存在root目录,不存在则创建该目录
|
||||
isExist, _, err := electionManager.ZKClientConn.Exists(electionManager.ZKConfig.RootPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isExist {
|
||||
path, err := electionManager.ZKClientConn.Create(electionManager.ZKConfig.RootPath, nil, 0, zk.WorldACL(zk.PermAll))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if electionManager.ZKConfig.RootPath != path {
|
||||
return errors.New("Create returned different path " + electionManager.ZKConfig.RootPath + " != " + path)
|
||||
}
|
||||
}
|
||||
|
||||
// 创建用于选举master的ZNode,该节点为Ephemeral类型,表示客户端连接断开后,其创建的节点也会被销毁
|
||||
masterPath := electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath
|
||||
path, err := electionManager.ZKClientConn.Create(masterPath, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
|
||||
if err == nil { // 创建成功表示选举master成功
|
||||
if path == masterPath {
|
||||
fmt.Println("elect master success!")
|
||||
electionManager.IsMasterQ <- true
|
||||
} else {
|
||||
return errors.New("Create returned different path " + masterPath + " != " + path)
|
||||
}
|
||||
} else { // 创建失败表示选举master失败
|
||||
fmt.Printf("elect master failure, ", err)
|
||||
electionManager.IsMasterQ <- false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 监听zookeeper中master znode,若被删除,表示master故障或网络迟缓,重新选举
|
||||
func (electionManager *ElectionManager) watchMaster() {
|
||||
// watch zk根znode下面的子znode,当有连接断开时,对应znode被删除,触发事件后重新选举
|
||||
children, state, childCh, err := electionManager.ZKClientConn.ChildrenW(electionManager.ZKConfig.RootPath + electionManager.ZKConfig.MasterPath)
|
||||
if err != nil {
|
||||
fmt.Println("watch children error, ", err)
|
||||
}
|
||||
fmt.Println("watch children result, ", children, state)
|
||||
for {
|
||||
select {
|
||||
case childEvent := <-childCh:
|
||||
if childEvent.Type == zk.EventNodeDeleted {
|
||||
fmt.Println("receive znode delete event, ", childEvent)
|
||||
// 重新选举
|
||||
fmt.Println("start elect new master ...")
|
||||
err = electionManager.electMaster()
|
||||
if err != nil {
|
||||
fmt.Println("elect new master error, ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package zk
|
||||
import (
|
||||
zk "github.com/samuel/go-zookeeper/zk"
|
||||
"time"
|
||||
"log"
|
||||
"os"
|
||||
_ "strings"
|
||||
_ "bytes"
|
||||
"tcpserv/config"
|
||||
)
|
||||
|
||||
var g_zkCnn *zk.Conn
|
||||
|
||||
type ServerStatus struct {
|
||||
CnnCount int32 `json:"cnn_count"`//当前在线连接数目
|
||||
ServerID string `json:"server_id"`//serverID
|
||||
}
|
||||
|
||||
func ConnectZooKeeper() error {
|
||||
var e error
|
||||
var evChan <-chan zk.Event
|
||||
log.Println(config.Configs().Zkserver)
|
||||
g_zkCnn,evChan,e = zk.Connect(config.Configs().Zkserver,time.Second)
|
||||
if nil != e{
|
||||
log.Println("",e.Error())
|
||||
return e
|
||||
}
|
||||
if evChan == nil{
|
||||
log.Println("")
|
||||
os.Exit(1)
|
||||
}
|
||||
if g_zkCnn == nil{
|
||||
log.Println("[ERROR] could not connect zookeeper")
|
||||
os.Exit(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ZkInstance() *zk.Conn{
|
||||
return g_zkCnn
|
||||
}
|
Loading…
Reference in New Issue