/*
 * Copyright 2025 coze-dev Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package service

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/bytedance/sonic"
	"github.com/cloudwego/eino/components/document/parser"
	"github.com/cloudwego/eino/schema"

	"github.com/coze-dev/coze-studio/backend/api/model/crossdomain/knowledge"
	"github.com/coze-dev/coze-studio/backend/domain/knowledge/entity"
	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/consts"
	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/convert"
	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/dal/model"
	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/events"
	"github.com/coze-dev/coze-studio/backend/infra/contract/document"
	"github.com/coze-dev/coze-studio/backend/infra/contract/document/searchstore"
	"github.com/coze-dev/coze-studio/backend/infra/contract/eventbus"
	"github.com/coze-dev/coze-studio/backend/infra/contract/rdb"
	"github.com/coze-dev/coze-studio/backend/infra/contract/storage"
	"github.com/coze-dev/coze-studio/backend/infra/impl/document/progressbar"
	"github.com/coze-dev/coze-studio/backend/pkg/errorx"
	"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
	"github.com/coze-dev/coze-studio/backend/pkg/logs"
	"github.com/coze-dev/coze-studio/backend/types/errno"
)

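// HandleMessage consumes a knowledge event from the event bus and dispatches it to the
// matching handler by event type. Errors marked as non-retryable are logged and swallowed
// so the message is not redelivered; all other errors are returned to trigger a retry.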
func (k *knowledgeSVC) HandleMessage(ctx context.Context, msg *eventbus.Message) (err error) {
	defer func() {
		if err != nil {
			var statusError errorx.StatusError
			if errors.As(err, &statusError) && statusError.Code() == errno.ErrKnowledgeNonRetryableCode {
				logs.Errorf("[HandleMessage][no-retry] failed, %v", err)
				err = nil
			} else {
				logs.Errorf("[HandleMessage][retry] failed, %v", err)
			}
		} else {
			logs.Infof("[HandleMessage] knowledge event handle success, body=%s", string(msg.Body))
		}
	}()

	event := &entity.Event{}
	if err = sonic.Unmarshal(msg.Body, event); err != nil {
		return errorx.New(errno.ErrKnowledgeParseJSONCode, errorx.KV("msg", fmt.Sprintf("unmarshal event failed, err: %v", err)))
	}

	switch event.Type {
	case entity.EventTypeIndexDocuments:
		if err = k.indexDocuments(ctx, event); err != nil {
			return err
		}
	case entity.EventTypeIndexDocument:
		if err = k.indexDocument(ctx, event); err != nil {
			return err
		}
	case entity.EventTypeIndexSlice:
		if err = k.indexSlice(ctx, event); err != nil {
			return err
		}
	case entity.EventTypeDeleteKnowledgeData:
		err = k.deleteKnowledgeDataEventHandler(ctx, event)
		if err != nil {
			logs.CtxErrorf(ctx, "[HandleMessage] delete knowledge failed, err: %v", err)
			return err
		}
	case entity.EventTypeDocumentReview:
		if err = k.documentReviewEventHandler(ctx, event); err != nil {
			logs.CtxErrorf(ctx, "[HandleMessage] document review failed, err: %v", err)
			return err
		}
	default:
		return errorx.New(errno.ErrKnowledgeNonRetryableCode, errorx.KV("reason", fmt.Sprintf("unknown event type=%s", event.Type)))
	}
	return nil
}

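// deleteKnowledgeDataEventHandler removes the slices listed in the event from every
// configured search store of the knowledge base.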
func (k *knowledgeSVC) deleteKnowledgeDataEventHandler(ctx context.Context, event *entity.Event) error {
	// Delete the data in each search store of the knowledge base
	for _, manager := range k.searchStoreManagers {
		s, err := manager.GetSearchStore(ctx, getCollectionName(event.KnowledgeID))
		if err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
		}
		if err := s.Delete(ctx, slices.Transform(event.SliceIDs, func(id int64) string {
			return strconv.FormatInt(id, 10)
		})); err != nil {
			logs.Errorf("delete knowledge failed, err: %v", err)
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("delete search store failed, err: %v", err)))
		}
	}
	return nil
}

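// indexDocuments fans out an index-documents event: it publishes one index-document event
// per document, sharded by knowledge ID, so each document is indexed independently.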
func (k *knowledgeSVC) indexDocuments(ctx context.Context, event *entity.Event) (err error) {
	if len(event.Documents) == 0 {
		logs.CtxWarnf(ctx, "[indexDocuments] documents not provided")
		return nil
	}
	for i := range event.Documents {
		doc := event.Documents[i]
		if doc == nil {
			logs.CtxWarnf(ctx, "[indexDocuments] document not provided")
			continue
		}
		e := events.NewIndexDocumentEvent(doc.KnowledgeID, doc)
		msgData, err := sonic.Marshal(e)
		if err != nil {
			logs.CtxErrorf(ctx, "[indexDocuments] marshal event failed, err: %v", err)
			return errorx.New(errno.ErrKnowledgeParseJSONCode, errorx.KV("msg", fmt.Sprintf("marshal event failed, err: %v", err)))
		}
		err = k.producer.Send(ctx, msgData, eventbus.WithShardingKey(strconv.FormatInt(doc.KnowledgeID, 10)))
		if err != nil {
			logs.CtxErrorf(ctx, "[indexDocuments] send message failed, err: %v", err)
			return errorx.New(errno.ErrKnowledgeMQSendFailCode, errorx.KV("msg", fmt.Sprintf("send message failed, err: %v", err)))
		}
	}
	return nil
}

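// indexDocument parses, chunks, and indexes a single document: it clears stale slices
// (unless appending), persists the new slices, writes them to every search store, and
// updates the document and slice statuses. On failure, the document status is set to
// failed or back to chunking depending on whether the error is retryable.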
func (k *knowledgeSVC) indexDocument(ctx context.Context, event *entity.Event) (err error) {
	doc := event.Document
	if doc == nil {
		return errorx.New(errno.ErrKnowledgeNonRetryableCode, errorx.KV("reason", "[indexDocument] document not provided"))
	}

	// Duplicate indexing of the same document can happen in two ways:
	// 1. index operations for the same document run concurrently in the retry queue and the normal queue, writing the same document data twice (observed when a backend bugfix went live)
	// 2. a consumer rebalance causes the same message to be consumed more than once

	// check knowledge and document status
	if valid, err := k.isWritableKnowledgeAndDocument(ctx, doc.KnowledgeID, doc.ID); err != nil {
		return err
	} else if !valid {
		return errorx.New(errno.ErrKnowledgeNonRetryableCode,
			errorx.KVf("reason", "[indexDocument] not writable, knowledge_id=%d, document_id=%d", event.KnowledgeID, doc.ID))
	}

	defer func() {
		if e := recover(); e != nil {
			err = errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("panic: %v", e)))
			logs.CtxErrorf(ctx, "[indexDocument] panic, err: %v", err)
			if setStatusErr := k.documentRepo.SetStatus(ctx, event.Document.ID, int32(entity.DocumentStatusFailed), err.Error()); setStatusErr != nil {
				logs.CtxErrorf(ctx, "[indexDocument] set document status failed, err: %v", setStatusErr)
			}
			return
		}
		if err != nil {
			var errMsg string
			var statusError errorx.StatusError
			var status int32
			if errors.As(err, &statusError) {
				errMsg = errorx.ErrorWithoutStack(statusError)
				if statusError.Code() == errno.ErrKnowledgeNonRetryableCode {
					status = int32(entity.DocumentStatusFailed)
				} else {
					status = int32(entity.DocumentStatusChunking)
				}
			} else {
				errMsg = err.Error()
				status = int32(entity.DocumentStatusChunking)
			}
			if setStatusErr := k.documentRepo.SetStatus(ctx, event.Document.ID, status, errMsg); setStatusErr != nil {
				logs.CtxErrorf(ctx, "[indexDocument] set document status failed, err: %v", setStatusErr)
			}
		}
	}()

	// clear existing slices when re-indexing (non-append) so stale data is not left behind
	collectionName := getCollectionName(doc.KnowledgeID)

	if !doc.IsAppend {
		ids, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
		if err != nil {
			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get document slice ids failed, err: %v", err)))
		}
		if len(ids) > 0 {
			if err = k.sliceRepo.DeleteByDocument(ctx, doc.ID); err != nil {
				return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("delete document slice failed, err: %v", err)))
			}
			for _, manager := range k.searchStoreManagers {
				s, err := manager.GetSearchStore(ctx, collectionName)
				if err != nil {
					return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
				}
				// delete the document's existing slice IDs from the search store
				if err := s.Delete(ctx, slices.Transform(ids, func(id int64) string {
					return strconv.FormatInt(id, 10)
				})); err != nil {
					logs.Errorf("[indexDocument] delete knowledge failed, err: %v", err)
					return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("delete search store failed, err: %v", err)))
				}
			}
		}
	}

	// set chunk status
	if err = k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusChunking), ""); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("set document status failed, err: %v", err)))
	}

	// parse & chunk
	bodyBytes, err := k.storage.GetObject(ctx, doc.URI)
	if err != nil {
		return errorx.New(errno.ErrKnowledgeGetObjectFailCode, errorx.KV("msg", fmt.Sprintf("get object failed, err: %v", err)))
	}

	docParser, err := k.parseManager.GetParser(convert.DocumentToParseConfig(doc))
	if err != nil {
		return errorx.New(errno.ErrKnowledgeGetParserFailCode, errorx.KV("msg", fmt.Sprintf("get parser failed, err: %v", err)))
	}

	parseResult, err := docParser.Parse(ctx, bytes.NewReader(bodyBytes), parser.WithExtraMeta(map[string]any{
		document.MetaDataKeyCreatorID: doc.CreatorID,
		document.MetaDataKeyExternalStorage: map[string]any{
			"document_id": doc.ID,
		},
	}))
	if err != nil {
		return errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KV("msg", fmt.Sprintf("parse document failed, err: %v", err)))
	}

	if doc.Type == knowledge.DocumentTypeTable {
		noData, err := document.GetDocumentsColumnsOnly(parseResult)
		if err != nil { // unexpected
			return errorx.New(errno.ErrKnowledgeNonRetryableCode,
				errorx.KVf("reason", "[indexDocument] get table data status failed, err: %v", err))
		}
		if noData {
			parseResult = nil // clear parse result
		}
	}

	// set id: generate slice IDs in batches of 100 and assign them to the parsed chunks
	allIDs := make([]int64, 0, len(parseResult))
	for l := 0; l < len(parseResult); l += 100 {
		r := min(l+100, len(parseResult))
		batchSize := r - l
		ids, err := k.idgen.GenMultiIDs(ctx, batchSize)
		if err != nil {
			return errorx.New(errno.ErrKnowledgeIDGenCode, errorx.KV("msg", fmt.Sprintf("GenMultiIDs failed, err: %v", err)))
		}
		allIDs = append(allIDs, ids...)
		for i := 0; i < batchSize; i++ {
			id := ids[i]
			index := l + i
			parseResult[index].ID = strconv.FormatInt(id, 10)
		}
	}

	convertFn := d2sMapping[doc.Type]
	if convertFn == nil {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", "convertFn is empty"))
	}

	sliceEntities, err := slices.TransformWithErrorCheck(parseResult, func(a *schema.Document) (*entity.Slice, error) {
		return convertFn(a, doc.KnowledgeID, doc.ID, doc.CreatorID)
	})
	if err != nil {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("convert document failed, err: %v", err)))
	}

	// save slices
	if doc.Type == knowledge.DocumentTypeTable {
		// table-type documents: upsert the row data into the physical RDB table
		err = k.upsertDataToTable(ctx, &doc.TableInfo, sliceEntities)
		if err != nil {
			logs.CtxErrorf(ctx, "[indexDocument] insert data to table failed, err: %v", err)
			return err
		}
	}

	var seqOffset float64
	if doc.IsAppend {
		// appended slices continue the sequence after the document's current last slice
		seqOffset, err = k.sliceRepo.GetLastSequence(ctx, doc.ID)
		if err != nil {
			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get last sequence failed, err: %v", err)))
		}
		seqOffset += 1
	}

	sliceModels := make([]*model.KnowledgeDocumentSlice, 0, len(parseResult))
	for i, src := range parseResult {
		now := time.Now().UnixMilli()
		sliceModel := &model.KnowledgeDocumentSlice{
			ID:          allIDs[i],
			KnowledgeID: doc.KnowledgeID,
			DocumentID:  doc.ID,
			Content:     parseResult[i].Content,
			Sequence:    seqOffset + float64(i),
			CreatedAt:   now,
			UpdatedAt:   now,
			CreatorID:   doc.CreatorID,
			SpaceID:     doc.SpaceID,
			Status:      int32(model.SliceStatusProcessing),
			FailReason:  "",
		}
		if doc.Type == knowledge.DocumentTypeTable {
			sliceEntity, err := convertFn(src, doc.KnowledgeID, doc.ID, doc.CreatorID)
			if err != nil {
				logs.CtxErrorf(ctx, "[indexDocument] convert document failed, err: %v", err)
				return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("convert document failed, err: %v", err)))
			}
			sliceModel.Content = sliceEntity.GetSliceContent()
		}
		sliceModels = append(sliceModels, sliceModel)
	}
	if err = k.sliceRepo.BatchCreate(ctx, sliceModels); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("batch create slice failed, err: %v", err)))
	}

	defer func() {
		if err != nil { // set slice status
			if setStatusErr := k.sliceRepo.BatchSetStatus(ctx, allIDs, int32(model.SliceStatusFailed), err.Error()); setStatusErr != nil {
				logs.CtxErrorf(ctx, "[indexDocument] set slice status failed, err: %v", setStatusErr)
			}
		}
	}()

	// to vectorstore
	fields, err := k.mapSearchFields(doc)
	if err != nil {
		return err
	}
	indexingFields := getIndexingFields(fields)

	// reformat docs, mainly for enableCompactTable
	ssDocs, err := slices.TransformWithErrorCheck(sliceEntities, func(a *entity.Slice) (*schema.Document, error) {
		return k.slice2Document(ctx, doc, a)
	})
	if err != nil {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", fmt.Sprintf("reformat document failed, err: %v", err)))
	}
	// bar tracks indexing progress across every search store write
	bar := progressbar.NewProgressBar(ctx, doc.ID, int64(len(ssDocs)*len(k.searchStoreManagers)), k.cacheCli, true)
	for _, manager := range k.searchStoreManagers {
		now := time.Now()
		if err = manager.Create(ctx, &searchstore.CreateRequest{
			CollectionName: collectionName,
			Fields:         fields,
			CollectionMeta: nil,
		}); err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("create search store failed, err: %v", err)))
		}
		// For image knowledge bases (knowledge : document : slice = 1 : n : n) the content may be empty, in which case there is nothing to write
		if doc.Type == knowledge.DocumentTypeImage && len(ssDocs) == 1 && len(ssDocs[0].Content) == 0 {
			continue
		}
		ss, err := manager.GetSearchStore(ctx, collectionName)
		if err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
		}
		if _, err = ss.Store(ctx, ssDocs,
			searchstore.WithIndexerPartitionKey(fieldNameDocumentID),
			searchstore.WithPartition(strconv.FormatInt(doc.ID, 10)),
			searchstore.WithIndexingFields(indexingFields),
			searchstore.WithProgressBar(bar),
		); err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("store search store failed, err: %v", err)))
		}
		logs.CtxDebugf(ctx, "[indexDocument] ss type=%v, len(docs)=%d, finished after %d ms",
			manager.GetType(), len(ssDocs), time.Since(now).Milliseconds())
	}
	// set slice status
	if err = k.sliceRepo.BatchSetStatus(ctx, allIDs, int32(model.SliceStatusDone), ""); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("batch set slice status failed, err: %v", err)))
	}

	// set document status
	if err = k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusEnable), ""); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("set document status failed, err: %v", err)))
	}
	if err = k.documentRepo.UpdateDocumentSliceInfo(ctx, event.Document.ID); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("update document slice info failed, err: %v", err)))
	}
	return nil
}

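// upsertDataToTable writes table-type slices into the knowledge base's physical RDB table
// and verifies that every slice was either written or left unchanged.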
func (k *knowledgeSVC) upsertDataToTable(ctx context.Context, tableInfo *entity.TableInfo, slices []*entity.Slice) (err error) {
	if len(slices) == 0 {
		return nil
	}
	insertData, err := packInsertData(slices)
	if err != nil {
		logs.CtxErrorf(ctx, "[upsertDataToTable] pack insert data failed, err: %v", err)
		return err
	}
	resp, err := k.rdb.UpsertData(ctx, &rdb.UpsertDataRequest{
		TableName: tableInfo.PhysicalTableName,
		Data:      insertData,
	})
	if err != nil {
		logs.CtxErrorf(ctx, "[upsertDataToTable] upsert data failed, err: %v", err)
		return errorx.New(errno.ErrKnowledgeCrossDomainCode, errorx.KVf("msg", "upsert data failed, err: %v", err))
	}
	if resp.AffectedRows+resp.UnchangedRows != int64(len(slices)) {
		logs.CtxErrorf(ctx, "[upsertDataToTable] upsert data failed, affected rows: %d, unchanged rows: %d, expect: %d", resp.AffectedRows, resp.UnchangedRows, len(slices))
		return errorx.New(errno.ErrKnowledgeCrossDomainCode, errorx.KVf("msg", "upsert data failed, affected rows: %d, unchanged rows: %d, expect: %d", resp.AffectedRows, resp.UnchangedRows, len(slices)))
	}
	return nil
}

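// packInsertData converts table slices into row maps keyed by physical column name,
// ready to be upserted into the RDB table.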
func packInsertData(slices []*entity.Slice) (data []map[string]interface{}, err error) {
	defer func() {
		// recover from panics (for example, a slice without raw table content) and surface them as errors
		if r := recover(); r != nil {
			logs.Errorf("[packInsertData] panic: %v", r)
			err = errorx.New(errno.ErrKnowledgeSystemCode, errorx.KVf("msg", "panic: %v", r))
			return
		}
	}()

	for i := range slices {
		dataMap := map[string]any{
			consts.RDBFieldID: slices[i].ID,
		}
		for j := range slices[i].RawContent[0].Table.Columns {
			val := slices[i].RawContent[0].Table.Columns[j]
			if val.ColumnName == consts.RDBFieldID {
				continue
			}
			physicalColumnName := convert.ColumnIDToRDBField(val.ColumnID)
			dataMap[physicalColumnName] = val.GetValue()
		}
		data = append(data, dataMap)
	}

	return data, nil
}

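// indexSlice writes a single slice into every search store of its knowledge base and marks
// it as done, loading the owning document first if the event does not carry it.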
func (k *knowledgeSVC) indexSlice(ctx context.Context, event *entity.Event) (err error) {
	slice := event.Slice
	if slice == nil {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", "slice not provided"))
	}
	if slice.ID == 0 {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", "slice.id not set"))
	}
	if event.Document == nil {
		doc, err := k.documentRepo.GetByID(ctx, slice.DocumentID)
		if err != nil {
			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get document failed, err: %v", err)))
		}
		event.Document, err = k.fromModelDocument(ctx, doc)
		if err != nil {
			return err
		}
	}
	if slice.DocumentID == 0 {
		slice.DocumentID = event.Document.ID
	}
	if slice.KnowledgeID == 0 {
		slice.KnowledgeID = event.Document.KnowledgeID
	}
	defer func() {
		if err != nil {
			if setStatusErr := k.sliceRepo.BatchSetStatus(ctx, []int64{slice.ID}, int32(model.SliceStatusFailed), err.Error()); setStatusErr != nil {
				logs.CtxErrorf(ctx, "[indexSlice] set slice status failed, err: %v", setStatusErr)
			}
		}
	}()

	fields, err := k.mapSearchFields(event.Document)
	if err != nil {
		return err
	}

	indexingFields := getIndexingFields(fields)
	collectionName := getCollectionName(slice.KnowledgeID)
	for _, manager := range k.searchStoreManagers {
		ss, err := manager.GetSearchStore(ctx, collectionName)
		if err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("get search store failed, err: %v", err)))
		}

		doc, err := k.slice2Document(ctx, event.Document, slice)
		if err != nil {
			return err
		}

		if _, err = ss.Store(ctx, []*schema.Document{doc},
			searchstore.WithIndexerPartitionKey(fieldNameDocumentID),
			searchstore.WithPartition(strconv.FormatInt(event.Document.ID, 10)),
			searchstore.WithIndexingFields(indexingFields),
		); err != nil {
			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", fmt.Sprintf("store search store failed, err: %v", err)))
		}
	}

	if err = k.sliceRepo.BatchSetStatus(ctx, []int64{slice.ID}, int32(model.SliceStatusDone), ""); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("batch set slice status failed, err: %v", err)))
	}
	if err = k.documentRepo.UpdateDocumentSliceInfo(ctx, slice.DocumentID); err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("update document slice info failed, err: %v", err)))
	}
	return nil
}

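// chunk is a single preview segment produced while processing a document review.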
type chunk struct {
	ID   string `json:"id"`
	Text string `json:"text"`
	Type string `json:"type"`
}

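// chunkResult is the JSON payload of preview chunks uploaded to object storage for a review.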
type chunkResult struct {
	Chunks []*chunk `json:"chunks"`
}

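// documentReviewEventHandler re-parses the document under review, converts the result into
// preview chunks, uploads them as JSON to object storage, and marks the review as enabled
// with the chunk response URI. Reviews that are already enabled are skipped.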
func (k *knowledgeSVC) documentReviewEventHandler(ctx context.Context, event *entity.Event) (err error) {
	review := event.DocumentReview
	if review == nil {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "review not provided"))
	}
	if review.ReviewID == nil {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "review.id not set"))
	}
	reviewModel, err := k.reviewRepo.GetByID(ctx, *review.ReviewID)
	if err != nil {
		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", fmt.Sprintf("get review failed, err: %v", err)))
	}
	if reviewModel.Status == int32(entity.ReviewStatus_Enable) {
		return nil
	}
	byteData, err := k.storage.GetObject(ctx, review.Uri)
	if err != nil {
		return errorx.New(errno.ErrKnowledgeGetObjectFailCode, errorx.KV("msg", fmt.Sprintf("get object failed, err: %v", err)))
	}
	p, err := k.parseManager.GetParser(convert.DocumentToParseConfig(event.Document))
	if err != nil {
		return errorx.New(errno.ErrKnowledgeGetParserFailCode, errorx.KV("msg", fmt.Sprintf("get parser failed, err: %v", err)))
	}
	result, err := p.Parse(ctx, bytes.NewReader(byteData))
	if err != nil {
		return errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KV("msg", fmt.Sprintf("parse document failed, err: %v", err)))
	}
	ids, err := k.genMultiIDs(ctx, len(result))
	if err != nil {
		return errorx.New(errno.ErrKnowledgeIDGenCode, errorx.KV("msg", fmt.Sprintf("GenMultiIDs failed, err: %v", err)))
	}
	fn, ok := d2sMapping[event.Document.Type]
	if !ok {
		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", "convertFn is empty"))
	}
	var chunks []*chunk
	for i, doc := range result {
		slice, err := fn(doc, event.Document.KnowledgeID, event.Document.ID, event.Document.CreatorID)
		if err != nil {
			return err
		}
		chunks = append(chunks, &chunk{
			ID:   strconv.FormatInt(ids[i], 10),
			Text: slice.GetSliceContent(),
			Type: "text",
		})
	}
	chunkResp := &chunkResult{
		Chunks: chunks,
	}
	chunksData, err := sonic.Marshal(chunkResp)
	if err != nil {
		return errorx.New(errno.ErrKnowledgeParseJSONCode, errorx.KV("msg", fmt.Sprintf("marshal chunk failed, err: %v", err)))
	}
	tosUri := fmt.Sprintf("DocReview/%d_%d_%d.txt", reviewModel.CreatorID, time.Now().UnixMilli(), *review.ReviewID)
	err = k.storage.PutObject(ctx, tosUri, chunksData, storage.WithContentType("text/plain; charset=utf-8"))
	if err != nil {
		return errorx.New(errno.ErrKnowledgePutObjectFailCode, errorx.KV("msg", fmt.Sprintf("put object failed, err: %v", err)))
	}
	return k.reviewRepo.UpdateReview(ctx, reviewModel.ID, map[string]interface{}{
		"status":         int32(entity.ReviewStatus_Enable),
		"chunk_resp_uri": tosUri,
	})
}

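// mapSearchFields returns the search-store field definitions for the document's type.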
func (k *knowledgeSVC) mapSearchFields(doc *entity.Document) ([]*searchstore.Field, error) {
	fn, found := fMapping[doc.Type]
	if !found {
		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", fmt.Sprintf("document type invalid, type=%d", doc.Type)))
	}
	return fn(doc, k.enableCompactTable), nil
}

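// slice2Document converts a slice entity back into a schema.Document for writing to the search stores.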
func (k *knowledgeSVC) slice2Document(ctx context.Context, src *entity.Document, slice *entity.Slice) (*schema.Document, error) {
	fn, found := s2dMapping[src.Type]
	if !found {
		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", fmt.Sprintf("document type invalid, type=%d", src.Type)))
	}
	return fn(ctx, slice, src.TableInfo.Columns, k.enableCompactTable)
}