792 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			792 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * Copyright 2025 coze-dev Authors
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package compose
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"runtime/debug"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/cloudwego/eino/callbacks"
 | |
| 	"github.com/cloudwego/eino/compose"
 | |
| 	"github.com/cloudwego/eino/schema"
 | |
| 	"golang.org/x/exp/maps"
 | |
| 
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/errorx"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/logs"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/safego"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/sonic"
 | |
| 	"github.com/coze-dev/coze-studio/backend/types/errno"
 | |
| )
 | |
| 
 | |
| type nodeRunConfig[O any] struct {
 | |
| 	nodeKey             vo.NodeKey
 | |
| 	nodeName            string
 | |
| 	nodeType            entity.NodeType
 | |
| 	timeoutMS           int64
 | |
| 	maxRetry            int64
 | |
| 	errProcessType      vo.ErrorProcessType
 | |
| 	dataOnErr           func(ctx context.Context) map[string]any
 | |
| 	callbackEnabled     bool
 | |
| 	preProcessors       []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	postProcessors      []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	streamPreProcessors []func(ctx context.Context,
 | |
| 		input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any]
 | |
| 	callbackInputConverter  func(context.Context, map[string]any) (map[string]any, error)
 | |
| 	callbackOutputConverter func(context.Context, map[string]any) (*nodes.StructuredCallbackOutput, error)
 | |
| 	init                    []func(context.Context) (context.Context, error)
 | |
| 	i                       compose.Invoke[map[string]any, map[string]any, O]
 | |
| 	s                       compose.Stream[map[string]any, map[string]any, O]
 | |
| 	t                       compose.Transform[map[string]any, map[string]any, O]
 | |
| }
 | |
| 
 | |
