242 lines
5.6 KiB
Go
242 lines
5.6 KiB
Go
package db
|
|
|
|
import (
|
|
"background/utils"
|
|
"encoding/json"
|
|
"reflect"
|
|
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/net/context"
|
|
"gopkg.in/olivere/elastic.v7"
|
|
"qiniupkg.com/x/log.v7"
|
|
)
|
|
|
|
// Error messages returned (wrapped in errors.New) by the ElkEngine methods.
const (
	// ERROR_PTR is reported when a method is invoked on a nil *ElkEngine.
	ERROR_PTR = "null pointer error"

	// INPUT_TYPE_ERROR prefixes errors about arguments of an unsupported kind;
	// callers append the expected kind to it.
	INPUT_TYPE_ERROR = "wrong input parameter: "

	// CREATED_ERROR is the message for a failed document creation.
	CREATED_ERROR = "create error"

	// DELETE_ERROR is the message for a failed deletion.
	DELETE_ERROR = "delete error"

	// INDEX_EXISTED is reported by CreateIndex when the index is already present.
	INDEX_EXISTED = "index existed"
)
|
|
|
|
type ElkEngine struct {
|
|
cli *elastic.Client
|
|
}
|
|
|
|
func (p *ElkEngine) Create(index string, types string, id string, data interface{}) error {
|
|
if nil != p {
|
|
if (reflect.TypeOf(data).Kind() == reflect.String) || (reflect.TypeOf(data).Kind() == reflect.Struct) {
|
|
resp, err := p.cli.Index().
|
|
Index(index).
|
|
BodyJson(data).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
log.Print("create error", err)
|
|
return err
|
|
}
|
|
log.Print(resp)
|
|
} else {
|
|
log.Print(reflect.TypeOf(data).Kind())
|
|
return errors.New(INPUT_TYPE_ERROR)
|
|
}
|
|
} else {
|
|
return errors.New(ERROR_PTR)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *ElkEngine) Delete(query elastic.Query, index string) error {
|
|
if nil != p {
|
|
_, err := p.cli.DeleteByQuery().Index(index).Query(query).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
log.Print(err)
|
|
return err
|
|
}
|
|
|
|
} else {
|
|
return errors.New(ERROR_PTR)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *ElkEngine) Query(index string, query elastic.Query, v interface{},
|
|
limit int, offset int) ([]string, error) {
|
|
|
|
if reflect.ValueOf(v).Kind() != reflect.Ptr {
|
|
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Ptr")
|
|
}
|
|
if reflect.ValueOf(v).Elem().Kind() != reflect.Slice {
|
|
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Slice")
|
|
}
|
|
if reflect.ValueOf(v).Elem().Type().Elem().Kind() != reflect.Struct {
|
|
return nil, errors.New(INPUT_TYPE_ERROR + "shoulbe be Struct")
|
|
}
|
|
eletype := reflect.ValueOf(v).Elem().Type().Elem()
|
|
obj := reflect.ValueOf(v).Elem()
|
|
objAdd := make([]reflect.Value, 0)
|
|
|
|
if nil != p {
|
|
if limit == 0 {
|
|
res, err := p.cli.Search(index).Query(query).Do(context.Background())
|
|
if err != nil {
|
|
print(err)
|
|
return nil, err
|
|
}
|
|
|
|
id := []string{}
|
|
for _, vs := range res.Hits.Hits {
|
|
id = append(id, vs.Id)
|
|
data, e := vs.Source.MarshalJSON()
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
mapobj := map[string]interface{}{}
|
|
e = json.Unmarshal(data, &mapobj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
|
|
log.Print(obj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
objAdd = append(objAdd, reflect.ValueOf(obj))
|
|
}
|
|
return id, nil
|
|
} else {
|
|
res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background())
|
|
if err != nil {
|
|
print(err)
|
|
return nil, err
|
|
}
|
|
id := []string{}
|
|
for _, vs := range res.Hits.Hits {
|
|
id = append(id, vs.Id)
|
|
data, e := vs.Source.MarshalJSON()
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
mapobj := map[string]interface{}{}
|
|
e = json.Unmarshal(data, &mapobj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
obj, e := utils.UnmarshalJson2StructGen(eletype, mapobj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
objAdd = append(objAdd, reflect.ValueOf(obj))
|
|
}
|
|
addOp := reflect.Append(obj, objAdd...)
|
|
obj.Set(addOp)
|
|
return id, nil
|
|
}
|
|
} else {
|
|
return nil, errors.New(ERROR_PTR)
|
|
}
|
|
}
|
|
|
|
func (p *ElkEngine) QueryGen(index string, query elastic.Query, typ reflect.Type,
|
|
limit int, offset int) ([]interface{}, []string, error) {
|
|
rets := []interface{}{}
|
|
if nil != p {
|
|
if limit == 0 {
|
|
res, err := p.cli.Search(index).Query(query).Do(context.Background())
|
|
if err != nil {
|
|
print(err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
id := []string{}
|
|
for _, vs := range res.Hits.Hits {
|
|
id = append(id, vs.Id)
|
|
}
|
|
return rets, id, nil
|
|
} else {
|
|
res, err := p.cli.Search(index).Query(query).Size(limit).From(limit * offset).Do(context.Background())
|
|
if err != nil {
|
|
print(err)
|
|
return nil, nil, err
|
|
}
|
|
id := []string{}
|
|
for _, vs := range res.Hits.Hits {
|
|
id = append(id, vs.Id)
|
|
data, e := vs.Source.MarshalJSON()
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
mapobj := map[string]interface{}{}
|
|
e = json.Unmarshal(data, &mapobj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
obj, e := utils.UnmarshalJson2StructGen(typ, mapobj)
|
|
if nil != e {
|
|
log.Print(e.Error())
|
|
}
|
|
rets = append(rets, obj)
|
|
}
|
|
return rets, id, nil
|
|
}
|
|
} else {
|
|
return nil, nil, errors.New(ERROR_PTR)
|
|
}
|
|
}
|
|
|
|
func (p *ElkEngine) Update(index string, types string, id string, data map[string]interface{}) error {
|
|
if nil != p {
|
|
_, err := p.cli.Update().
|
|
Index(index).
|
|
Id(id).
|
|
Doc(data).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
println(err.Error())
|
|
return err
|
|
}
|
|
}
|
|
return errors.New(ERROR_PTR)
|
|
}
|
|
|
|
func (p *ElkEngine) CreateIndex(index string, typemaping string) error {
|
|
if nil != p {
|
|
exists, err := p.cli.IndexExists(index).Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
log.Print(err)
|
|
return err
|
|
}
|
|
if exists {
|
|
return errors.New(INDEX_EXISTED)
|
|
}
|
|
createIndex, err := p.cli.CreateIndex(index).Body(typemaping).Do(context.Background())
|
|
if err != nil {
|
|
log.Print(err)
|
|
return err
|
|
}
|
|
if !createIndex.Acknowledged {
|
|
return errors.New("create index error")
|
|
// Not acknowledged
|
|
}
|
|
return nil
|
|
}
|
|
return errors.New(ERROR_PTR)
|
|
}
|
|
|
|
func (p *ElkEngine) IndexExisted(index string) (bool, error) {
|
|
if nil != p {
|
|
exists, err := p.cli.IndexExists(index).Do(context.Background())
|
|
if exists {
|
|
return true, nil
|
|
}
|
|
if err != nil {
|
|
// Handle error
|
|
log.Print(err)
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
}
|
|
return false, nil
|
|
}
|