691 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			691 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * Copyright 2025 coze-dev Authors
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package service
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/base64"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 
 | |
| 	"github.com/xuri/excelize/v2"
 | |
| 
 | |
| 	knowledgeModel "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/knowledge"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/consts"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/knowledge/internal/convert"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/document"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/document/parser"
 | |
| 	"github.com/coze-dev/coze-studio/backend/infra/contract/rdb"
 | |
| 	rentity "github.com/coze-dev/coze-studio/backend/infra/contract/rdb/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/errorx"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
 | |
| 	"github.com/coze-dev/coze-studio/backend/types/errno"
 | |
| )
 | |
| 
 | |
| func (k *knowledgeSVC) GetAlterTableSchema(ctx context.Context, req *AlterTableSchemaRequest) (*TableSchemaResponse, error) {
 | |
| 	if (req.OriginTableMeta == nil && req.PreviewTableMeta != nil) ||
 | |
| 		(req.OriginTableMeta != nil && req.PreviewTableMeta == nil) {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid table meta param"))
 | |
| 	}
 | |
| 
 | |
| 	tableInfo, err := k.GetDocumentTableInfoByID(ctx, req.DocumentID, true)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("[AlterTableSchema] getDocumentTableInfoByID: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return k.FormatTableSchemaResponse(&TableSchemaResponse{
 | |
| 		TableSheet:     tableInfo.TableSheet,
 | |
| 		AllTableSheets: []*entity.TableSheet{tableInfo.TableSheet},
 | |
| 		TableMeta:      tableInfo.TableMeta,
 | |
| 		PreviewData:    tableInfo.PreviewData,
 | |
| 	}, req.PreviewTableMeta, req.TableDataType)
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) GetImportDataTableSchema(ctx context.Context, req *ImportDataTableSchemaRequest) (resp *TableSchemaResponse, err error) {
 | |
| 	if (req.OriginTableMeta == nil && req.PreviewTableMeta != nil) ||
 | |
| 		(req.OriginTableMeta != nil && req.PreviewTableMeta == nil) {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid table meta param"))
 | |
| 	}
 | |
| 
 | |
| 	reqSheet := req.TableSheet
 | |
| 	if reqSheet == nil {
 | |
| 		reqSheet = &entity.TableSheet{
 | |
| 			SheetId:       0,
 | |
| 			HeaderLineIdx: 0,
 | |
| 			StartLineIdx:  1,
 | |
| 			TotalRows:     20,
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		sheet             *rawSheet
 | |
| 		savedDoc          = &TableSchemaResponse{}
 | |
| 		targetColumns     []*entity.TableColumn
 | |
| 		alignCurrentTable = req.DocumentID != nil && req.PreviewTableMeta == nil
 | |
| 		allSheets         []*entity.TableSheet
 | |
| 	)
 | |
| 
 | |
| 	if alignCurrentTable {
 | |
| 		savedDoc, err = k.GetDocumentTableInfoByID(ctx, *req.DocumentID, false)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("[GetImportDataTableSchema] getDocumentTableInfoByID failed, %w", err)
 | |
| 		}
 | |
| 		targetColumns = savedDoc.TableMeta
 | |
| 	} else {
 | |
| 		targetColumns = req.PreviewTableMeta
 | |
| 	}
 | |
| 
 | |
| 	if req.SourceInfo.FileType != nil && *req.SourceInfo.FileType == string(parser.FileExtensionXLSX) {
 | |
| 		allRawSheets, err := k.LoadSourceInfoAllSheets(ctx, req.SourceInfo, &entity.ParsingStrategy{
 | |
| 			HeaderLine:    int(reqSheet.HeaderLineIdx),
 | |
| 			DataStartLine: int(reqSheet.StartLineIdx),
 | |
| 			RowsCount:     int(reqSheet.TotalRows),
 | |
| 		}, targetColumns)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("[GetImportDataTableSchema] LoadSourceInfoAllSheets failed, %w", err)
 | |
| 		}
 | |
| 
 | |
| 		for i := range allRawSheets {
 | |
| 			s := allRawSheets[i]
 | |
| 			allSheets = append(allSheets, s.sheet)
 | |
| 			if s.sheet.SheetId == reqSheet.SheetId {
 | |
| 				sheet = s
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		sheet, err = k.LoadSourceInfoSpecificSheet(ctx, req.SourceInfo, &entity.ParsingStrategy{
 | |
| 			SheetID:       reqSheet.SheetId,
 | |
| 			HeaderLine:    int(reqSheet.HeaderLineIdx),
 | |
| 			DataStartLine: int(reqSheet.StartLineIdx),
 | |
| 			RowsCount:     int(reqSheet.TotalRows),
 | |
| 		}, targetColumns)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("[GetImportDataTableSchema] loadTableSourceInfo failed, %w", err)
 | |
| 		}
 | |
| 		if sheet.sheet.SheetName == "" {
 | |
| 			sheet.sheet.SheetName = "default"
 | |
| 		}
 | |
| 		allSheets = []*entity.TableSheet{sheet.sheet}
 | |
| 	}
 | |
| 
 | |
| 	// first time import / import with current document schema
 | |
| 	if !alignCurrentTable {
 | |
| 		return k.FormatTableSchemaResponse(&TableSchemaResponse{
 | |
| 			TableSheet:     sheet.sheet,
 | |
| 			AllTableSheets: allSheets,
 | |
| 			TableMeta:      sheet.cols,
 | |
| 			PreviewData:    sheet.vals,
 | |
| 		}, req.PreviewTableMeta, req.TableDataType)
 | |
| 	}
 | |
| 
 | |
| 	return k.FormatTableSchemaResponse(&TableSchemaResponse{
 | |
| 		TableSheet:     savedDoc.TableSheet,
 | |
| 		AllTableSheets: allSheets,
 | |
| 		TableMeta:      sheet.cols,
 | |
| 		PreviewData:    sheet.vals,
 | |
| 	}, savedDoc.TableMeta, req.TableDataType)
 | |
| }
 | |
