diff --git a/backend/crossdomain/workflow/database/database.go b/backend/crossdomain/workflow/database/database.go index f770fcc7..8f7b22e2 100644 --- a/backend/crossdomain/workflow/database/database.go +++ b/backend/crossdomain/workflow/database/database.go @@ -64,6 +64,7 @@ func (d *DatabaseRepository) Execute(ctx context.Context, request *nodedatabase. SQL: &request.SQL, TableType: tableType, UserID: strconv.FormatInt(request.UserID, 10), + ConnectorID: ptr.Of(request.ConnectorID), } req.SQLParams = make([]*database.SQLParamVal, 0, len(request.Params)) @@ -106,6 +107,7 @@ func (d *DatabaseRepository) Delete(ctx context.Context, request *nodedatabase.D OperateType: database.OperateType_Delete, TableType: tableType, UserID: strconv.FormatInt(request.UserID, 10), + ConnectorID: ptr.Of(request.ConnectorID), } if request.ConditionGroup != nil { @@ -141,6 +143,7 @@ func (d *DatabaseRepository) Query(ctx context.Context, request *nodedatabase.Qu OperateType: database.OperateType_Select, TableType: tableType, UserID: strconv.FormatInt(request.UserID, 10), + ConnectorID: ptr.Of(request.ConnectorID), } req.SelectFieldList = &database.SelectFieldList{FieldID: make([]string, 0, len(request.SelectFields))} @@ -196,6 +199,7 @@ func (d *DatabaseRepository) Update(ctx context.Context, request *nodedatabase.U OperateType: database.OperateType_Update, SQLParams: make([]*database.SQLParamVal, 0), TableType: tableType, + ConnectorID: ptr.Of(request.ConnectorID), } uid := ctxutil.GetUIDFromCtx(ctx) @@ -243,6 +247,7 @@ func (d *DatabaseRepository) Insert(ctx context.Context, request *nodedatabase.I OperateType: database.OperateType_Insert, TableType: tableType, UserID: strconv.FormatInt(request.UserID, 10), + ConnectorID: ptr.Of(request.ConnectorID), } req.UpsertRows, req.SQLParams, err = resolveUpsertRow(request.Fields) diff --git a/backend/domain/workflow/crossdomain/database/database.go b/backend/domain/workflow/crossdomain/database/database.go index e8ef815c..8675ce1e 100644 --- a/backend/domain/workflow/crossdomain/database/database.go +++ b/backend/domain/workflow/crossdomain/database/database.go @@ -30,6 +30,7 @@ type CustomSQLRequest struct { Params []SQLParam IsDebugRun bool UserID int64 + ConnectorID int64 } type Object = map[string]any @@ -91,6 +92,7 @@ type DeleteRequest struct { ConditionGroup *ConditionGroup IsDebugRun bool UserID int64 + ConnectorID int64 } type QueryRequest struct { @@ -101,6 +103,7 @@ type QueryRequest struct { OrderClauses []*OrderClause IsDebugRun bool UserID int64 + ConnectorID int64 } type OrderClause struct { @@ -113,6 +116,7 @@ type UpdateRequest struct { Fields map[string]any IsDebugRun bool UserID int64 + ConnectorID int64 } type InsertRequest struct { @@ -120,6 +124,7 @@ type InsertRequest struct { Fields map[string]any IsDebugRun bool UserID int64 + ConnectorID int64 } func GetDatabaseOperator() DatabaseOperator { diff --git a/backend/domain/workflow/internal/nodes/database/common.go b/backend/domain/workflow/internal/nodes/database/common.go index f0e16670..3fd2c665 100644 --- a/backend/domain/workflow/internal/nodes/database/common.go +++ b/backend/domain/workflow/internal/nodes/database/common.go @@ -423,6 +423,14 @@ func getExecUserID(ctx context.Context) int64 { return execCtx.RootCtx.ExeCfg.Operator } +func getConnectorID(ctx context.Context) int64 { + execCtx := execute.GetExeCtx(ctx) + if execCtx == nil { + panic(fmt.Errorf("unable to get exe context")) + } + return execCtx.RootCtx.ExeCfg.ConnectorID +} + func parseToInput(input map[string]any) map[string]any { result := make(map[string]any, len(input)) for key, value := range input { diff --git a/backend/domain/workflow/internal/nodes/database/customsql.go b/backend/domain/workflow/internal/nodes/database/customsql.go index 3e05a57c..15d0ef27 100644 --- a/backend/domain/workflow/internal/nodes/database/customsql.go +++ b/backend/domain/workflow/internal/nodes/database/customsql.go @@ -104,6 +104,7 @@ func (c *CustomSQL) Invoke(ctx context.Context, input map[string]any) (map[strin DatabaseInfoID: c.databaseInfoID, IsDebugRun: isDebugExecute(ctx), UserID: getExecUserID(ctx), + ConnectorID: getConnectorID(ctx), } inputBytes, err := sonic.Marshal(input) diff --git a/backend/domain/workflow/internal/nodes/database/delete.go b/backend/domain/workflow/internal/nodes/database/delete.go index fdbb9a3e..1ca62549 100644 --- a/backend/domain/workflow/internal/nodes/database/delete.go +++ b/backend/domain/workflow/internal/nodes/database/delete.go @@ -108,6 +108,7 @@ func (d *Delete) Invoke(ctx context.Context, in map[string]any) (map[string]any, ConditionGroup: conditionGroup, IsDebugRun: isDebugExecute(ctx), UserID: getExecUserID(ctx), + ConnectorID: getConnectorID(ctx), } response, err := d.deleter.Delete(ctx, request) diff --git a/backend/domain/workflow/internal/nodes/database/insert.go b/backend/domain/workflow/internal/nodes/database/insert.go index 2f25666e..1e05a1de 100644 --- a/backend/domain/workflow/internal/nodes/database/insert.go +++ b/backend/domain/workflow/internal/nodes/database/insert.go @@ -90,6 +90,7 @@ func (is *Insert) Invoke(ctx context.Context, input map[string]any) (map[string] Fields: fields, IsDebugRun: isDebugExecute(ctx), UserID: getExecUserID(ctx), + ConnectorID: getConnectorID(ctx), } response, err := is.inserter.Insert(ctx, req) diff --git a/backend/domain/workflow/internal/nodes/database/query.go b/backend/domain/workflow/internal/nodes/database/query.go index c32f7795..34430949 100644 --- a/backend/domain/workflow/internal/nodes/database/query.go +++ b/backend/domain/workflow/internal/nodes/database/query.go @@ -141,6 +141,7 @@ func (ds *Query) Invoke(ctx context.Context, in map[string]any) (map[string]any, Limit: ds.limit, IsDebugRun: isDebugExecute(ctx), UserID: getExecUserID(ctx), + ConnectorID: getConnectorID(ctx), } req.ConditionGroup = conditionGroup diff --git a/backend/domain/workflow/internal/nodes/database/update.go b/backend/domain/workflow/internal/nodes/database/update.go index 8e3e602b..9138f00a 100644 --- a/backend/domain/workflow/internal/nodes/database/update.go +++ b/backend/domain/workflow/internal/nodes/database/update.go @@ -123,6 +123,7 @@ func (u *Update) Invoke(ctx context.Context, in map[string]any) (map[string]any, Fields: fields, IsDebugRun: isDebugExecute(ctx), UserID: getExecUserID(ctx), + ConnectorID: getConnectorID(ctx), } response, err := u.updater.Update(ctx, req) diff --git a/backend/go.mod b/backend/go.mod index a507d37e..c7cfab80 100755 --- a/backend/go.mod +++ b/backend/go.mod @@ -127,7 +127,7 @@ require ( github.com/aws/smithy-go v1.22.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect - github.com/bytedance/gopkg v0.1.1 // indirect + github.com/bytedance/gopkg v0.1.1 github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect