diff --git a/backend/application/workflow/workflow.go b/backend/application/workflow/workflow.go index d006bf35..399e9742 100644 --- a/backend/application/workflow/workflow.go +++ b/backend/application/workflow/workflow.go @@ -1552,7 +1552,6 @@ func (w *ApplicationService) OpenAPIRun(ctx context.Context, req *workflow.OpenA AgentID: agentID, ConnectorID: connectorID, ConnectorUID: strconv.FormatInt(userID, 10), - TaskType: vo.TaskTypeForeground, InputFailFast: true, BizType: vo.BizTypeWorkflow, } @@ -1563,6 +1562,7 @@ func (w *ApplicationService) OpenAPIRun(ctx context.Context, req *workflow.OpenA if req.GetIsAsync() { exeCfg.SyncPattern = vo.SyncPatternAsync + exeCfg.TaskType = vo.TaskTypeBackground exeID, err := GetWorkflowDomainSVC().AsyncExecute(ctx, exeCfg, parameters) if err != nil { return nil, err @@ -1575,6 +1575,7 @@ func (w *ApplicationService) OpenAPIRun(ctx context.Context, req *workflow.OpenA } exeCfg.SyncPattern = vo.SyncPatternSync + exeCfg.TaskType = vo.TaskTypeForeground wfExe, tPlan, err := GetWorkflowDomainSVC().SyncExecute(ctx, exeCfg, parameters) if err != nil { return nil, err diff --git a/backend/domain/workflow/entity/node_meta.go b/backend/domain/workflow/entity/node_meta.go index 184d9734..83d8ffec 100644 --- a/backend/domain/workflow/entity/node_meta.go +++ b/backend/domain/workflow/entity/node_meta.go @@ -268,7 +268,6 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-LLM-v2.jpg", SupportBatch: true, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 3 * 60 * 1000, // 3 minutes PreFillZero: true, PostFillNil: true, InputSourceAware: true, @@ -288,9 +287,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Plugin-v2.jpg", SupportBatch: true, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 3 * 60 * 1000, // 3 minutes - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Plugin", EnUSDescription: "Used to access external real-time data and perform operations", @@ -306,10 +304,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Code-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, - UseCtxCache: true, + PreFillZero: true, + PostFillNil: true, + UseCtxCache: true, }, EnUSName: "Code", EnUSDescription: "Write code to process input variables to generate return values.", @@ -325,9 +322,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-KnowledgeQuery-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Knowledge retrieval", EnUSDescription: "In the selected knowledge, the best matching information is recalled based on the input variable and returned as an Array.", @@ -371,9 +367,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Database-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "SQL Customization", EnUSDescription: "Complete the operations of adding, deleting, modifying and querying the database based on user-defined SQL", @@ -425,10 +420,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Direct-Question-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, - MayUseChatModel: true, + PreFillZero: true, + PostFillNil: true, + MayUseChatModel: true, }, EnUSName: "Question", EnUSDescription: "Support asking questions to the user in the middle of the conversation, with both preset options and open-ended questions", @@ -472,10 +466,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Loop-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - IsComposite: true, - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + IsComposite: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Loop", EnUSDescription: "Used to repeatedly execute a series of tasks by setting the number of iterations and logic", @@ -491,10 +484,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Intent-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, - MayUseChatModel: true, + PreFillZero: true, + PostFillNil: true, + MayUseChatModel: true, }, EnUSName: "Intent recognition", EnUSDescription: "Used for recognizing the intent in user input and matching it with preset intent options.", @@ -510,9 +502,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-KnowledgeWriting-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Knowledge writing", EnUSDescription: "The write node can add a knowledge base of type text. Only one knowledge base can be added.", @@ -528,10 +519,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Batch-v2.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - IsComposite: true, - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + IsComposite: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Batch", EnUSDescription: "By setting the number of batch runs and logic, run the tasks in the batch body.", @@ -670,8 +660,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-database-update.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, + PreFillZero: true, }, EnUSName: "Update Data", EnUSDescription: "Modify the existing data records in the table, and the user specifies the update conditions and contents to update the data", @@ -687,8 +676,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icaon-database-select.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, + PreFillZero: true, }, EnUSName: "Query Data", EnUSDescription: "Query data from the table, and the user can define query conditions, select columns, etc., and output the data that meets the conditions", @@ -704,8 +692,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-database-delete.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, + PreFillZero: true, }, EnUSName: "Delete Data", EnUSDescription: "Delete data records from the table, and the user specifies the deletion conditions to delete the records that meet the conditions", @@ -721,9 +708,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-HTTP.png", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "HTTP request", EnUSDescription: "It is used to send API requests and return data from the interface.", @@ -739,8 +725,7 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-database-insert.jpg", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, + PreFillZero: true, }, EnUSName: "Add Data", EnUSDescription: "Add new data records to the table, and insert them into the database after the user enters the data content", @@ -780,8 +765,6 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ // ExecutableMeta configures certain common aspects of request-time behaviors for this node. ExecutableMeta: ExecutableMeta{ - // DefaultTimeoutMS configures the default timeout for this node, in milliseconds. 0 means no timeout. - DefaultTimeoutMS: 60 * 1000, // 1 minute // PreFillZero decides whether to pre-fill zero value for any missing fields in input. PreFillZero: true, // PostFillNil decides whether to post-fill nil value for any missing fields in output. @@ -803,10 +786,9 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-from_json.png", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, - UseCtxCache: true, + PreFillZero: true, + PostFillNil: true, + UseCtxCache: true, }, EnUSName: "JSON deserialization", EnUSDescription: "Parse JSON string to variable", @@ -822,9 +804,8 @@ var NodeTypeMetas = map[NodeType]*NodeTypeMeta{ IconURL: "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icons-dataset-delete.png", SupportBatch: false, ExecutableMeta: ExecutableMeta{ - DefaultTimeoutMS: 60 * 1000, // 1 minute - PreFillZero: true, - PostFillNil: true, + PreFillZero: true, + PostFillNil: true, }, EnUSName: "Knowledge delete", EnUSDescription: "The delete node can delete a document in knowledge base.", diff --git a/backend/domain/workflow/internal/compose/node_runner.go b/backend/domain/workflow/internal/compose/node_runner.go index ae47a15d..6a0a8921 100644 --- a/backend/domain/workflow/internal/compose/node_runner.go +++ b/backend/domain/workflow/internal/compose/node_runner.go @@ -124,13 +124,15 @@ func newNodeRunConfig[O any](ns *schema2.NodeSchema, }, opts.init...) } - opts.init = append(opts.init, func(ctx context.Context) (context.Context, error) { - current, exceeded := execute.IncrAndCheckExecutedNodes(ctx) - if exceeded { - return nil, fmt.Errorf("exceeded max executed node count: %d, current: %d", execute.GetStaticConfig().MaxNodeCountPerExecution, current) - } - return ctx, nil - }) + if execute.GetStaticConfig().MaxNodeCountPerExecution > 0 { + opts.init = append(opts.init, func(ctx context.Context) (context.Context, error) { + current, exceeded := execute.IncrementAndCheckExecutedNodes(ctx) + if exceeded { + return nil, fmt.Errorf("exceeded max executed node count: %d, current: %d", execute.GetStaticConfig().MaxNodeCountPerExecution, current) + } + return ctx, nil + }) + } return &nodeRunConfig[O]{ nodeKey: ns.Key, @@ -325,12 +327,23 @@ func (nc *nodeRunConfig[O]) invoke() func(ctx context.Context, input map[string] }() for _, i := range runner.init { - if ctx, err = i(ctx); err != nil { + var newCtx context.Context + if newCtx, err = i(ctx); err != nil { + var err1 error + if ctx, err1 = runner.onStart(ctx, input); err1 != nil { + return nil, err1 + } return nil, err + } else { + ctx = newCtx } } if input, err = runner.preProcess(ctx, input); err != nil { + var err1 error + if ctx, err1 = runner.onStart(ctx, input); err1 != nil { + return nil, err1 + } return nil, err } @@ -373,12 +386,23 @@ func (nc *nodeRunConfig[O]) stream() func(ctx context.Context, input map[string] }() for _, i := range runner.init { - if ctx, err = i(ctx); err != nil { + var newCtx context.Context + if newCtx, err = i(ctx); err != nil { + var err1 error + if ctx, err1 = runner.onStart(ctx, input); err1 != nil { + return nil, err1 + } return nil, err + } else { + ctx = newCtx } } if input, err = runner.preProcess(ctx, input); err != nil { + var err1 error + if ctx, err1 = runner.onStart(ctx, input); err1 != nil { + return nil, err1 + } return nil, err } @@ -390,6 +414,60 @@ func (nc *nodeRunConfig[O]) stream() func(ctx context.Context, input map[string] } } +func (nc *nodeRunConfig[O]) collect() func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) { + if nc.c == nil { + return nil + } + + return func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) { + ctx, runner := newNodeRunner(ctx, nc) + + defer func() { + if panicErr := recover(); panicErr != nil { + err = safego.NewPanicErr(panicErr, debug.Stack()) + } + + if err == nil { + err = runner.onEnd(ctx, output) + } + + if err != nil { + errOutput, hasErrOutput := runner.onError(ctx, err) + if hasErrOutput { + output = errOutput + err = nil + if output, err = runner.postProcess(ctx, output); err != nil { + logs.CtxErrorf(ctx, "postProcess failed after returning error output: %v", err) + } + } + } + }() + + for _, i := range runner.init { + var newCtx context.Context + if newCtx, err = i(ctx); err != nil { + var err1 error + if ctx, _, err1 = runner.onStartStream(ctx, input); err1 != nil { + return nil, err1 + } + return nil, err + } else { + ctx = newCtx + } + } + + for _, p := range runner.streamPreProcessors { + input = p(ctx, input) + } + + if ctx, input, err = runner.onStartStream(ctx, input); err != nil { + return nil, err + } + + return runner.collect(ctx, input, opts...) + } +} + func (nc *nodeRunConfig[O]) transform() func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) { if nc.t == nil { return nil @@ -417,8 +495,15 @@ func (nc *nodeRunConfig[O]) transform() func(ctx context.Context, input *schema. }() for _, i := range runner.init { - if ctx, err = i(ctx); err != nil { + var newCtx context.Context + if newCtx, err = i(ctx); err != nil { + var err1 error + if ctx, _, err1 = runner.onStartStream(ctx, input); err1 != nil { + return nil, err1 + } return nil, err + } else { + ctx = newCtx } } @@ -439,7 +524,7 @@ func (nc *nodeRunConfig[O]) toNode() *Node { opts = append(opts, compose.WithLambdaType(string(nc.nodeType))) opts = append(opts, compose.WithLambdaCallbackEnable(true)) - l, err := compose.AnyLambda(nc.invoke(), nc.stream(), nil, nc.transform(), opts...) + l, err := compose.AnyLambda(nc.invoke(), nc.stream(), nc.collect(), nc.transform(), opts...) if err != nil { panic(fmt.Sprintf("failed to create lambda for node %s, err: %v", nc.nodeName, err)) } @@ -589,6 +674,49 @@ func (r *nodeRunner[O]) stream(ctx context.Context, input map[string]any, opts . } } +func (r *nodeRunner[O]) collect(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) { + if r.maxRetry == 0 { + return r.c(ctx, input, opts...) + } + + copied := input.Copy(int(r.maxRetry)) + + var n int64 + defer func() { + for i := n + 1; i < r.maxRetry; i++ { + copied[i].Close() + } + }() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + output, err = r.c(ctx, copied[n], opts...) + if err != nil { + if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry + r.interrupted = true + return nil, err + } + + logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err) + if r.maxRetry > n { + n++ + if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil { + exeCtx.CurrentRetryCount++ + } + continue + } + return nil, err + } + + return output, nil + } +} + func (r *nodeRunner[O]) transform(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) { if r.maxRetry == 0 { return r.t(ctx, input, opts...) diff --git a/backend/domain/workflow/internal/compose/workflow_run.go b/backend/domain/workflow/internal/compose/workflow_run.go index 8089234e..eaa62e49 100644 --- a/backend/domain/workflow/internal/compose/workflow_run.go +++ b/backend/domain/workflow/internal/compose/workflow_run.go @@ -268,8 +268,6 @@ func (r *WorkflowRunner) Prepare(ctx context.Context) ( } } - cancelCtx = execute.InitExecutedNodesCounter(cancelCtx) - lastEventChan := make(chan *execute.Event, 1) go func() { defer func() { diff --git a/backend/domain/workflow/internal/execute/consts.go b/backend/domain/workflow/internal/execute/consts.go index 16260b47..ef079965 100644 --- a/backend/domain/workflow/internal/execute/consts.go +++ b/backend/domain/workflow/internal/execute/consts.go @@ -18,17 +18,14 @@ package execute import ( "context" - "sync/atomic" "time" - - "github.com/coze-dev/coze-studio/backend/pkg/ctxcache" ) const ( - foregroundRunTimeout = 10 * time.Minute - backgroundRunTimeout = 24 * time.Hour - maxNodeCountPerWorkflow = 1000 - maxNodeCountPerExecution = 1000 + foregroundRunTimeout = 0 // timeout for workflow execution in foreground mode, 0 means no timeout + backgroundRunTimeout = 0 // timeout for workflow execution in background mode, 0 means no timeout + maxNodeCountPerWorkflow = 0 // maximum node count for a workflow, 0 means no limit + maxNodeCountPerExecution = 0 // maximum node count for a workflow execution, 0 means no limit cancelCheckInterval = 200 * time.Millisecond ) @@ -52,17 +49,17 @@ const ( executedNodeCountKey = "executed_node_count" ) -func IncrAndCheckExecutedNodes(ctx context.Context) (int64, bool) { - counter, ok := ctxcache.Get[atomic.Int64](ctx, executedNodeCountKey) - if !ok { +func IncrementAndCheckExecutedNodes(ctx context.Context) (int64, bool) { + exeCtx := GetExeCtx(ctx) + if exeCtx == nil { return 0, false } - current := counter.Add(1) + counter := exeCtx.executed + if counter == nil { + return 0, false + } + + current := (*counter).Add(1) return current, current > maxNodeCountPerExecution } - -func InitExecutedNodesCounter(ctx context.Context) context.Context { - ctxcache.Store(ctx, executedNodeCountKey, atomic.Int64{}) - return ctx -} diff --git a/backend/domain/workflow/internal/execute/context.go b/backend/domain/workflow/internal/execute/context.go index 93e2f2e3..b9fa3c49 100644 --- a/backend/domain/workflow/internal/execute/context.go +++ b/backend/domain/workflow/internal/execute/context.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/cloudwego/eino/compose" @@ -30,6 +31,7 @@ import ( "github.com/coze-dev/coze-studio/backend/domain/workflow" "github.com/coze-dev/coze-studio/backend/domain/workflow/entity" "github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo" + "github.com/coze-dev/coze-studio/backend/pkg/lang/ptr" ) type Context struct { @@ -48,6 +50,8 @@ type Context struct { CheckPointID string AppVarStore *AppVariables + + executed *atomic.Int64 } type RootCtx struct { @@ -118,6 +122,7 @@ func restoreWorkflowCtx(ctx context.Context, h *WorkflowHandler) (context.Contex } storedCtx.AppVarStore = currentC.AppVarStore + storedCtx.executed = currentC.executed } return context.WithValue(ctx, contextKey{}, storedCtx), nil @@ -158,13 +163,16 @@ func restoreNodeCtx(ctx context.Context, nodeKey vo.NodeKey, resumeEvent *entity currentC := GetExeCtx(ctx) - // restore the parent-child relationship between token collectors - if storedCtx.TokenCollector != nil && storedCtx.TokenCollector.Parent != nil { - currentTokenCollector := currentC.TokenCollector - storedCtx.TokenCollector.Parent = currentTokenCollector - } + if currentC != nil { + // restore the parent-child relationship between token collectors + if storedCtx.TokenCollector != nil && storedCtx.TokenCollector.Parent != nil { + currentTokenCollector := currentC.TokenCollector + storedCtx.TokenCollector.Parent = currentTokenCollector + } - storedCtx.AppVarStore = currentC.AppVarStore + storedCtx.AppVarStore = currentC.AppVarStore + storedCtx.executed = currentC.executed + } storedCtx.NodeCtx.CurrentRetryCount = 0 @@ -200,6 +208,9 @@ func tryRestoreNodeCtx(ctx context.Context, nodeKey vo.NodeKey) (context.Context if storedCtx.TokenCollector != nil && storedCtx.TokenCollector.Parent != nil && existingC != nil { currentTokenCollector := existingC.TokenCollector storedCtx.TokenCollector.Parent = currentTokenCollector + + storedCtx.AppVarStore = existingC.AppVarStore + storedCtx.executed = existingC.executed } storedCtx.NodeCtx.CurrentRetryCount = 0 @@ -224,6 +235,7 @@ func PrepareRootExeCtx(ctx context.Context, h *WorkflowHandler) (context.Context TokenCollector: newTokenCollector(fmt.Sprintf("wf_%d", h.rootWorkflowBasic.ID), parentTokenCollector), StartTime: time.Now().UnixMilli(), AppVarStore: NewAppVariables(), + executed: ptr.Of(atomic.Int64{}), } if h.requireCheckpoint { @@ -278,6 +290,7 @@ func PrepareSubExeCtx(ctx context.Context, wb *entity.WorkflowBasic, requireChec CheckPointID: newCheckpointID, StartTime: time.Now().UnixMilli(), AppVarStore: c.AppVarStore, + executed: c.executed, } if requireCheckpoint { @@ -321,6 +334,7 @@ func PrepareNodeExeCtx(ctx context.Context, nodeKey vo.NodeKey, nodeName string, StartTime: time.Now().UnixMilli(), CheckPointID: c.CheckPointID, AppVarStore: c.AppVarStore, + executed: c.executed, } if c.NodeCtx == nil { // node within top level workflow, also not under composite node @@ -368,6 +382,7 @@ func InheritExeCtxWithBatchInfo(ctx context.Context, index int, items map[string }, CheckPointID: newCheckpointID, AppVarStore: c.AppVarStore, + executed: c.executed, }), newCheckpointID }