| 
 | |
| // FormatTableSchemaResponse format table schema and data
 | |
| // originalResp is raw data before format
 | |
| // prevTableMeta is table schema to be displayed
 | |
| func (k *knowledgeSVC) FormatTableSchemaResponse(originalResp *TableSchemaResponse, prevTableMeta []*entity.TableColumn, tableDataType TableDataType) (
 | |
| 	*TableSchemaResponse, error,
 | |
| ) {
 | |
| 	switch tableDataType {
 | |
| 	case AllData, OnlyPreview:
 | |
| 		if prevTableMeta == nil {
 | |
| 			if tableDataType == AllData {
 | |
| 				return &TableSchemaResponse{
 | |
| 					TableSheet:     originalResp.TableSheet,
 | |
| 					AllTableSheets: originalResp.AllTableSheets,
 | |
| 					TableMeta:      originalResp.TableMeta,
 | |
| 					PreviewData:    originalResp.PreviewData,
 | |
| 				}, nil
 | |
| 			} else {
 | |
| 				return &TableSchemaResponse{
 | |
| 					PreviewData: originalResp.PreviewData,
 | |
| 				}, nil
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		isFirstImport := true
 | |
| 		for _, col := range prevTableMeta {
 | |
| 			if col.ID != 0 {
 | |
| 				isFirstImport = false
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		prevData := make([][]*document.ColumnData, 0, len(originalResp.PreviewData))
 | |
| 		for _, row := range originalResp.PreviewData {
 | |
| 			prevRow := make([]*document.ColumnData, len(prevTableMeta))
 | |
| 
 | |
| 			if isFirstImport {
 | |
| 				// align by sequence, for there's no column id
 | |
| 				for i, col := range prevTableMeta {
 | |
| 					if int(col.Sequence) < len(row) {
 | |
| 						prevRow[i] = row[int(col.Sequence)]
 | |
| 						prevRow[i].Type = col.Type
 | |
| 						prevRow[i].ColumnName = col.Name
 | |
| 					} else {
 | |
| 						prevRow[i] = &document.ColumnData{
 | |
| 							ColumnID:   col.ID,
 | |
| 							ColumnName: col.Name,
 | |
| 							Type:       col.Type,
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			} else {
 | |
| 				// align by column id
 | |
| 				mp := make(map[int64]*document.ColumnData, len(row))
 | |
| 				for _, item := range row {
 | |
| 					cp := item
 | |
| 					mp[cp.ColumnID] = cp
 | |
| 				}
 | |
| 
 | |
| 				for i, col := range prevTableMeta {
 | |
| 					if data, found := mp[col.ID]; found && col.ID != 0 {
 | |
| 						prevRow[i] = data
 | |
| 					} else {
 | |
| 						prevRow[i] = &document.ColumnData{
 | |
| 							ColumnID:   col.ID,
 | |
| 							ColumnName: col.Name,
 | |
| 							Type:       col.Type,
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			prevData = append(prevData, prevRow)
 | |
| 		}
 | |
| 
 | |
| 		if tableDataType == AllData {
 | |
| 			return &TableSchemaResponse{
 | |
| 				TableSheet:     originalResp.TableSheet,
 | |
| 				AllTableSheets: originalResp.AllTableSheets,
 | |
| 				TableMeta:      prevTableMeta,
 | |
| 				PreviewData:    prevData,
 | |
| 			}, nil
 | |
| 		}
 | |
| 
 | |
| 		return &TableSchemaResponse{
 | |
| 			PreviewData: prevData,
 | |
| 			TableSheet:  originalResp.TableSheet,
 | |
| 			TableMeta:   prevTableMeta,
 | |
| 		}, nil
 | |
| 
 | |
| 	case OnlySchema:
 | |
| 		return &TableSchemaResponse{
 | |
| 			TableSheet:     originalResp.TableSheet,
 | |
| 			AllTableSheets: originalResp.AllTableSheets,
 | |
| 			TableMeta:      prevTableMeta,
 | |
| 		}, nil
 | |
| 
 | |
| 	default:
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid table data type"))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) ValidateTableSchema(ctx context.Context, request *ValidateTableSchemaRequest) (*ValidateTableSchemaResponse, error) {
 | |
| 	if request.DocumentID == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid document id"))
 | |
| 	}
 | |
| 
 | |
| 	docs, err := k.documentRepo.MGetByID(ctx, []int64{request.DocumentID})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))
 | |
| 	}
 | |
| 	if len(docs) == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDocumentNotExistCode, errorx.KV("msg", "document not found"))
 | |
| 	}
 | |
| 
 | |
| 	doc := docs[0]
 | |
| 	sheet, err := k.LoadSourceInfoSpecificSheet(ctx, request.SourceInfo, &entity.ParsingStrategy{
 | |
| 		SheetID:       request.TableSheet.SheetId,
 | |
| 		HeaderLine:    int(request.TableSheet.HeaderLineIdx),
 | |
| 		DataStartLine: int(request.TableSheet.StartLineIdx),
 | |
| 		RowsCount:     5, // parse few rows for type assertion
 | |
| 	}, doc.TableInfo.Columns)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	src := sheet
 | |
| 	dst := doc.TableInfo
 | |
| 	result := make(map[string]string)
 | |
| 
 | |
| 	// validate 通过条件:
 | |
| 	// 1. 表头名称对齐(不要求顺序一致)
 | |
| 	// 2. indexing 列必须有值, 其余列可以为空
 | |
| 	// 3. 值类型可转换
 | |
| 	// 4. 已有表表头字段全包含
 | |
| 	dstMapping := make(map[string]*entity.TableColumn)
 | |
| 	for _, col := range dst.Columns {
 | |
| 		dstCol := col
 | |
| 		if col.Name == consts.RDBFieldID {
 | |
| 			continue
 | |
| 		}
 | |
| 		dstMapping[dstCol.Name] = dstCol
 | |
| 	}
 | |
| 
 | |
| 	for i, srcCol := range src.cols {
 | |
| 		name := srcCol.Name
 | |
| 		dstCol, found := dstMapping[name]
 | |
| 		if !found {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		delete(dstMapping, name)
 | |
| 		if convert.TransformColumnType(srcCol.Type, dstCol.Type) != dstCol.Type {
 | |
| 			result[name] = fmt.Sprintf("column type invalid, expected=%d, got=%d", dstCol.Type, srcCol.Type)
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if dstCol.Indexing {
 | |
| 			for _, vals := range src.vals {
 | |
| 				val := vals[i]
 | |
| 				if val.GetStringValue() == "" {
 | |
| 					result[name] = "column indexing requires value, but got none"
 | |
| 					continue
 | |
| 
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(dstMapping) != 0 {
 | |
| 		for _, col := range dstMapping {
 | |
| 			result[col.Name] = "column not found in provided data"
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return &ValidateTableSchemaResponse{
 | |
| 		ColumnValidResult: result,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) GetDocumentTableInfo(ctx context.Context, request *GetDocumentTableInfoRequest) (*GetDocumentTableInfoResponse, error) {
 | |
| 	if request.DocumentID == nil && request.SourceInfo == nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid param"))
 | |
| 	}
 | |
| 
 | |
| 	if request.DocumentID != nil {
 | |
| 		info, err := k.GetDocumentTableInfoByID(ctx, *request.DocumentID, true)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if info.Code != 0 {
 | |
| 			return &GetDocumentTableInfoResponse{
 | |
| 				Code: info.Code,
 | |
| 				Msg:  info.Msg,
 | |
| 			}, nil
 | |
| 		}
 | |
| 
 | |
| 		prevData := make([]map[string]string, 0, len(info.PreviewData))
 | |
| 		for _, row := range info.PreviewData {
 | |
| 			mp := make(map[string]string, len(row))
 | |
| 			for i, col := range row {
 | |
| 				mp[strconv.FormatInt(int64(i), 10)] = col.GetStringValue()
 | |
| 			}
 | |
| 			prevData = append(prevData, mp)
 | |
| 		}
 | |
| 
 | |
| 		return &GetDocumentTableInfoResponse{
 | |
| 			TableSheet:  []*entity.TableSheet{info.TableSheet},
 | |
| 			TableMeta:   map[string][]*entity.TableColumn{"0": info.TableMeta},
 | |
| 			PreviewData: map[string][]map[string]string{"0": prevData},
 | |
| 		}, nil
 | |
| 	}
 | |
| 
 | |
| 	sheets, err := k.LoadSourceInfoAllSheets(ctx, *request.SourceInfo, &entity.ParsingStrategy{
 | |
| 		HeaderLine:    0,
 | |
| 		DataStartLine: 1,
 | |
| 		RowsCount:     0, // get all rows
 | |
| 	}, nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		tableSheet = make([]*entity.TableSheet, 0, len(sheets))
 | |
| 		tableMeta  = make(map[string][]*entity.TableColumn, len(sheets))
 | |
| 		prevData   = make(map[string][]map[string]string, len(sheets))
 | |
| 	)
 | |
| 
 | |
| 	for i, s := range sheets {
 | |
| 		tableSheet = append(tableSheet, s.sheet)
 | |
| 		tableMeta[strconv.FormatInt(int64(i), 10)] = s.cols
 | |
| 
 | |
| 		data := make([]map[string]string, 0, len(s.vals))
 | |
| 		for j, row := range s.vals {
 | |
| 			if j > 20 { // get first 20 rows as preview
 | |
| 				break
 | |
| 			}
 | |
| 			valMapping := make(map[string]string)
 | |
| 			for k, v := range row {
 | |
| 				valMapping[strconv.FormatInt(int64(k), 10)] = v.GetStringValue()
 | |
| 			}
 | |
| 			data = append(data, valMapping)
 | |
| 		}
 | |
| 		prevData[strconv.FormatInt(int64(i), 10)] = data
 | |
| 	}
 | |
| 
 | |
| 	return &GetDocumentTableInfoResponse{
 | |
| 		TableSheet:  tableSheet,
 | |
| 		TableMeta:   tableMeta,
 | |
| 		PreviewData: prevData,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // GetDocumentTableInfoByID 先不作为接口,有需要再改
 | |
| func (k *knowledgeSVC) GetDocumentTableInfoByID(ctx context.Context, documentID int64, needData bool) (*TableSchemaResponse, error) {
 | |
| 	docs, err := k.documentRepo.MGetByID(ctx, []int64{documentID})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KVf("msg", "get document failed: %v", err))
 | |
| 	}
 | |
| 
 | |
| 	if len(docs) == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDocumentNotExistCode, errorx.KVf("msg", "document not found, id=%d", documentID))
 | |
| 	}
 | |
| 
 | |
| 	doc := docs[0]
 | |
| 	if doc.DocumentType != int32(knowledgeModel.DocumentTypeTable) {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeSystemCode, errorx.KV("msg", "document type invalid"))
 | |
| 	}
 | |
| 
 | |
| 	tblInfo := doc.TableInfo
 | |
| 	cols := k.filterIDColumn(tblInfo.Columns) // filter `id`
 | |
| 	var sheet *entity.TableSheet
 | |
| 	if doc.ParseRule.ParsingStrategy != nil {
 | |
| 		sheet = &entity.TableSheet{
 | |
| 			SheetId:       doc.ParseRule.ParsingStrategy.SheetID,
 | |
| 			HeaderLineIdx: int64(doc.ParseRule.ParsingStrategy.HeaderLine),
 | |
| 			StartLineIdx:  int64(doc.ParseRule.ParsingStrategy.DataStartLine),
 | |
| 			SheetName:     doc.Name,
 | |
| 			TotalRows:     doc.SliceCount,
 | |
| 		}
 | |
| 	} else {
 | |
| 		sheet = &entity.TableSheet{
 | |
| 			SheetId:       0,
 | |
| 			HeaderLineIdx: 0,
 | |
| 			StartLineIdx:  1,
 | |
| 			SheetName:     "default",
 | |
| 			TotalRows:     0,
 | |
| 		}
 | |
| 	}
 | |
| 	if !needData {
 | |
| 		return &TableSchemaResponse{
 | |
| 			TableSheet:     sheet,
 | |
| 			AllTableSheets: []*entity.TableSheet{sheet},
 | |
| 			TableMeta:      cols,
 | |
| 		}, nil
 | |
| 	}
 | |
| 
 | |
| 	rows, err := k.rdb.SelectData(ctx, &rdb.SelectDataRequest{
 | |
| 		TableName: tblInfo.PhysicalTableName,
 | |
| 		Limit:     ptr.Of(20),
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeCrossDomainCode, errorx.KVf("msg", "select data failed: %v", err))
 | |
| 	}
 | |
| 
 | |
| 	data, err := k.ParseRDBData(cols, rows.ResultSet)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &TableSchemaResponse{
 | |
| 		TableSheet:     sheet,
 | |
| 		AllTableSheets: []*entity.TableSheet{sheet},
 | |
| 		TableMeta:      cols,
 | |
| 		PreviewData:    data,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) LoadSourceInfoAllSheets(ctx context.Context, sourceInfo TableSourceInfo, ps *entity.ParsingStrategy, columns []*entity.TableColumn) (
 | |
| 	sheets []*rawSheet, err error,
 | |
| ) {
 | |
| 	switch {
 | |
| 	case sourceInfo.FileType != nil && (sourceInfo.Uri != nil || sourceInfo.FileBase64 != nil):
 | |
| 		var b []byte
 | |
| 		if sourceInfo.Uri != nil {
 | |
| 			b, err = k.storage.GetObject(ctx, *sourceInfo.Uri)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeGetObjectURLFailCode, errorx.KVf("msg", "get object failed: %v", err))
 | |
| 			}
 | |
| 		} else {
 | |
| 			b, err = base64.StdEncoding.DecodeString(*sourceInfo.FileBase64)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KVf("msg", "decode base64 failed: %v", err))
 | |
| 			}
 | |
| 		}
 | |
| 		if *sourceInfo.FileType == string(parser.FileExtensionXLSX) {
 | |
| 			f, err := excelize.OpenReader(bytes.NewReader(b))
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "open xlsx file failed: %v", err))
 | |
| 			}
 | |
| 			for i, sheet := range f.GetSheetList() {
 | |
| 				newPS := &entity.ParsingStrategy{
 | |
| 					SheetID:       int64(i),
 | |
| 					HeaderLine:    ps.HeaderLine,
 | |
| 					DataStartLine: ps.DataStartLine,
 | |
| 					RowsCount:     ps.RowsCount,
 | |
| 				}
 | |
| 
 | |
| 				rs, err := k.LoadSheet(ctx, b, newPS, *sourceInfo.FileType, &sheet, columns)
 | |
| 				if err != nil {
 | |
| 					return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "load xlsx sheet failed: %v", err))
 | |
| 				}
 | |
| 
 | |
| 				sheets = append(sheets, rs)
 | |
| 			}
 | |
| 		} else {
 | |
| 			rs, err := k.LoadSheet(ctx, b, ps, *sourceInfo.FileType, nil, columns)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "load xlsx sheet failed: %v", err))
 | |
| 			}
 | |
| 
 | |
| 			sheets = append(sheets, rs)
 | |
| 		}
 | |
| 
 | |
| 	case sourceInfo.CustomContent != nil:
 | |
| 		rs, err := k.LoadSourceInfoSpecificSheet(ctx, sourceInfo, ps, columns)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		sheets = append(sheets, rs)
 | |
| 
 | |
| 	default:
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid table source info"))
 | |
| 	}
 | |
| 
 | |
| 	return sheets, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) LoadSourceInfoSpecificSheet(ctx context.Context, sourceInfo TableSourceInfo, ps *entity.ParsingStrategy, columns []*entity.TableColumn) (
 | |
| 	sheet *rawSheet, err error,
 | |
| ) {
 | |
| 	var b []byte
 | |
| 	switch {
 | |
| 	case sourceInfo.FileType != nil && (sourceInfo.Uri != nil || sourceInfo.FileBase64 != nil):
 | |
| 		if sourceInfo.Uri != nil {
 | |
| 			b, err = k.storage.GetObject(ctx, *sourceInfo.Uri)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeGetObjectURLFailCode, errorx.KVf("msg", "get object failed: %v", err))
 | |
| 			}
 | |
| 		} else {
 | |
| 			b, err = base64.StdEncoding.DecodeString(*sourceInfo.FileBase64)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KVf("msg", "decode base64 failed: %v", err))
 | |
