551 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			551 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * Copyright 2025 coze-dev Authors
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package service
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/bytedance/sonic"
 | |
| 	"github.com/cloudwego/eino/schema"
 | |
| 	"golang.org/x/sync/errgroup"
 | |
| 
 | |
| 	knowledgeModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/knowledge"
 | |
| 	crossdatacopy "github.com/coze-dev/coze-studio/backend/crossdomain/contract/datacopy"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/datacopy"
 | |
| 	copyEntity "github.com/coze-dev/coze-studio/backend/domain/datacopy/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/consts"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/convert"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/dal/model"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/document"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/document/searchstore"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/rdb"
 | |
| 	rdbEntity "github.com/coze-dev/coze-studio/backend/infra/contract/rdb/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/errorx"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/logs"
 | |
| 	"github.com/coze-dev/coze-studio/backend/types/errno"
 | |
| )
 | |
| 
 | |
| func (k *knowledgeSVC) CopyKnowledge(ctx context.Context, request *CopyKnowledgeRequest) (*CopyKnowledgeResponse, error) {
 | |
| 	if request == nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "request is empty"))
 | |
| 	}
 | |
| 	if len(request.TaskUniqKey) == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "task uniq key is empty"))
 | |
| 	}
 | |
| 	if request.KnowledgeID == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "knowledge id is empty"))
 | |
| 	}
 | |
| 	kn, err := k.knowledgeRepo.GetByID(ctx, request.KnowledgeID)
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	if kn == nil || kn.ID == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeNotExistCode, errorx.KV("msg", "knowledge not exist"))
 | |
| 	}
 | |
| 	newID, err := k.idgen.GenID(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeIDGenCode)
 | |
| 	}
 | |
| 	copyTaskEntity := copyEntity.CopyDataTask{
 | |
| 		TaskUniqKey:   request.TaskUniqKey,
 | |
| 		OriginDataID:  request.KnowledgeID,
 | |
| 		TargetDataID:  newID,
 | |
| 		OriginSpaceID: kn.SpaceID,
 | |
| 		TargetSpaceID: request.TargetSpaceID,
 | |
| 		OriginUserID:  kn.CreatorID,
 | |
| 		TargetUserID:  request.TargetUserID,
 | |
| 		OriginAppID:   kn.AppID,
 | |
| 		TargetAppID:   request.TargetAppID,
 | |
| 		DataType:      copyEntity.DataTypeKnowledge,
 | |
| 		StartTime:     time.Now().UnixMilli(),
 | |
| 		FinishTime:    0,
 | |
| 		ExtInfo:       "",
 | |
| 		ErrorMsg:      "",
 | |
| 		Status:        copyEntity.DataCopyTaskStatusCreate,
 | |
| 	}
 | |
| 	checkResult, err := crossdatacopy.DefaultSVC().CheckAndGenCopyTask(ctx, &datacopy.CheckAndGenCopyTaskReq{Task: ©TaskEntity})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeCrossDomainCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	switch checkResult.CopyTaskStatus {
 | |
| 	case copyEntity.DataCopyTaskStatusSuccess:
 | |
| 		return &CopyKnowledgeResponse{
 | |
| 			OriginKnowledgeID: request.KnowledgeID,
 | |
| 			TargetKnowledgeID: checkResult.TargetID,
 | |
| 			CopyStatus:        knowledgeModel.CopyStatus_Successful,
 | |
| 			ErrMsg:            "",
 | |
| 		}, nil
 | |
| 	case copyEntity.DataCopyTaskStatusInProgress:
 | |
| 		return &CopyKnowledgeResponse{
 | |
| 			OriginKnowledgeID: request.KnowledgeID,
 | |
| 			TargetKnowledgeID: checkResult.TargetID,
 | |
| 			CopyStatus:        knowledgeModel.CopyStatus_Processing,
 | |
| 			ErrMsg:            "",
 | |
| 		}, nil
 | |
| 	case copyEntity.DataCopyTaskStatusFail:
 | |
| 		return &CopyKnowledgeResponse{
 | |
| 			OriginKnowledgeID: request.KnowledgeID,
 | |
| 			TargetKnowledgeID: checkResult.TargetID,
 | |
| 			CopyStatus:        knowledgeModel.CopyStatus_Failed,
 | |
| 			ErrMsg:            checkResult.FailReason,
 | |
| 		}, nil
 | |
| 	}
 | |
| 	copyResp, err := k.copyDo(ctx, &knowledgeCopyCtx{
 | |
| 		OriginData: kn,
 | |
| 		CopyTask:   ©TaskEntity,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return copyResp, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) copyDo(ctx context.Context, copyCtx *knowledgeCopyCtx) (*CopyKnowledgeResponse, error) {
 | |
| 	var err error
 | |
| 	defer func() {
 | |
| 		if e := recover(); e != nil {
 | |
| 			logs.CtxErrorf(ctx, "copy knowledge failed, err: %v", e)
 | |
| 			err = errorx.New(errno.ErrKnowledgeSystemCode, errorx.KVf("msg", "panic: %v", e))
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			deleteErr := k.DeleteKnowledge(ctx, &DeleteKnowledgeRequest{KnowledgeID: copyCtx.CopyTask.TargetDataID})
 | |
| 			if deleteErr != nil {
 | |
| 				logs.CtxErrorf(ctx, "delete knowledge failed, err: %v", deleteErr)
 | |
| 			}
 | |
| 			if len(copyCtx.NewRDBTableNames) != 0 {
 | |
| 				for i := range copyCtx.NewRDBTableNames {
 | |
| 					_, dropErr := k.rdb.DropTable(ctx, &rdb.DropTableRequest{
 | |
| 						TableName: copyCtx.NewRDBTableNames[i],
 | |
| 						IfExists:  true,
 | |
| 					})
 | |
| 					if dropErr != nil {
 | |
| 						logs.CtxErrorf(ctx, "[copyDo] drop table failed, err: %v", dropErr)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 			copyCtx.CopyTask.Status = copyEntity.DataCopyTaskStatusFail
 | |
| 			err = crossdatacopy.DefaultSVC().UpdateCopyTask(ctx, &datacopy.UpdateCopyTaskReq{Task: copyCtx.CopyTask})
 | |
| 			if err != nil {
 | |
| 				logs.CtxErrorf(ctx, "update copy task failed, err: %v", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	copyCtx.CopyTask.Status = copyEntity.DataCopyTaskStatusInProgress
 | |
| 	err = crossdatacopy.DefaultSVC().UpdateCopyTask(ctx, &datacopy.UpdateCopyTaskReq{Task: copyCtx.CopyTask})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeCrossDomainCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	err = k.copyKnowledge(ctx, copyCtx)
 | |
| 	if err != nil {
 | |
| 		logs.CtxErrorf(ctx, "copy knowledge failed, err: %v", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	err = k.copyKnowledgeDocuments(ctx, copyCtx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	copyCtx.CopyTask.FinishTime = time.Now().UnixMilli()
 | |
| 	copyCtx.CopyTask.Status = copyEntity.DataCopyTaskStatusSuccess
 | |
| 	err = crossdatacopy.DefaultSVC().UpdateCopyTask(ctx, &datacopy.UpdateCopyTaskReq{Task: copyCtx.CopyTask})
 | |
| 	if err != nil {
 | |
| 		logs.CtxWarnf(ctx, "update copy task failed, err: %v", err)
 | |
| 	}
 | |
| 	return &CopyKnowledgeResponse{
 | |
| 		OriginKnowledgeID: copyCtx.OriginData.ID,
 | |
| 		TargetKnowledgeID: copyCtx.CopyTask.TargetDataID,
 | |
| 		CopyStatus:        knowledgeModel.CopyStatus_Successful,
 | |
| 		ErrMsg:            "",
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) copyKnowledge(ctx context.Context, copyCtx *knowledgeCopyCtx) error {
 | |
| 	copyKnowledgeInfo := model.Knowledge{
 | |
| 		ID:          copyCtx.CopyTask.TargetDataID,
 | |
| 		Name:        copyCtx.OriginData.Name,
 | |
| 		AppID:       copyCtx.CopyTask.TargetAppID,
 | |
| 		CreatorID:   copyCtx.CopyTask.TargetUserID,
 | |
| 		SpaceID:     copyCtx.CopyTask.TargetSpaceID,
 | |
| 		CreatedAt:   time.Now().UnixMilli(),
 | |
| 		UpdatedAt:   time.Now().UnixMilli(),
 | |
| 		Status:      int32(knowledgeModel.KnowledgeStatusEnable),
 | |
| 		Description: copyCtx.OriginData.Description,
 | |
| 		IconURI:     copyCtx.OriginData.IconURI,
 | |
| 		FormatType:  copyCtx.OriginData.FormatType,
 | |
| 	}
 | |
| 	err := k.knowledgeRepo.Upsert(ctx, ©KnowledgeInfo)
 | |
| 	if err != nil {
 | |
| 		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) copyKnowledgeDocuments(ctx context.Context, copyCtx *knowledgeCopyCtx) (err error) {
 | |
| 	// Query document information (only processed documents)
 | |
| 	documents, _, err := k.documentRepo.FindDocumentByCondition(ctx, &entity.WhereDocumentOpt{
 | |
| 		KnowledgeIDs: []int64{copyCtx.OriginData.ID},
 | |
| 		StatusIn:     []int32{int32(entity.DocumentStatusEnable), int32(entity.DocumentStatusInit)},
 | |
| 		SelectAll:    true,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	if len(documents) == 0 {
 | |
| 		logs.CtxInfof(ctx, "knowledge %d has no document", copyCtx.OriginData.ID)
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// create search store collections
 | |
| 	docItem, err := k.fromModelDocument(ctx, documents[0])
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	fields, err := k.mapSearchFields(docItem)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	collectionName := getCollectionName(copyCtx.CopyTask.TargetDataID)
 | |
| 	for _, ssMgr := range k.searchStoreManagers {
 | |
| 		if err = ssMgr.Create(ctx, &searchstore.CreateRequest{
 | |
| 			CollectionName: collectionName,
 | |
| 			Fields:         fields,
 | |
| 			CollectionMeta: nil,
 | |
| 		}); err != nil {
 | |
| 			return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", err.Error()))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	targetDocuments, _, err := k.documentRepo.FindDocumentByCondition(ctx, &entity.WhereDocumentOpt{
 | |
| 		KnowledgeIDs: []int64{copyCtx.CopyTask.TargetDataID},
 | |
| 		SelectAll:    true,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		logs.CtxErrorf(ctx, "find target document failed, err: %v", err)
 | |
| 		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	for i := range targetDocuments {
 | |
| 		err = k.DeleteDocument(ctx, &DeleteDocumentRequest{DocumentID: targetDocuments[i].ID})
 | |
| 		if err != nil {
 | |
| 			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 		}
 | |
| 	}
 | |
| 	// table copy
 | |
| 	eg := errgroup.Group{}
 | |
| 	eg.SetLimit(10)
 | |
| 	mu := sync.Mutex{}
 | |
| 	var failList []int64
 | |
| 
 | |
| 	newIDs, err := k.genMultiIDs(ctx, len(documents))
 | |
| 	if err != nil {
 | |
| 		logs.CtxErrorf(ctx, "gen document id failed, err: %v", err)
 | |
| 		return errorx.New(errno.ErrKnowledgeIDGenCode)
 | |
| 	}
 | |
| 
 | |
| 	for i := range documents {
 | |
| 		doc := documents[i]
 | |
| 		newID := newIDs[i]
 | |
| 
 | |
| 		eg.Go(func() error {
 | |
| 			cpErr := k.copyDocument(ctx, copyCtx, doc, newID)
 | |
| 			if cpErr != nil {
 | |
| 				mu.Lock()
 | |
| 				failList = append(failList, doc.ID)
 | |
| 				mu.Unlock()
 | |
| 				logs.CtxErrorf(ctx, "copy document failed, src document id: %d, new id: %d, err: %v", doc.ID, newID, err)
 | |
| 				return cpErr
 | |
| 			}
 | |
| 			return nil
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	if err := eg.Wait(); err != nil {
 | |
| 		logs.CtxErrorf(ctx, "copy document failed, document ids: %v, first-err: %v", failList, err)
 | |
| 		return errorx.New(errno.ErrKnowledgeCopyFailCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) copyDocument(ctx context.Context, copyCtx *knowledgeCopyCtx, doc *model.KnowledgeDocument, newDocID int64) (err error) {
 | |
| 	// tabular document replication
 | |
| 	newDoc := model.KnowledgeDocument{
 | |
| 		ID:            newDocID,
 | |
| 		KnowledgeID:   copyCtx.CopyTask.TargetDataID,
 | |
| 		Name:          doc.Name,
 | |
| 		FileExtension: doc.FileExtension,
 | |
| 		DocumentType:  doc.DocumentType,
 | |
| 		URI:           doc.URI,
 | |
| 		Size:          doc.Size,
 | |
| 		SliceCount:    doc.SliceCount,
 | |
| 		CharCount:     doc.CharCount,
 | |
| 		CreatorID:     copyCtx.CopyTask.TargetUserID,
 | |
| 		SpaceID:       copyCtx.CopyTask.TargetSpaceID,
 | |
| 		CreatedAt:     time.Now().UnixMilli(),
 | |
| 		UpdatedAt:     time.Now().UnixMilli(),
 | |
| 		SourceType:    doc.SourceType,
 | |
| 		Status:        int32(entity.DocumentStatusChunking),
 | |
| 		FailReason:    "",
 | |
| 		ParseRule:     doc.ParseRule,
 | |
| 	}
 | |
| 	columnMap := map[int64]int64{}
 | |
| 	// If it is a tabular knowledge base - > create a new table
 | |
| 	if doc.DocumentType == int32(knowledgeModel.DocumentTypeTable) {
 | |
| 		if doc.TableInfo != nil {
 | |
| 			newTableInfo := entity.TableInfo{}
 | |
| 			data, err := sonic.Marshal(doc.TableInfo)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			err = sonic.Unmarshal(data, &newTableInfo)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			newDoc.TableInfo = &newTableInfo
 | |
| 		}
 | |
| 		err = k.createTable(ctx, &newDoc)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		newColumnName2IDMap := map[string]int64{}
 | |
| 		for i := range newDoc.TableInfo.Columns {
 | |
| 			newColumnName2IDMap[newDoc.TableInfo.Columns[i].Name] = newDoc.TableInfo.Columns[i].ID
 | |
| 		}
 | |
| 		oldColumnName2IDMap := map[string]int64{}
 | |
| 		for i := range doc.TableInfo.Columns {
 | |
| 			oldColumnName2IDMap[doc.TableInfo.Columns[i].Name] = doc.TableInfo.Columns[i].ID
 | |
| 			newDoc.TableInfo.Columns[i].ID = newColumnName2IDMap[doc.TableInfo.Columns[i].Name]
 | |
| 		}
 | |
| 		for i := range doc.TableInfo.Columns {
 | |
| 			columnMap[oldColumnName2IDMap[doc.TableInfo.Columns[i].Name]] = newDoc.TableInfo.Columns[i].ID
 | |
| 		}
 | |
| 		copyCtx.NewRDBTableNames = append(copyCtx.NewRDBTableNames, newDoc.TableInfo.PhysicalTableName)
 | |
| 	}
 | |
| 
 | |
| 	docEntity, err := k.fromModelDocument(ctx, &newDoc)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	fields, err := k.mapSearchFields(docEntity)
 | |
| 	if err != nil {
 | |
| 		logs.CtxErrorf(ctx, "map search fields failed, err: %v", err)
 | |
| 		return errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	indexingFields := getIndexingFields(fields)
 | |
| 	collectionName := getCollectionName(newDoc.KnowledgeID)
 | |
| 
 | |
| 	sliceIDs, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
 | |
| 	if err != nil {
 | |
| 		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	err = k.documentRepo.Create(ctx, &newDoc)
 | |
| 	if err != nil {
 | |
| 		return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		status := int32(entity.DocumentStatusEnable)
 | |
| 		msg := ""
 | |
| 		if err != nil {
 | |
| 			status = int32(entity.DocumentStatusFailed)
 | |
| 			msg = err.Error()
 | |
| 		}
 | |
| 		updateErr := k.documentRepo.SetStatus(ctx, newDoc.ID, status, msg)
 | |
| 		if updateErr != nil {
 | |
| 			logs.CtxErrorf(ctx, "update document status failed, err: %v", updateErr)
 | |
| 		}
 | |
| 	}()
 | |
| 	batchSize := 100
 | |
| 	for i := 0; i < len(sliceIDs); i += batchSize {
 | |
| 		end := i + batchSize
 | |
| 		if end > len(sliceIDs) {
 | |
| 			end = len(sliceIDs)
 | |
| 		}
 | |
| 		sliceInfo, err := k.sliceRepo.MGetSlices(ctx, sliceIDs[i:end])
 | |
| 		if err != nil {
 | |
| 			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 		}
 | |
| 		newSliceModels := make([]*model.KnowledgeDocumentSlice, 0)
 | |
| 		newSliceIDs, err := k.genMultiIDs(ctx, len(sliceInfo))
 | |
| 		if err != nil {
 | |
| 			return errorx.New(errno.ErrKnowledgeIDGenCode, errorx.KV("msg", err.Error()))
 | |
| 		}
 | |
| 		old2NewIDMap := map[int64]int64{}
 | |
| 		newMap := map[int64]*model.KnowledgeDocumentSlice{}
 | |
| 		for t := range sliceInfo {
 | |
| 			old2NewIDMap[sliceInfo[t].ID] = newSliceIDs[t]
 | |
| 			newSliceModel := model.KnowledgeDocumentSlice{
 | |
| 				ID:          old2NewIDMap[sliceInfo[t].ID],
 | |
| 				KnowledgeID: copyCtx.CopyTask.TargetDataID,
 | |
| 				DocumentID:  newDocID,
 | |
| 				Content:     sliceInfo[t].Content,
 | |
| 				Sequence:    sliceInfo[t].Sequence,
 | |
| 				CreatedAt:   time.Now().UnixMilli(),
 | |
| 				UpdatedAt:   time.Now().UnixMilli(),
 | |
| 				CreatorID:   copyCtx.CopyTask.TargetUserID,
 | |
| 				SpaceID:     copyCtx.CopyTask.TargetSpaceID,
 | |
| 				Status:      int32(model.SliceStatusDone),
 | |
| 				FailReason:  "",
 | |
| 				Hit:         0,
 | |
| 			}
 | |
| 			newMap[newSliceIDs[t]] = &newSliceModel
 | |
| 		}
 | |
| 
 | |
| 		var sliceEntities []*entity.Slice
 | |
| 		if doc.DocumentType == int32(knowledgeModel.DocumentTypeTable) {
 | |
| 			sliceMap, err := k.selectTableData(ctx, doc.TableInfo, sliceInfo)
 | |
| 			if err != nil {
 | |
| 				logs.CtxErrorf(ctx, "select table data failed, err: %v", err)
 | |
| 				return err
 | |
| 			}
 | |
| 			newSlices := make([]*entity.Slice, 0)
 | |
| 			for id, info := range sliceMap {
 | |
| 				info.DocumentID = newDocID
 | |
| 				info.Hit = 0
 | |
| 				info.DocumentName = doc.Name
 | |
| 				info.ID = old2NewIDMap[id]
 | |
| 				for t := range info.RawContent[0].Table.Columns {
 | |
| 					info.RawContent[0].Table.Columns[t].ColumnID = columnMap[info.RawContent[0].Table.Columns[t].ColumnID]
 | |
| 				}
 | |
| 				newSlices = append(newSlices, info)
 | |
| 			}
 | |
| 			err = k.upsertDataToTable(ctx, newDoc.TableInfo, newSlices)
 | |
| 			if err != nil {
 | |
| 				logs.CtxErrorf(ctx, "upsert data to table failed, err: %v", err)
 | |
| 				return err
 | |
| 			}
 | |
| 			sliceEntities = newSlices
 | |
| 		}
 | |
| 
 | |
| 		for _, v := range newMap {
 | |
| 			cpSlice := v
 | |
| 			newSliceModels = append(newSliceModels, cpSlice)
 | |
| 			if doc.DocumentType != int32(knowledgeModel.DocumentTypeTable) {
 | |
| 				sliceEntities = append(sliceEntities, k.fromModelSlice(ctx, cpSlice))
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		ssDocs, err := slices.TransformWithErrorCheck(sliceEntities, func(a *entity.Slice) (*schema.Document, error) {
 | |
| 			return k.slice2Document(ctx, docEntity, a)
 | |
| 		})
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		for _, mgr := range k.searchStoreManagers {
 | |
| 			ss, err := mgr.GetSearchStore(ctx, collectionName)
 | |
| 			if err != nil {
 | |
| 				return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", err.Error()))
 | |
| 			}
 | |
| 
 | |
| 			if _, err = ss.Store(ctx, ssDocs,
 | |
| 				searchstore.WithIndexerPartitionKey(fieldNameDocumentID),
 | |
| 				searchstore.WithPartition(strconv.FormatInt(newDoc.ID, 10)),
 | |
| 				searchstore.WithIndexingFields(indexingFields),
 | |
| 			); err != nil {
 | |
| 				return errorx.New(errno.ErrKnowledgeSearchStoreCode, errorx.KV("msg", err.Error()))
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		err = k.sliceRepo.BatchCreate(ctx, newSliceModels)
 | |
| 		if err != nil {
 | |
| 			return errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| func (k *knowledgeSVC) createTable(ctx context.Context, doc *model.KnowledgeDocument) error {
 | |
| 	// Tabular knowledge base, creating tables
 | |
| 	rdbColumns := []*rdbEntity.Column{}
 | |
| 	tableColumns := doc.TableInfo.Columns
 | |
| 	columnIDs, err := k.genMultiIDs(ctx, len(tableColumns)+1)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for i := range tableColumns {
 | |
| 		tableColumns[i].ID = columnIDs[i]
 | |
| 		rdbColumns = append(rdbColumns, &rdbEntity.Column{
 | |
| 			Name:     convert.ColumnIDToRDBField(columnIDs[i]),
 | |
| 			DataType: convert.ConvertColumnType(tableColumns[i].Type),
 | |
| 			NotNull:  tableColumns[i].Indexing,
 | |
| 		})
 | |
| 	}
 | |
| 	doc.TableInfo.Columns = append(doc.TableInfo.Columns, &entity.TableColumn{
 | |
| 		ID:          columnIDs[len(columnIDs)-1],
 | |
| 		Name:        consts.RDBFieldID,
 | |
| 		Type:        document.TableColumnTypeInteger,
 | |
| 		Description: "主键ID",
 | |
| 		Indexing:    false,
 | |
| 		Sequence:    -1,
 | |
| 	})
 | |
| 	// Add a primary key ID to each table
 | |
| 	rdbColumns = append(rdbColumns, &rdbEntity.Column{
 | |
| 		Name:     consts.RDBFieldID,
 | |
| 		DataType: rdbEntity.TypeBigInt,
 | |
| 		NotNull:  true,
 | |
| 	})
 | |
| 	// Create a data table
 | |
| 	resp, err := k.rdb.CreateTable(ctx, &rdb.CreateTableRequest{
 | |
| 		Table: &rdbEntity.Table{
 | |
| 			Columns: rdbColumns,
 | |
| 			Indexes: []*rdbEntity.Index{
 | |
| 				{
 | |
| 					Name:    "pk",
 | |
| 					Type:    rdbEntity.PrimaryKey,
 | |
| 					Columns: []string{consts.RDBFieldID},
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		logs.CtxErrorf(ctx, "create table failed, err: %v", err)
 | |
| 		return err
 | |
| 	}
 | |
| 	doc.TableInfo = &entity.TableInfo{
 | |
| 		VirtualTableName:  doc.Name,
 | |
| 		PhysicalTableName: resp.Table.Name,
 | |
| 		TableDesc:         doc.TableInfo.TableDesc,
 | |
| 		Columns:           doc.TableInfo.Columns,
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type knowledgeCopyCtx struct {
 | |
| 	OriginData       *model.Knowledge
 | |
| 	CopyTask         *copyEntity.CopyDataTask
 | |
| 	NewRDBTableNames []string
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) MoveKnowledgeToLibrary(ctx context.Context, request *MoveKnowledgeToLibraryRequest) error {
 | |
| 	if request == nil || request.KnowledgeID == 0 {
 | |
| 		return errors.New("invalid request")
 | |
| 	}
 | |
| 	kn, err := k.knowledgeRepo.GetByID(ctx, request.KnowledgeID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if kn == nil || kn.ID == 0 {
 | |
| 		return errors.New("knowledge not found")
 | |
| 	}
 | |
| 	kn.AppID = 0
 | |
| 	err = k.knowledgeRepo.Update(ctx, kn)
 | |
| 	return err
 | |
| }
 |