| func newNodeRunConfig[O any](ns *NodeSchema,
 | |
| 	i compose.Invoke[map[string]any, map[string]any, O],
 | |
| 	s compose.Stream[map[string]any, map[string]any, O],
 | |
| 	t compose.Transform[map[string]any, map[string]any, O],
 | |
| 	opts *newNodeOptions) *nodeRunConfig[O] {
 | |
| 	meta := entity.NodeMetaByNodeType(ns.Type)
 | |
| 
 | |
| 	var (
 | |
| 		timeoutMS      = meta.DefaultTimeoutMS
 | |
| 		maxRetry       int64
 | |
| 		errProcessType = vo.ErrorProcessTypeThrow
 | |
| 		dataOnErr      func(ctx context.Context) map[string]any
 | |
| 	)
 | |
| 	if ns.ExceptionConfigs != nil {
 | |
| 		timeoutMS = ns.ExceptionConfigs.TimeoutMS
 | |
| 		maxRetry = ns.ExceptionConfigs.MaxRetry
 | |
| 		if ns.ExceptionConfigs.ProcessType != nil {
 | |
| 			errProcessType = *ns.ExceptionConfigs.ProcessType
 | |
| 		}
 | |
| 		if len(ns.ExceptionConfigs.DataOnErr) > 0 {
 | |
| 			dataOnErr = func(ctx context.Context) map[string]any {
 | |
| 				return parseDefaultOutputOrFallback(ctx, ns.ExceptionConfigs.DataOnErr, ns.OutputTypes)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	preProcessors := []func(ctx context.Context, input map[string]any) (map[string]any, error){
 | |
| 		preTypeConverter(ns.InputTypes),
 | |
| 		keyFinishedMarkerTrimmer(),
 | |
| 	}
 | |
| 	if meta.PreFillZero {
 | |
| 		preProcessors = append(preProcessors, ns.inputValueFiller())
 | |
| 	}
 | |
| 
 | |
| 	var postProcessors []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	if meta.PostFillNil {
 | |
| 		postProcessors = append(postProcessors, ns.outputValueFiller())
 | |
| 	}
 | |
| 
 | |
| 	streamPreProcessors := []func(ctx context.Context,
 | |
| 		input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any]{
 | |
| 		func(ctx context.Context, input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any] {
 | |
| 			f := func(in map[string]any) (map[string]any, error) {
 | |
| 				return preTypeConverter(ns.InputTypes)(ctx, in)
 | |
| 			}
 | |
| 			return schema.StreamReaderWithConvert(input, f)
 | |
| 		},
 | |
| 	}
 | |
| 	if meta.PreFillZero {
 | |
| 		streamPreProcessors = append(streamPreProcessors, ns.streamInputValueFiller())
 | |
| 	}
 | |
| 
 | |
| 	opts.init = append(opts.init, func(ctx context.Context) (context.Context, error) {
 | |
| 		current, exceeded := execute.IncrAndCheckExecutedNodes(ctx)
 | |
| 		if exceeded {
 | |
| 			return nil, fmt.Errorf("exceeded max executed node count: %d, current: %d", execute.GetStaticConfig().MaxNodeCountPerExecution, current)
 | |
| 		}
 | |
| 		return ctx, nil
 | |
| 	})
 | |
| 
 | |
| 	return &nodeRunConfig[O]{
 | |
| 		nodeKey:                 ns.Key,
 | |
| 		nodeName:                ns.Name,
 | |
| 		nodeType:                ns.Type,
 | |
| 		timeoutMS:               timeoutMS,
 | |
| 		maxRetry:                maxRetry,
 | |
| 		errProcessType:          errProcessType,
 | |
| 		dataOnErr:               dataOnErr,
 | |
| 		callbackEnabled:         meta.CallbackEnabled,
 | |
| 		preProcessors:           preProcessors,
 | |
| 		postProcessors:          postProcessors,
 | |
| 		streamPreProcessors:     streamPreProcessors,
 | |
| 		callbackInputConverter:  opts.callbackInputConverter,
 | |
| 		callbackOutputConverter: opts.callbackOutputConverter,
 | |
| 		init:                    opts.init,
 | |
| 		i:                       i,
 | |
| 		s:                       s,
 | |
| 		t:                       t,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newNodeRunConfigWOOpt(ns *NodeSchema,
 | |
| 	i compose.InvokeWOOpt[map[string]any, map[string]any],
 | |
| 	s compose.StreamWOOpt[map[string]any, map[string]any],
 | |
| 	t compose.TransformWOOpts[map[string]any, map[string]any],
 | |
| 	opts *newNodeOptions) *nodeRunConfig[any] {
 | |
| 	var (
 | |
| 		iWO compose.Invoke[map[string]any, map[string]any, any]
 | |
| 		sWO compose.Stream[map[string]any, map[string]any, any]
 | |
| 		tWO compose.Transform[map[string]any, map[string]any, any]
 | |
| 	)
 | |
| 
 | |
| 	if i != nil {
 | |
| 		iWO = func(ctx context.Context, in map[string]any, _ ...any) (out map[string]any, err error) {
 | |
| 			return i(ctx, in)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if s != nil {
 | |
| 		sWO = func(ctx context.Context, in map[string]any, _ ...any) (out *schema.StreamReader[map[string]any], err error) {
 | |
| 			return s(ctx, in)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if t != nil {
 | |
| 		tWO = func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...any) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 			return t(ctx, input)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return newNodeRunConfig[any](ns, iWO, sWO, tWO, opts)
 | |
| }
 | |
| 
 | |
| type newNodeOptions struct {
 | |
| 	callbackInputConverter  func(context.Context, map[string]any) (map[string]any, error)
 | |
| 	callbackOutputConverter func(context.Context, map[string]any) (*nodes.StructuredCallbackOutput, error)
 | |
| 	init                    []func(context.Context) (context.Context, error)
 | |
| }
 | |
| 
 | |
| type newNodeOption func(*newNodeOptions)
 | |
| 
 | |
| func withCallbackInputConverter(f func(context.Context, map[string]any) (map[string]any, error)) newNodeOption {
 | |
| 	return func(opts *newNodeOptions) {
 | |
| 		opts.callbackInputConverter = f
 | |
| 	}
 | |
| }
 | |
| func withCallbackOutputConverter(f func(context.Context, map[string]any) (*nodes.StructuredCallbackOutput, error)) newNodeOption {
 | |
| 	return func(opts *newNodeOptions) {
 | |
| 		opts.callbackOutputConverter = f
 | |
| 	}
 | |
| }
 | |
| func withInit(f func(context.Context) (context.Context, error)) newNodeOption {
 | |
| 	return func(opts *newNodeOptions) {
 | |
| 		opts.init = append(opts.init, f)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func invokableNode(ns *NodeSchema, i compose.InvokeWOOpt[map[string]any, map[string]any], opts ...newNodeOption) *Node {
 | |
| 	options := &newNodeOptions{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(options)
 | |
| 	}
 | |
| 
 | |
| 	return newNodeRunConfigWOOpt(ns, i, nil, nil, options).toNode()
 | |
| }
 | |
| 
 | |
| func invokableNodeWO[O any](ns *NodeSchema, i compose.Invoke[map[string]any, map[string]any, O], opts ...newNodeOption) *Node {
 | |
| 	options := &newNodeOptions{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(options)
 | |
| 	}
 | |
| 
 | |
| 	return newNodeRunConfig(ns, i, nil, nil, options).toNode()
 | |
| }
 | |
| 
 | |
| func invokableTransformableNode(ns *NodeSchema, i compose.InvokeWOOpt[map[string]any, map[string]any],
 | |
| 	t compose.TransformWOOpts[map[string]any, map[string]any], opts ...newNodeOption) *Node {
 | |
| 	options := &newNodeOptions{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(options)
 | |
| 	}
 | |
| 	return newNodeRunConfigWOOpt(ns, i, nil, t, options).toNode()
 | |
| }
 | |
| 
 | |
| func invokableStreamableNodeWO[O any](ns *NodeSchema, i compose.Invoke[map[string]any, map[string]any, O], s compose.Stream[map[string]any, map[string]any, O], opts ...newNodeOption) *Node {
 | |
| 	options := &newNodeOptions{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(options)
 | |
| 	}
 | |
| 	return newNodeRunConfig(ns, i, s, nil, options).toNode()
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) invoke() func(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 	if nc.i == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				err = runner.onEnd(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = errOutput
 | |
| 					err = nil
 | |
| 					if output, err = runner.postProcess(ctx, output); err != nil {
 | |
| 						logs.CtxErrorf(ctx, "postProcess failed after returning error output: %v", err)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			if ctx, err = i(ctx); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if input, err = runner.preProcess(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ctx, err = runner.onStart(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if output, err = runner.invoke(ctx, input, opts...); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.postProcess(ctx, output)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) stream() func(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if nc.s == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				output, err = runner.onEndStream(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = schema.StreamReaderFromArray([]map[string]any{errOutput})
 | |
| 					err = nil
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			if ctx, err = i(ctx); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if input, err = runner.preProcess(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ctx, err = runner.onStart(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.stream(ctx, input, opts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) transform() func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if nc.t == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				output, err = runner.onEndStream(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = schema.StreamReaderFromArray([]map[string]any{errOutput})
 | |
| 					err = nil
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			if ctx, err = i(ctx); err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for _, p := range runner.streamPreProcessors {
 | |
| 			input = p(ctx, input)
 | |
| 		}
 | |
| 
 | |
| 		if ctx, input, err = runner.onStartStream(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.transform(ctx, input, opts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) toNode() *Node {
 | |
| 	var opts []compose.LambdaOpt
 | |
| 	opts = append(opts, compose.WithLambdaType(string(nc.nodeType)))
 | |
| 
 | |
| 	if nc.callbackEnabled {
 | |
| 		opts = append(opts, compose.WithLambdaCallbackEnable(true))
 | |
| 	}
 | |
| 	l, err := compose.AnyLambda(nc.invoke(), nc.stream(), nil, nc.transform(), opts...)
 | |
| 	if err != nil {
 | |
| 		panic(fmt.Sprintf("failed to create lambda for node %s, err: %v", nc.nodeName, err))
 | |
| 	}
 | |
| 
 | |
| 	return &Node{Lambda: l}
 | |
| }
 | |
| 
 | |
| type nodeRunner[O any] struct {
 | |
| 	*nodeRunConfig[O]
 | |
| 	interrupted bool
 | |
| 	cancelFn    context.CancelFunc
 | |
| }
 | |
| 
 | |
| func newNodeRunner[O any](ctx context.Context, cfg *nodeRunConfig[O]) (context.Context, *nodeRunner[O]) {
 | |
| 	runner := &nodeRunner[O]{
 | |
| 		nodeRunConfig: cfg,
 | |
| 	}
 | |
| 
 | |
| 	if cfg.timeoutMS > 0 {
 | |
| 		ctx, runner.cancelFn = context.WithTimeout(ctx, time.Duration(cfg.timeoutMS)*time.Millisecond)
 | |
| 	}
 | |
| 
 | |
| 	return ctx, runner
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onStart(ctx context.Context, input map[string]any) (context.Context, error) {
 | |
| 	if !r.callbackEnabled {
 | |
| 		return ctx, nil
 | |
| 	}
 | |
| 	if r.callbackInputConverter != nil {
 | |
| 		convertedInput, err := r.callbackInputConverter(ctx, input)
 | |
| 		if err != nil {
 | |
| 			ctx = callbacks.OnStart(ctx, input)
 | |
| 			return ctx, err
 | |
| 		}
 | |
| 		ctx = callbacks.OnStart(ctx, convertedInput)
 | |
| 	} else {
 | |
| 		ctx = callbacks.OnStart(ctx, input)
 | |
| 	}
 | |
| 
 | |
| 	return ctx, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onStartStream(ctx context.Context, input *schema.StreamReader[map[string]any]) (
 | |
| 	context.Context, *schema.StreamReader[map[string]any], error) {
 | |
| 	if !r.callbackEnabled {
 | |
| 		return ctx, input, nil
 | |
| 	}
 | |
| 
 | |
| 	if r.callbackInputConverter != nil {
 | |
| 		copied := input.Copy(2)
 | |
| 		realConverter := func(ctx context.Context) func(map[string]any) (map[string]any, error) {
 | |
| 			return func(in map[string]any) (map[string]any, error) {
 | |
| 				return r.callbackInputConverter(ctx, in)
 | |
| 			}
 | |
| 		}
 | |
| 		callbackS := schema.StreamReaderWithConvert(copied[0], realConverter(ctx))
 | |
| 		newCtx, unused := callbacks.OnStartWithStreamInput(ctx, callbackS)
 | |
| 		unused.Close()
 | |
| 		return newCtx, copied[1], nil
 | |
| 	}
 | |
| 
 | |
| 	newCtx, newInput := callbacks.OnStartWithStreamInput(ctx, input)
 | |
| 	return newCtx, newInput, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) preProcess(ctx context.Context, input map[string]any) (_ map[string]any, err error) {
 | |
| 	for _, preProcessor := range r.preProcessors {
 | |
| 		if preProcessor == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		input, err = preProcessor(ctx, input)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return input, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) postProcess(ctx context.Context, output map[string]any) (_ map[string]any, err error) {
 | |
| 	for _, postProcessor := range r.postProcessors {
 | |
| 		if postProcessor == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		output, err = postProcessor(ctx, output)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return output, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) invoke(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 	var n int64
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.i(ctx, input, opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) stream(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	var n int64
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.s(ctx, input, opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) transform(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if r.maxRetry == 0 {
 | |
| 		return r.t(ctx, input, opts...)
 | |
| 	}
 | |
| 
 | |
| 	copied := input.Copy(int(r.maxRetry))
 | |
| 
 | |
| 	var n int64
 | |
| 	defer func() {
 | |
| 		for i := n + 1; i < r.maxRetry; i++ {
 | |
| 			copied[i].Close()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.t(ctx, copied[n], opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onEnd(ctx context.Context, output map[string]any) error {
 | |
| 	if r.errProcessType == vo.ErrorProcessTypeExceptionBranch || r.errProcessType == vo.ErrorProcessTypeDefault {
 | |
| 		output["isSuccess"] = true
 | |
| 	}
 | |
| 
 | |
| 	if !r.callbackEnabled {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if r.callbackOutputConverter != nil {
 | |
| 		convertedOutput, err := r.callbackOutputConverter(ctx, output)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		_ = callbacks.OnEnd(ctx, convertedOutput)
 | |
| 	} else {
 | |
| 		_ = callbacks.OnEnd(ctx, output)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onEndStream(ctx context.Context, output *schema.StreamReader[map[string]any]) (
 | |
| 	*schema.StreamReader[map[string]any], error) {
 | |
| 	if r.errProcessType == vo.ErrorProcessTypeExceptionBranch || r.errProcessType == vo.ErrorProcessTypeDefault {
 | |
| 		flag := schema.StreamReaderFromArray([]map[string]any{{"isSuccess": true}})
 | |
| 		output = schema.MergeStreamReaders([]*schema.StreamReader[map[string]any]{flag, output})
 | |
| 	}
 | |
| 
 | |
| 	if !r.callbackEnabled {
 | |
| 		return output, nil
 | |
| 	}
 | |
| 
 | |
| 	if r.callbackOutputConverter != nil {
 | |
| 		copied := output.Copy(2)
 | |
| 		realConverter := func(ctx context.Context) func(map[string]any) (*nodes.StructuredCallbackOutput, error) {
 | |
| 			return func(in map[string]any) (*nodes.StructuredCallbackOutput, error) {
 | |
| 				return r.callbackOutputConverter(ctx, in)
 | |
| 			}
 | |
| 		}
 | |
| 		callbackS := schema.StreamReaderWithConvert(copied[0], realConverter(ctx))
 | |
| 		_, unused := callbacks.OnEndWithStreamOutput(ctx, callbackS)
 | |
| 		unused.Close()
 | |
| 
 | |
| 		return copied[1], nil
 | |
| 	}
 | |
| 
 | |
| 	_, newOutput := callbacks.OnEndWithStreamOutput(ctx, output)
 | |
| 	return newOutput, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onError(ctx context.Context, err error) (map[string]any, bool) {
 | |
| 	if r.interrupted {
 | |
| 		if r.callbackEnabled {
 | |
| 			_ = callbacks.OnError(ctx, err)
 | |
| 		}
 | |
| 		return nil, false
 | |
| 	}
 | |
| 
 | |
| 	var sErr vo.WorkflowError
 | |
| 	if !errors.As(err, &sErr) {
 | |
| 		if errors.Is(err, context.DeadlineExceeded) {
 | |
| 			sErr = vo.NodeTimeoutErr
 | |
| 		} else if errors.Is(err, context.Canceled) {
 | |
| 			sErr = vo.CancelErr
 | |
| 		} else {
 | |
| 			sErr = vo.WrapError(errno.ErrWorkflowExecuteFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	code := int(sErr.Code())
 | |
| 	msg := sErr.Msg()
 | |
| 
 | |
| 	switch r.errProcessType {
 | |
| 	case vo.ErrorProcessTypeDefault:
 | |
| 		d := r.dataOnErr(ctx)
 | |
| 		d["errorBody"] = map[string]any{
 | |
| 			"errorMessage": msg,
 | |
| 			"errorCode":    code,
 | |
| 		}
 | |
| 		d["isSuccess"] = false
 | |
| 		if r.callbackEnabled {
 | |
| 			sErr = sErr.ChangeErrLevel(vo.LevelWarn)
 | |
| 			sOutput := &nodes.StructuredCallbackOutput{
 | |
| 				Output:    d,
 | |
| 				RawOutput: d,
 | |
| 				Error:     sErr,
 | |
| 			}
 | |
| 			_ = callbacks.OnEnd(ctx, sOutput)
 | |
| 		}
 | |
| 		return d, true
 | |
| 	case vo.ErrorProcessTypeExceptionBranch:
 | |
| 		s := make(map[string]any)
 | |
| 		s["errorBody"] = map[string]any{
 | |
| 			"errorMessage": msg,
 | |
| 			"errorCode":    code,
 | |
| 		}
 | |
| 		s["isSuccess"] = false
 | |
| 		if r.callbackEnabled {
 | |
| 			sErr = sErr.ChangeErrLevel(vo.LevelWarn)
 | |
| 			sOutput := &nodes.StructuredCallbackOutput{
 | |
| 				Output:    s,
 | |
| 				RawOutput: s,
 | |
| 				Error:     sErr,
 | |
| 			}
 | |
| 			_ = callbacks.OnEnd(ctx, sOutput)
 | |
| 		}
 | |
| 		return s, true
 | |
| 	default:
 | |
| 		if r.callbackEnabled {
 | |
| 			_ = callbacks.OnError(ctx, sErr)
 | |
| 		}
 | |
| 		return nil, false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func parseDefaultOutput(ctx context.Context, data string, schema_ map[string]*vo.TypeInfo) (map[string]any, error) {
 | |
| 	var result map[string]any
 | |
| 
 | |
| 	err := sonic.UnmarshalString(data, &result)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	r, ws, e := nodes.ConvertInputs(ctx, result, schema_)
 | |
| 	if e != nil {
 | |
| 		return nil, e
 | |
| 	}
 | |
| 
 | |
| 	if ws != nil {
 | |
| 		logs.CtxWarnf(ctx, "convert output warnings: %v", *ws)
 | |
| 	}
 | |
| 
 | |
| 	return r, nil
 | |
| }
 | |
| 
 | |
| func parseDefaultOutputOrFallback(ctx context.Context, data string, schema_ map[string]*vo.TypeInfo) map[string]any {
 | |
| 	result, err := parseDefaultOutput(ctx, data, schema_)
 | |
| 	if err != nil {
 | |
| 		fallback := make(map[string]any, len(schema_))
 | |
| 		for k, v := range schema_ {
 | |
| 			if v.Type == vo.DataTypeString {
 | |
| 				fallback[k] = data
 | |
| 				continue
 | |
| 			}
 | |
| 			fallback[k] = v.Zero()
 | |
| 		}
 | |
| 		return fallback
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func preTypeConverter(inTypes map[string]*vo.TypeInfo) func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 	return func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 		out, ws, err := nodes.ConvertInputs(ctx, in, inTypes)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ws != nil {
 | |
| 			logs.CtxWarnf(ctx, "convert inputs warnings: %v", *ws)
 | |
| 		}
 | |
| 
 | |
| 		return out, err
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func trimKeyFinishedMarker(ctx context.Context, in map[string]any) (map[string]any, bool, error) {
 | |
| 	var (
 | |
| 		newIn   map[string]any
 | |
| 		trimmed bool
 | |
| 	)
 | |
| 	for k, v := range in {
 | |
| 		if vStr, ok := v.(string); ok {
 | |
| 			if strings.HasSuffix(vStr, nodes.KeyIsFinished) {
 | |
| 				if newIn == nil {
 | |
| 					newIn = maps.Clone(in)
 | |
| 				}
 | |
| 				vStr = strings.TrimSuffix(vStr, nodes.KeyIsFinished)
 | |
| 				newIn[k] = vStr
 | |
| 				trimmed = true
 | |
| 			}
 | |
| 		} else if vMap, ok := v.(map[string]any); ok {
 | |
| 			newMap, subTrimmed, err := trimKeyFinishedMarker(ctx, vMap)
 | |
| 			if err != nil {
 | |
| 				return nil, false, err
 | |
| 			}
 | |
| 			if subTrimmed {
 | |
| 				if newIn == nil {
 | |
| 					newIn = maps.Clone(in)
 | |
| 				}
 | |
| 				newIn[k] = newMap
 | |
| 				trimmed = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if trimmed {
 | |
| 		return newIn, true, nil
 | |
| 	}
 | |
| 
 | |
| 	return in, false, nil
 | |
| }
 | |
| 
 | |
| func keyFinishedMarkerTrimmer() func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 	return func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 		out, _, err := trimKeyFinishedMarker(ctx, in)
 | |
| 		return out, err
 | |
| 	}
 | |
| }
 |