302 lines
		
	
	
		
			7.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			7.6 KiB
		
	
	
	
		
			Go
		
	
	
	
/*
 | 
						|
 * Copyright 2025 coze-dev Authors
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
 | 
						|
package es
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"os"
 | 
						|
 | 
						|
	"github.com/elastic/go-elasticsearch/v8"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/esutil"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/indices/create"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/indices/delete"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/indices/exists"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/operator"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/sortorder"
 | 
						|
	"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/textquerytype"
 | 
						|
 | 
						|
	"github.com/coze-dev/coze-studio/backend/infra/contract/es"
 | 
						|
	"github.com/coze-dev/coze-studio/backend/pkg/lang/conv"
 | 
						|
	"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
 | 
						|
	"github.com/coze-dev/coze-studio/backend/pkg/logs"
 | 
						|
	"github.com/coze-dev/coze-studio/backend/pkg/sonic"
 | 
						|
)
 | 
						|
 | 
						|
type es8Client struct {
 | 
						|
	esClient *elasticsearch.TypedClient
 | 
						|
	types    *es8Types
 | 
						|
}
 | 
						|
 | 
						|
type es8BulkIndexer struct {
 | 
						|
	bi esutil.BulkIndexer
 | 
						|
}
 | 
						|
 | 
						|
type es8Types struct{}
 | 
						|
 | 
						|
func newES8() (Client, error) {
 | 
						|
	esAddr := os.Getenv("ES_ADDR")
 | 
						|
	esUsername := os.Getenv("ES_USERNAME")
 | 
						|
	esPassword := os.Getenv("ES_PASSWORD")
 | 
						|
	esClient, err := elasticsearch.NewTypedClient(elasticsearch.Config{
 | 
						|
		Addresses: []string{esAddr},
 | 
						|
		Username:  esUsername,
 | 
						|
		Password:  esPassword,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &es8Client{
 | 
						|
		esClient: esClient,
 | 
						|
		types:    &es8Types{},
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Create(ctx context.Context, index, id string, document any) error {
 | 
						|
	_, err := c.esClient.Index(index).Id(id).Document(document).Do(ctx)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Update(ctx context.Context, index, id string, document any) error {
 | 
						|
	_, err := c.esClient.Update(index, id).Doc(document).Do(ctx)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Delete(ctx context.Context, index, id string) error {
 | 
						|
	_, err := c.esClient.Delete(index, id).Do(ctx)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Exists(ctx context.Context, index string) (bool, error) {
 | 
						|
	exist, err := exists.NewExistsFunc(c.esClient)(index).Do(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return false, err
 | 
						|
	}
 | 
						|
 | 
						|
	return exist, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) query2ESQuery(q *Query) *types.Query {
 | 
						|
	if q == nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	var typesQ *types.Query
 | 
						|
	switch q.Type {
 | 
						|
	case es.QueryTypeEqual:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			Term: map[string]types.TermQuery{
 | 
						|
				q.KV.Key: {Value: q.KV.Value},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	case es.QueryTypeMatch:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			Match: map[string]types.MatchQuery{
 | 
						|
				q.KV.Key: {Query: fmt.Sprint(q.KV.Value)},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	case es.QueryTypeMultiMatch:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			MultiMatch: &types.MultiMatchQuery{
 | 
						|
				Fields:   q.MultiMatchQuery.Fields,
 | 
						|
				Operator: &operator.Operator{Name: q.MultiMatchQuery.Operator},
 | 
						|
				Query:    q.MultiMatchQuery.Query,
 | 
						|
				Type:     &textquerytype.TextQueryType{Name: q.MultiMatchQuery.Type},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	case es.QueryTypeNotExists:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			Bool: &types.BoolQuery{
 | 
						|
				MustNot: []types.Query{{Exists: &types.ExistsQuery{Field: q.KV.Key}}},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	case es.QueryTypeContains:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			Wildcard: map[string]types.WildcardQuery{
 | 
						|
				q.KV.Key: {
 | 
						|
					Value:           ptr.Of(fmt.Sprintf("*%s*", q.KV.Value)),
 | 
						|
					CaseInsensitive: ptr.Of(true), // 忽略大小写
 | 
						|
				},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	case es.QueryTypeIn:
 | 
						|
		typesQ = &types.Query{
 | 
						|
			Terms: &types.TermsQuery{
 | 
						|
				TermsQuery: map[string]types.TermsQueryField{
 | 
						|
					q.KV.Key: q.KV.Value,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		}
 | 
						|
	default:
 | 
						|
		typesQ = &types.Query{}
 | 
						|
	}
 | 
						|
 | 
						|
	if q.Bool == nil {
 | 
						|
		return typesQ
 | 
						|
	}
 | 
						|
 | 
						|
	typesQ.Bool = &types.BoolQuery{}
 | 
						|
	for idx := range q.Bool.Filter {
 | 
						|
		v := q.Bool.Filter[idx]
 | 
						|
		typesQ.Bool.Filter = append(typesQ.Bool.Filter, *c.query2ESQuery(&v))
 | 
						|
	}
 | 
						|
 | 
						|
	for idx := range q.Bool.Must {
 | 
						|
		v := q.Bool.Must[idx]
 | 
						|
		typesQ.Bool.Must = append(typesQ.Bool.Must, *c.query2ESQuery(&v))
 | 
						|
	}
 | 
						|
 | 
						|
	for idx := range q.Bool.MustNot {
 | 
						|
		v := q.Bool.MustNot[idx]
 | 
						|
		typesQ.Bool.MustNot = append(typesQ.Bool.MustNot, *c.query2ESQuery(&v))
 | 
						|
	}
 | 
						|
 | 
						|
	for idx := range q.Bool.Should {
 | 
						|
		v := q.Bool.Should[idx]
 | 
						|
		typesQ.Bool.Should = append(typesQ.Bool.Should, *c.query2ESQuery(&v))
 | 
						|
	}
 | 
						|
 | 
						|
	if q.Bool.MinimumShouldMatch != nil {
 | 
						|
		typesQ.Bool.MinimumShouldMatch = q.Bool.MinimumShouldMatch
 | 
						|
	}
 | 
						|
 | 
						|
	return typesQ
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Search(ctx context.Context, index string, req *Request) (*Response, error) {
 | 
						|
	esReq := &search.Request{
 | 
						|
		Query:    c.query2ESQuery(req.Query),
 | 
						|
		Size:     req.Size,
 | 
						|
		MinScore: (*types.Float64)(req.MinScore),
 | 
						|
	}
 | 
						|
 | 
						|
	for _, sort := range req.Sort {
 | 
						|
		order := sortorder.Asc
 | 
						|
		if !sort.Asc {
 | 
						|
			order = sortorder.Desc
 | 
						|
		}
 | 
						|
		esReq.Sort = append(esReq.Sort, types.SortCombinations(types.SortOptions{
 | 
						|
			SortOptions: map[string]types.FieldSort{
 | 
						|
				sort.Field: {
 | 
						|
					Order: ptr.Of(order),
 | 
						|
				},
 | 
						|
			},
 | 
						|
		}))
 | 
						|
	}
 | 
						|
 | 
						|
	if req.From != nil {
 | 
						|
		esReq.From = req.From
 | 
						|
	} else {
 | 
						|
		for _, v := range req.SearchAfter {
 | 
						|
			esReq.SearchAfter = append(esReq.SearchAfter, types.FieldValue(v))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	logs.CtxDebugf(ctx, "Elasticsearch Request: %s\n", conv.DebugJsonToStr(esReq))
 | 
						|
 | 
						|
	resp, err := c.esClient.Search().Request(esReq).Index(index).Do(ctx)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	respJson, err := sonic.MarshalString(resp)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	var esResp Response
 | 
						|
	if err := sonic.UnmarshalString(respJson, &esResp); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &esResp, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) CreateIndex(ctx context.Context, index string, properties map[string]any) error {
 | 
						|
	propertiesMap := make(map[string]types.Property)
 | 
						|
	for k, v := range properties {
 | 
						|
		propertiesMap[k] = v
 | 
						|
	}
 | 
						|
 | 
						|
	if _, err := create.NewCreateFunc(c.esClient)(index).Request(&create.Request{
 | 
						|
		Mappings: &types.TypeMapping{
 | 
						|
			Properties: propertiesMap,
 | 
						|
		},
 | 
						|
	}).Do(ctx); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) DeleteIndex(ctx context.Context, index string) error {
 | 
						|
	_, err := delete.NewDeleteFunc(c.esClient)(index).
 | 
						|
		IgnoreUnavailable(true).Do(ctx)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) NewBulkIndexer(index string) (BulkIndexer, error) {
 | 
						|
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
 | 
						|
		Client: c.esClient,
 | 
						|
		Index:  index,
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &es8BulkIndexer{bi}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (c *es8Client) Types() Types {
 | 
						|
	return c.types
 | 
						|
}
 | 
						|
 | 
						|
func (t *es8Types) NewLongNumberProperty() any {
 | 
						|
	return types.NewLongNumberProperty()
 | 
						|
}
 | 
						|
 | 
						|
func (t *es8Types) NewTextProperty() any {
 | 
						|
	return types.NewTextProperty()
 | 
						|
}
 | 
						|
 | 
						|
func (t *es8Types) NewUnsignedLongNumberProperty() any {
 | 
						|
	return types.NewUnsignedLongNumberProperty()
 | 
						|
}
 | 
						|
 | 
						|
func (b *es8BulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
 | 
						|
	return b.bi.Add(ctx, esutil.BulkIndexerItem{
 | 
						|
		Index:           item.Index,
 | 
						|
		Action:          item.Action,
 | 
						|
		DocumentID:      item.DocumentID,
 | 
						|
		Routing:         item.Routing,
 | 
						|
		Version:         item.Version,
 | 
						|
		VersionType:     item.VersionType,
 | 
						|
		Body:            item.Body,
 | 
						|
		RetryOnConflict: item.RetryOnConflict,
 | 
						|
		// not support in es7
 | 
						|
		// RequireAlias:    item.RequireAlias,
 | 
						|
		// IfSeqNo:         item.IfSeqNo,
 | 
						|
		// IfPrimaryTerm:   item.IfPrimaryTerm,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (b *es8BulkIndexer) Close(ctx context.Context) error {
 | 
						|
	return b.bi.Close(ctx)
 | 
						|
}
 |