diff --git a/backend/domain/workflow/entity/interrupt_event.go b/backend/domain/workflow/entity/interrupt_event.go index 39734fb8..e2c4e606 100644 --- a/backend/domain/workflow/entity/interrupt_event.go +++ b/backend/domain/workflow/entity/interrupt_event.go @@ -35,6 +35,7 @@ type InterruptEvent struct { NodeIcon string `json:"node_icon,omitempty"` EventType InterruptEventType `json:"event_type"` NodePath []string `json:"node_path,omitempty"` + Popped bool `json:"popped,omitempty"` // index within composite node -> interrupt info for that index // TODO: separate the following fields with InterruptEvent @@ -60,6 +61,7 @@ type ResumeRequest struct { ExecuteID int64 EventID int64 ResumeData string + Resumed bool } func (r *ResumeRequest) GetResumeID() string { diff --git a/backend/domain/workflow/internal/compose/workflow_tool.go b/backend/domain/workflow/internal/compose/workflow_tool.go index 70bbdd45..ee8b65d7 100644 --- a/backend/domain/workflow/internal/compose/workflow_tool.go +++ b/backend/domain/workflow/internal/compose/workflow_tool.go @@ -67,6 +67,16 @@ func (i *invokableWorkflow) Info(_ context.Context) (*schema.ToolInfo, error) { return i.info, nil } +func resumeOnce(rInfo *entity.ResumeRequest, callID string, allIEs map[string]*entity.ToolInterruptEvent) { + if rInfo != nil { + rInfo.Resumed = true + } + + if allIEs != nil { + delete(allIEs, callID) + } +} + func (i *invokableWorkflow) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) { rInfo, allIEs := execute.GetResumeRequest(opts...) var ( @@ -88,9 +98,10 @@ func (i *invokableWorkflow) InvokableRun(ctx context.Context, argumentsInJSON st } cfg := execute.GetExecuteConfig(opts...) + defer resumeOnce(rInfo, callID, allIEs) var runOpts []WorkflowRunnerOption - if rInfo != nil { + if rInfo != nil && !rInfo.Resumed { runOpts = append(runOpts, WithResumeReq(rInfo)) } else { runOpts = append(runOpts, WithInput(argumentsInJSON)) @@ -237,9 +248,10 @@ func (s *streamableWorkflow) StreamableRun(ctx context.Context, argumentsInJSON } cfg := execute.GetExecuteConfig(opts...) + defer resumeOnce(rInfo, callID, allIEs) var runOpts []WorkflowRunnerOption - if rInfo != nil { + if rInfo != nil && !rInfo.Resumed { runOpts = append(runOpts, WithResumeReq(rInfo)) } else { runOpts = append(runOpts, WithInput(argumentsInJSON)) diff --git a/backend/domain/workflow/internal/execute/event_handle.go b/backend/domain/workflow/internal/execute/event_handle.go index a42c67b3..88d125dd 100644 --- a/backend/domain/workflow/internal/execute/event_handle.go +++ b/backend/domain/workflow/internal/execute/event_handle.go @@ -283,7 +283,7 @@ func handleEvent(ctx context.Context, event *Event, repo workflow.Repository, return noTerminate, fmt.Errorf("failed to update workflow execution to interrupted for execution id %d, current status is %v", exeID, currentStatus) } - if event.RootCtx.ResumeEvent != nil { + if event.RootCtx.ResumeEvent != nil && !event.RootCtx.ResumeEvent.Popped { needPop := false for _, ie := range event.InterruptEvents { if ie.NodeKey == event.RootCtx.ResumeEvent.NodeKey { diff --git a/backend/domain/workflow/internal/nodes/llm/llm.go b/backend/domain/workflow/internal/nodes/llm/llm.go index ea7feb36..bf777335 100644 --- a/backend/domain/workflow/internal/nodes/llm/llm.go +++ b/backend/domain/workflow/internal/nodes/llm/llm.go @@ -971,6 +971,8 @@ func (l *LLM) prepare(ctx context.Context, _ map[string]any, opts ...nodes.NodeO return ctx } + c.RootCtx.ResumeEvent.Popped = true + return ctx }, }).Handler()