| 			}
 | |
| 		}
 | |
| 	case sourceInfo.CustomContent != nil:
 | |
| 		b, err = json.Marshal(sourceInfo.CustomContent)
 | |
| 		if err != nil {
 | |
| 			return nil, errorx.New(errno.ErrKnowledgeParseJSONCode, errorx.KVf("msg", "marshal custom content failed: %v", err))
 | |
| 		}
 | |
| 	default:
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "invalid table source info"))
 | |
| 	}
 | |
| 
 | |
| 	sheet, err = k.LoadSheet(ctx, b, ps, *sourceInfo.FileType, nil, columns)
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "load sheet failed: %v", err))
 | |
| 	}
 | |
| 
 | |
| 	return sheet, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) LoadSheet(ctx context.Context, b []byte, ps *entity.ParsingStrategy, fileExtension string, sheetName *string, columns []*entity.TableColumn) (*rawSheet, error) {
 | |
| 	pConfig := convert.ToParseConfig(parser.FileExtension(fileExtension), ps, nil, false, columns)
 | |
| 	p, err := k.parseManager.GetParser(pConfig)
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "get parser failed: %v", err))
 | |
| 	}
 | |
| 
 | |
| 	docs, err := p.Parse(ctx, bytes.NewReader(b))
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeParserParseFailCode, errorx.KVf("msg", "parse failed: %v", err))
 | |
