fix: workflow onEndWithStream handles stream asynchronously (#518)
This commit is contained in:
parent
08d1f6bf67
commit
3fe4031531
|
|
@ -496,48 +496,49 @@ func (w *WorkflowHandler) OnEndWithStreamOutput(ctx context.Context, info *callb
|
|||
return ctx
|
||||
}
|
||||
|
||||
// consumes the stream synchronously because the Exit node has already processed this stream synchronously.
|
||||
defer output.Close()
|
||||
fullOutput := make(map[string]any)
|
||||
for {
|
||||
chunk, e := output.Recv()
|
||||
if e != nil {
|
||||
if e == io.EOF {
|
||||
break
|
||||
safego.Go(ctx, func() {
|
||||
defer output.Close()
|
||||
fullOutput := make(map[string]any)
|
||||
for {
|
||||
chunk, e := output.Recv()
|
||||
if e != nil {
|
||||
if e == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if _, ok := schema.GetSourceName(e); ok {
|
||||
continue
|
||||
}
|
||||
|
||||
logs.Errorf("workflow OnEndWithStreamOutput failed to receive stream output: %v", e)
|
||||
_ = w.OnError(ctx, info, e)
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := schema.GetSourceName(e); ok {
|
||||
continue
|
||||
fullOutput, e = nodes.ConcatTwoMaps(fullOutput, chunk.(map[string]any))
|
||||
if e != nil {
|
||||
logs.Errorf("failed to concat two maps: %v", e)
|
||||
return
|
||||
}
|
||||
|
||||
logs.Errorf("workflow OnEndWithStreamOutput failed to receive stream output: %v", e)
|
||||
_ = w.OnError(ctx, info, e)
|
||||
return ctx
|
||||
}
|
||||
fullOutput, e = nodes.ConcatTwoMaps(fullOutput, chunk.(map[string]any))
|
||||
if e != nil {
|
||||
logs.Errorf("failed to concat two maps: %v", e)
|
||||
return ctx
|
||||
}
|
||||
}
|
||||
|
||||
c := GetExeCtx(ctx)
|
||||
e := &Event{
|
||||
Type: WorkflowSuccess,
|
||||
Context: c,
|
||||
Duration: time.Since(time.UnixMilli(c.StartTime)),
|
||||
Output: fullOutput,
|
||||
}
|
||||
|
||||
if c.TokenCollector != nil {
|
||||
usage := c.TokenCollector.wait()
|
||||
e.Token = &TokenInfo{
|
||||
InputToken: int64(usage.PromptTokens),
|
||||
OutputToken: int64(usage.CompletionTokens),
|
||||
TotalToken: int64(usage.TotalTokens),
|
||||
c := GetExeCtx(ctx)
|
||||
e := &Event{
|
||||
Type: WorkflowSuccess,
|
||||
Context: c,
|
||||
Duration: time.Since(time.UnixMilli(c.StartTime)),
|
||||
Output: fullOutput,
|
||||
}
|
||||
}
|
||||
w.ch <- e
|
||||
|
||||
if c.TokenCollector != nil {
|
||||
usage := c.TokenCollector.wait()
|
||||
e.Token = &TokenInfo{
|
||||
InputToken: int64(usage.PromptTokens),
|
||||
OutputToken: int64(usage.CompletionTokens),
|
||||
TotalToken: int64(usage.TotalTokens),
|
||||
}
|
||||
}
|
||||
w.ch <- e
|
||||
})
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue