From 3fe4031531a2326769c1755563cb26fb88df8429 Mon Sep 17 00:00:00 2001 From: shentongmartin Date: Mon, 4 Aug 2025 11:58:34 +0800 Subject: [PATCH] fix: workflow onEndWithStream handles stream asynchronously (#518) --- .../workflow/internal/execute/callback.go | 75 ++++++++++--------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/backend/domain/workflow/internal/execute/callback.go b/backend/domain/workflow/internal/execute/callback.go index acc904d2..567f6873 100644 --- a/backend/domain/workflow/internal/execute/callback.go +++ b/backend/domain/workflow/internal/execute/callback.go @@ -496,48 +496,49 @@ func (w *WorkflowHandler) OnEndWithStreamOutput(ctx context.Context, info *callb return ctx } - // consumes the stream synchronously because the Exit node has already processed this stream synchronously. - defer output.Close() - fullOutput := make(map[string]any) - for { - chunk, e := output.Recv() - if e != nil { - if e == io.EOF { - break + safego.Go(ctx, func() { + defer output.Close() + fullOutput := make(map[string]any) + for { + chunk, e := output.Recv() + if e != nil { + if e == io.EOF { + break + } + + if _, ok := schema.GetSourceName(e); ok { + continue + } + + logs.Errorf("workflow OnEndWithStreamOutput failed to receive stream output: %v", e) + _ = w.OnError(ctx, info, e) + return } - - if _, ok := schema.GetSourceName(e); ok { - continue + fullOutput, e = nodes.ConcatTwoMaps(fullOutput, chunk.(map[string]any)) + if e != nil { + logs.Errorf("failed to concat two maps: %v", e) + return } - - logs.Errorf("workflow OnEndWithStreamOutput failed to receive stream output: %v", e) - _ = w.OnError(ctx, info, e) - return ctx } - fullOutput, e = nodes.ConcatTwoMaps(fullOutput, chunk.(map[string]any)) - if e != nil { - logs.Errorf("failed to concat two maps: %v", e) - return ctx - } - } - c := GetExeCtx(ctx) - e := &Event{ - Type: WorkflowSuccess, - Context: c, - Duration: time.Since(time.UnixMilli(c.StartTime)), - Output: fullOutput, - } - - if c.TokenCollector != nil { - usage := c.TokenCollector.wait() - e.Token = &TokenInfo{ - InputToken: int64(usage.PromptTokens), - OutputToken: int64(usage.CompletionTokens), - TotalToken: int64(usage.TotalTokens), + c := GetExeCtx(ctx) + e := &Event{ + Type: WorkflowSuccess, + Context: c, + Duration: time.Since(time.UnixMilli(c.StartTime)), + Output: fullOutput, } - } - w.ch <- e + + if c.TokenCollector != nil { + usage := c.TokenCollector.wait() + e.Token = &TokenInfo{ + InputToken: int64(usage.PromptTokens), + OutputToken: int64(usage.CompletionTokens), + TotalToken: int64(usage.TotalTokens), + } + } + w.ch <- e + }) return ctx }