| 	}
 | |
| 
 | |
| 	if len(docs) == 0 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeParseResultEmptyCode, errorx.KVf("msg", "parse result is empty"))
 | |
| 	}
 | |
| 
 | |
| 	sheet := &entity.TableSheet{
 | |
| 		SheetId:       ps.SheetID,
 | |
| 		HeaderLineIdx: int64(ps.HeaderLine),
 | |
| 		StartLineIdx:  int64(ps.DataStartLine),
 | |
| 		TotalRows:     int64(len(docs)),
 | |
| 	}
 | |
| 	if sheetName != nil {
 | |
| 		sheet.SheetName = ptr.From(sheetName)
 | |
| 	}
 | |
| 
 | |
| 	srcColumns, err := document.GetDocumentColumns(docs[0])
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("[LoadSheet] get columns failed, %w", err)
 | |
| 	}
 | |
| 
 | |
| 	cols := make([]*entity.TableColumn, 0, len(srcColumns))
 | |
| 	for i, col := range srcColumns {
 | |
| 		cols = append(cols, &entity.TableColumn{
 | |
| 			ID:          col.ID,
 | |
| 			Name:        col.Name,
 | |
| 			Type:        col.Type,
 | |
| 			Description: col.Description,
 | |
| 			Indexing:    false,
 | |
| 			Sequence:    int64(i),
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	if columnsOnly, err := document.GetDocumentsColumnsOnly(docs); err != nil { // unexpected
 | |
| 		return nil, fmt.Errorf("[LoadSheet] get data status failed, %w", err)
 | |
| 	} else if columnsOnly {
 | |
| 		return &rawSheet{
 | |
| 			sheet: sheet,
 | |
| 			cols:  cols,
 | |
| 		}, nil
 | |
| 	}
 | |
| 
 | |
| 	vals := make([][]*document.ColumnData, 0, len(docs))
 | |
| 	for _, doc := range docs {
 | |
| 		v, ok := doc.MetaData[document.MetaDataKeyColumnData].([]*document.ColumnData)
 | |
| 		if !ok {
 | |
| 			return nil, errorx.New(errno.ErrKnowledgeSystemCode, errorx.KVf("msg", "[LoadSheet] get columns data failed"))
 | |
| 		}
 | |
| 		vals = append(vals, v)
 | |
| 	}
 | |
| 
 | |
| 	return &rawSheet{
 | |
| 		sheet: sheet,
 | |
| 		cols:  cols,
 | |
| 		vals:  vals,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) ParseRDBData(columns []*entity.TableColumn, resultSet *rentity.ResultSet) (
 | |
| 	resp [][]*document.ColumnData, err error,
 | |
| ) {
 | |
| 	names := make([]string, 0, len(columns))
 | |
| 	for _, c := range columns {
 | |
| 		if c.Name == consts.RDBFieldID {
 | |
| 			names = append(names, consts.RDBFieldID)
 | |
| 		} else {
 | |
| 			names = append(names, convert.ColumnIDToRDBField(c.ID))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for _, row := range resultSet.Rows {
 | |
| 		parsedData := make([]*document.ColumnData, len(columns))
 | |
| 		for i, col := range columns {
 | |
| 			val, found := row[names[i]]
 | |
| 			if !found { // columns are not aligned when altering table
 | |
| 				if names[i] == consts.RDBFieldID {
 | |
| 					continue
 | |
| 				}
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeSystemCode, errorx.KVf("msg", "[ParseRDBData] altering table, retry later, col=%s", col.Name))
 | |
| 			}
 | |
| 			colData, err := convert.ParseAnyData(col, val)
 | |
| 			if err != nil {
 | |
| 				return nil, errorx.New(errno.ErrKnowledgeSystemCode, errorx.KVf("msg", "[ParseRDBData] invalid column type, col=%s, type=%d", col.Name, col.Type))
 | |
| 			}
 | |
| 			parsedData[i] = colData
 | |
| 		}
 | |
| 
 | |
| 		resp = append(resp, parsedData)
 | |
| 	}
 | |
| 
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) getDocumentTableInfo(ctx context.Context, documentID int64) (*entity.TableInfo, error) {
 | |
| 	docs, err := k.documentRepo.MGetByID(ctx, []int64{documentID})
 | |
| 	if err != nil {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KVf("msg", "get document failed: %v", err))
 | |
| 	}
 | |
| 	if len(docs) != 1 {
 | |
| 		return nil, errorx.New(errno.ErrKnowledgeDocumentNotExistCode, errorx.KVf("msg", "document not found, id=%d", documentID))
 | |
| 	}
 | |
| 	return docs[0].TableInfo, nil
 | |
| }
 | |
| 
 | |
| func (k *knowledgeSVC) filterIDColumn(cols []*entity.TableColumn) []*entity.TableColumn {
 | |
| 	resp := make([]*entity.TableColumn, 0, len(cols))
 | |
| 	for i := range cols {
 | |
| 		col := cols[i]
 | |
| 		if col.Name == consts.RDBFieldID {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		resp = append(resp, col)
 | |
| 	}
 | |
| 
 | |
| 	return resp
 | |
| }
 | |
| 
 | |
| type rawSheet struct {
 | |
| 	sheet *entity.TableSheet
 | |
| 	cols  []*entity.TableColumn
 | |
| 	vals  [][]*document.ColumnData
 | |
| }
 |