957 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			957 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * Copyright 2025 coze-dev Authors
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package compose
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"runtime/debug"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/cloudwego/eino/callbacks"
 | |
| 	"github.com/cloudwego/eino/compose"
 | |
| 	"github.com/cloudwego/eino/schema"
 | |
| 	"golang.org/x/exp/maps"
 | |
| 
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/entity"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/entity/vo"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/execute"
 | |
| 	"github.com/coze-dev/coze-studio/backend/domain/workflow/internal/nodes"
 | |
| 	schema2 "github.com/coze-dev/coze-studio/backend/domain/workflow/internal/schema"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/ctxcache"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/errorx"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/logs"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/safego"
 | |
| 	"github.com/coze-dev/coze-studio/backend/pkg/sonic"
 | |
| 	"github.com/coze-dev/coze-studio/backend/types/errno"
 | |
| )
 | |
| 
 | |
| type nodeRunConfig[O any] struct {
 | |
| 	nodeKey             vo.NodeKey
 | |
| 	nodeName            string
 | |
| 	nodeType            entity.NodeType
 | |
| 	timeoutMS           int64
 | |
| 	maxRetry            int64
 | |
| 	errProcessType      vo.ErrorProcessType
 | |
| 	dataOnErr           func(ctx context.Context) map[string]any
 | |
| 	preProcessors       []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	postProcessors      []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	streamPreProcessors []func(ctx context.Context,
 | |
| 		input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any]
 | |
| 	callbackInputConverter  func(context.Context, map[string]any) (map[string]any, error)
 | |
| 	callbackOutputConverter func(context.Context, map[string]any) (*nodes.StructuredCallbackOutput, error)
 | |
| 	init                    []func(context.Context) (context.Context, error)
 | |
| 	i                       compose.Invoke[map[string]any, map[string]any, O]
 | |
| 	s                       compose.Stream[map[string]any, map[string]any, O]
 | |
| 	c                       compose.Collect[map[string]any, map[string]any, O]
 | |
| 	t                       compose.Transform[map[string]any, map[string]any, O]
 | |
| }
 | |
| 
 | |
| func newNodeRunConfig[O any](ns *schema2.NodeSchema,
 | |
| 	i compose.Invoke[map[string]any, map[string]any, O],
 | |
| 	s compose.Stream[map[string]any, map[string]any, O],
 | |
| 	c compose.Collect[map[string]any, map[string]any, O],
 | |
| 	t compose.Transform[map[string]any, map[string]any, O],
 | |
| 	opts *newNodeOptions) *nodeRunConfig[O] {
 | |
| 	meta := entity.NodeMetaByNodeType(ns.Type)
 | |
| 
 | |
| 	var (
 | |
| 		timeoutMS      = meta.DefaultTimeoutMS
 | |
| 		maxRetry       int64
 | |
| 		errProcessType = vo.ErrorProcessTypeThrow
 | |
| 		dataOnErr      func(ctx context.Context) map[string]any
 | |
| 	)
 | |
| 	if ns.ExceptionConfigs != nil {
 | |
| 		timeoutMS = ns.ExceptionConfigs.TimeoutMS
 | |
| 		maxRetry = ns.ExceptionConfigs.MaxRetry
 | |
| 		if ns.ExceptionConfigs.ProcessType != nil {
 | |
| 			errProcessType = *ns.ExceptionConfigs.ProcessType
 | |
| 		}
 | |
| 		if len(ns.ExceptionConfigs.DataOnErr) > 0 {
 | |
| 			dataOnErr = func(ctx context.Context) map[string]any {
 | |
| 				return parseDefaultOutputOrFallback(ctx, ns.ExceptionConfigs.DataOnErr, ns.OutputTypes)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	preProcessors := []func(ctx context.Context, input map[string]any) (map[string]any, error){
 | |
| 		preTypeConverter(ns.InputTypes),
 | |
| 		keyFinishedMarkerTrimmer(),
 | |
| 	}
 | |
| 	if meta.PreFillZero {
 | |
| 		preProcessors = append(preProcessors, inputValueFiller(ns))
 | |
| 	}
 | |
| 
 | |
| 	var postProcessors []func(ctx context.Context, input map[string]any) (map[string]any, error)
 | |
| 	if meta.PostFillNil {
 | |
| 		postProcessors = append(postProcessors, outputValueFiller(ns))
 | |
| 	}
 | |
| 
 | |
| 	streamPreProcessors := []func(ctx context.Context,
 | |
| 		input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any]{
 | |
| 		func(ctx context.Context, input *schema.StreamReader[map[string]any]) *schema.StreamReader[map[string]any] {
 | |
| 			f := func(in map[string]any) (map[string]any, error) {
 | |
| 				return preTypeConverter(ns.InputTypes)(ctx, in)
 | |
| 			}
 | |
| 			return schema.StreamReaderWithConvert(input, f)
 | |
| 		},
 | |
| 	}
 | |
| 	if meta.PreFillZero {
 | |
| 		streamPreProcessors = append(streamPreProcessors, streamInputValueFiller(ns))
 | |
| 	}
 | |
| 
 | |
| 	if meta.UseCtxCache {
 | |
| 		opts.init = append([]func(ctx context.Context) (context.Context, error){
 | |
| 			func(ctx context.Context) (context.Context, error) {
 | |
| 				return ctxcache.Init(ctx), nil
 | |
| 			},
 | |
| 		}, opts.init...)
 | |
| 	}
 | |
| 
 | |
| 	if execute.GetStaticConfig().MaxNodeCountPerExecution > 0 {
 | |
| 		opts.init = append(opts.init, func(ctx context.Context) (context.Context, error) {
 | |
| 			current, exceeded := execute.IncrementAndCheckExecutedNodes(ctx)
 | |
| 			if exceeded {
 | |
| 				return nil, fmt.Errorf("exceeded max executed node count: %d, current: %d", execute.GetStaticConfig().MaxNodeCountPerExecution, current)
 | |
| 			}
 | |
| 			return ctx, nil
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return &nodeRunConfig[O]{
 | |
| 		nodeKey:                 ns.Key,
 | |
| 		nodeName:                ns.Name,
 | |
| 		nodeType:                ns.Type,
 | |
| 		timeoutMS:               timeoutMS,
 | |
| 		maxRetry:                maxRetry,
 | |
| 		errProcessType:          errProcessType,
 | |
| 		dataOnErr:               dataOnErr,
 | |
| 		preProcessors:           preProcessors,
 | |
| 		postProcessors:          postProcessors,
 | |
| 		streamPreProcessors:     streamPreProcessors,
 | |
| 		callbackInputConverter:  opts.callbackInputConverter,
 | |
| 		callbackOutputConverter: opts.callbackOutputConverter,
 | |
| 		init:                    opts.init,
 | |
| 		i:                       i,
 | |
| 		s:                       s,
 | |
| 		c:                       c,
 | |
| 		t:                       t,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newNodeRunConfigWOOpt(ns *schema2.NodeSchema,
 | |
| 	i compose.InvokeWOOpt[map[string]any, map[string]any],
 | |
| 	s compose.StreamWOOpt[map[string]any, map[string]any],
 | |
| 	c compose.CollectWOOpt[map[string]any, map[string]any],
 | |
| 	t compose.TransformWOOpts[map[string]any, map[string]any],
 | |
| 	opts *newNodeOptions) *nodeRunConfig[any] {
 | |
| 	var (
 | |
| 		iWO compose.Invoke[map[string]any, map[string]any, any]
 | |
| 		sWO compose.Stream[map[string]any, map[string]any, any]
 | |
| 		cWO compose.Collect[map[string]any, map[string]any, any]
 | |
| 		tWO compose.Transform[map[string]any, map[string]any, any]
 | |
| 	)
 | |
| 
 | |
| 	if i != nil {
 | |
| 		iWO = func(ctx context.Context, in map[string]any, _ ...any) (out map[string]any, err error) {
 | |
| 			return i(ctx, in)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if s != nil {
 | |
| 		sWO = func(ctx context.Context, in map[string]any, _ ...any) (out *schema.StreamReader[map[string]any], err error) {
 | |
| 			return s(ctx, in)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if c != nil {
 | |
| 		cWO = func(ctx context.Context, in *schema.StreamReader[map[string]any], _ ...any) (out map[string]any, err error) {
 | |
| 			return c(ctx, in)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if t != nil {
 | |
| 		tWO = func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...any) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 			return t(ctx, input)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return newNodeRunConfig[any](ns, iWO, sWO, cWO, tWO, opts)
 | |
| }
 | |
| 
 | |
| type newNodeOptions struct {
 | |
| 	callbackInputConverter  func(context.Context, map[string]any) (map[string]any, error)
 | |
| 	callbackOutputConverter func(context.Context, map[string]any) (*nodes.StructuredCallbackOutput, error)
 | |
| 	init                    []func(context.Context) (context.Context, error)
 | |
| }
 | |
| 
 | |
| func toNode(ns *schema2.NodeSchema, r any) *Node {
 | |
| 	iWOpt, _ := r.(nodes.InvokableNodeWOpt)
 | |
| 	sWOpt, _ := r.(nodes.StreamableNodeWOpt)
 | |
| 	cWOpt, _ := r.(nodes.CollectableNodeWOpt)
 | |
| 	tWOpt, _ := r.(nodes.TransformableNodeWOpt)
 | |
| 	iWOOpt, _ := r.(nodes.InvokableNode)
 | |
| 	sWOOpt, _ := r.(nodes.StreamableNode)
 | |
| 	cWOOpt, _ := r.(nodes.CollectableNode)
 | |
| 	tWOOpt, _ := r.(nodes.TransformableNode)
 | |
| 
 | |
| 	var wOpt, wOOpt bool
 | |
| 	if iWOpt != nil || sWOpt != nil || cWOpt != nil || tWOpt != nil {
 | |
| 		wOpt = true
 | |
| 	}
 | |
| 	if iWOOpt != nil || sWOOpt != nil || cWOOpt != nil || tWOOpt != nil {
 | |
| 		wOOpt = true
 | |
| 	}
 | |
| 
 | |
| 	if wOpt && wOOpt {
 | |
| 		panic("a node's different streaming methods needs to be consistent: " +
 | |
| 			"they should ALL have NodeOption or None should have them")
 | |
| 	}
 | |
| 
 | |
| 	if !wOpt && !wOOpt {
 | |
| 		panic("a node should implement at least one interface among: InvokableNodeWOpt, StreamableNodeWOpt, CollectableNodeWOpt, TransformableNodeWOpt, InvokableNode, StreamableNode, CollectableNode, TransformableNode")
 | |
| 	}
 | |
| 
 | |
| 	options := &newNodeOptions{}
 | |
| 	ci, ok := r.(nodes.CallbackInputConverted)
 | |
| 	if ok {
 | |
| 		options.callbackInputConverter = ci.ToCallbackInput
 | |
| 	}
 | |
| 
 | |
| 	co, ok := r.(nodes.CallbackOutputConverted)
 | |
| 	if ok {
 | |
| 		options.callbackOutputConverter = co.ToCallbackOutput
 | |
| 	}
 | |
| 
 | |
| 	init, ok := r.(nodes.Initializer)
 | |
| 	if ok {
 | |
| 		options.init = append(options.init, init.Init)
 | |
| 	}
 | |
| 
 | |
| 	if wOpt {
 | |
| 		var (
 | |
| 			i compose.Invoke[map[string]any, map[string]any, nodes.NodeOption]
 | |
| 			s compose.Stream[map[string]any, map[string]any, nodes.NodeOption]
 | |
| 			c compose.Collect[map[string]any, map[string]any, nodes.NodeOption]
 | |
| 			t compose.Transform[map[string]any, map[string]any, nodes.NodeOption]
 | |
| 		)
 | |
| 
 | |
| 		if iWOpt != nil {
 | |
| 			i = iWOpt.Invoke
 | |
| 		}
 | |
| 
 | |
| 		if sWOpt != nil {
 | |
| 			s = sWOpt.Stream
 | |
| 		}
 | |
| 
 | |
| 		if cWOpt != nil {
 | |
| 			c = cWOpt.Collect
 | |
| 		}
 | |
| 
 | |
| 		if tWOpt != nil {
 | |
| 			t = tWOpt.Transform
 | |
| 		}
 | |
| 
 | |
| 		return newNodeRunConfig(ns, i, s, c, t, options).toNode()
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		i compose.InvokeWOOpt[map[string]any, map[string]any]
 | |
| 		s compose.StreamWOOpt[map[string]any, map[string]any]
 | |
| 		c compose.CollectWOOpt[map[string]any, map[string]any]
 | |
| 		t compose.TransformWOOpts[map[string]any, map[string]any]
 | |
| 	)
 | |
| 
 | |
| 	if iWOOpt != nil {
 | |
| 		i = iWOOpt.Invoke
 | |
| 	}
 | |
| 
 | |
| 	if sWOOpt != nil {
 | |
| 		s = sWOOpt.Stream
 | |
| 	}
 | |
| 
 | |
| 	if cWOOpt != nil {
 | |
| 		c = cWOOpt.Collect
 | |
| 	}
 | |
| 
 | |
| 	if tWOOpt != nil {
 | |
| 		t = tWOOpt.Transform
 | |
| 	}
 | |
| 
 | |
| 	return newNodeRunConfigWOOpt(ns, i, s, c, t, options).toNode()
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) invoke() func(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 	if nc.i == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				err = runner.onEnd(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = errOutput
 | |
| 					err = nil
 | |
| 					if output, err = runner.postProcess(ctx, output); err != nil {
 | |
| 						logs.CtxErrorf(ctx, "postProcess failed after returning error output: %v", err)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			var newCtx context.Context
 | |
| 			if newCtx, err = i(ctx); err != nil {
 | |
| 				var err1 error
 | |
| 				if ctx, err1 = runner.onStart(ctx, input); err1 != nil {
 | |
| 					return nil, err1
 | |
| 				}
 | |
| 				return nil, err
 | |
| 			} else {
 | |
| 				ctx = newCtx
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if input, err = runner.preProcess(ctx, input); err != nil {
 | |
| 			var err1 error
 | |
| 			if ctx, err1 = runner.onStart(ctx, input); err1 != nil {
 | |
| 				return nil, err1
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ctx, err = runner.onStart(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if output, err = runner.invoke(ctx, input, opts...); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.postProcess(ctx, output)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) stream() func(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if nc.s == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				output, err = runner.onEndStream(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = schema.StreamReaderFromArray([]map[string]any{errOutput})
 | |
| 					err = nil
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			var newCtx context.Context
 | |
| 			if newCtx, err = i(ctx); err != nil {
 | |
| 				var err1 error
 | |
| 				if ctx, err1 = runner.onStart(ctx, input); err1 != nil {
 | |
| 					return nil, err1
 | |
| 				}
 | |
| 				return nil, err
 | |
| 			} else {
 | |
| 				ctx = newCtx
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if input, err = runner.preProcess(ctx, input); err != nil {
 | |
| 			var err1 error
 | |
| 			if ctx, err1 = runner.onStart(ctx, input); err1 != nil {
 | |
| 				return nil, err1
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ctx, err = runner.onStart(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.stream(ctx, input, opts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) collect() func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) {
 | |
| 	if nc.c == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				err = runner.onEnd(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = errOutput
 | |
| 					err = nil
 | |
| 					if output, err = runner.postProcess(ctx, output); err != nil {
 | |
| 						logs.CtxErrorf(ctx, "postProcess failed after returning error output: %v", err)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			var newCtx context.Context
 | |
| 			if newCtx, err = i(ctx); err != nil {
 | |
| 				var err1 error
 | |
| 				if ctx, _, err1 = runner.onStartStream(ctx, input); err1 != nil {
 | |
| 					return nil, err1
 | |
| 				}
 | |
| 				return nil, err
 | |
| 			} else {
 | |
| 				ctx = newCtx
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for _, p := range runner.streamPreProcessors {
 | |
| 			input = p(ctx, input)
 | |
| 		}
 | |
| 
 | |
| 		if ctx, input, err = runner.onStartStream(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.collect(ctx, input, opts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) transform() func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if nc.t == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	return func(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 		ctx, runner := newNodeRunner(ctx, nc)
 | |
| 
 | |
| 		defer func() {
 | |
| 			if panicErr := recover(); panicErr != nil {
 | |
| 				err = safego.NewPanicErr(panicErr, debug.Stack())
 | |
| 			}
 | |
| 
 | |
| 			if err == nil {
 | |
| 				output, err = runner.onEndStream(ctx, output)
 | |
| 			}
 | |
| 
 | |
| 			if err != nil {
 | |
| 				errOutput, hasErrOutput := runner.onError(ctx, err)
 | |
| 				if hasErrOutput {
 | |
| 					output = schema.StreamReaderFromArray([]map[string]any{errOutput})
 | |
| 					err = nil
 | |
| 				}
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		for _, i := range runner.init {
 | |
| 			var newCtx context.Context
 | |
| 			if newCtx, err = i(ctx); err != nil {
 | |
| 				var err1 error
 | |
| 				if ctx, _, err1 = runner.onStartStream(ctx, input); err1 != nil {
 | |
| 					return nil, err1
 | |
| 				}
 | |
| 				return nil, err
 | |
| 			} else {
 | |
| 				ctx = newCtx
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		for _, p := range runner.streamPreProcessors {
 | |
| 			input = p(ctx, input)
 | |
| 		}
 | |
| 
 | |
| 		if ctx, input, err = runner.onStartStream(ctx, input); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return runner.transform(ctx, input, opts...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (nc *nodeRunConfig[O]) toNode() *Node {
 | |
| 	var opts []compose.LambdaOpt
 | |
| 	opts = append(opts, compose.WithLambdaType(string(nc.nodeType)))
 | |
| 	opts = append(opts, compose.WithLambdaCallbackEnable(true))
 | |
| 
 | |
| 	l, err := compose.AnyLambda(nc.invoke(), nc.stream(), nc.collect(), nc.transform(), opts...)
 | |
| 	if err != nil {
 | |
| 		panic(fmt.Sprintf("failed to create lambda for node %s, err: %v", nc.nodeName, err))
 | |
| 	}
 | |
| 
 | |
| 	return &Node{Lambda: l}
 | |
| }
 | |
| 
 | |
| type nodeRunner[O any] struct {
 | |
| 	*nodeRunConfig[O]
 | |
| 	interrupted bool
 | |
| 	cancelFn    context.CancelFunc
 | |
| }
 | |
| 
 | |
| func newNodeRunner[O any](ctx context.Context, cfg *nodeRunConfig[O]) (context.Context, *nodeRunner[O]) {
 | |
| 	runner := &nodeRunner[O]{
 | |
| 		nodeRunConfig: cfg,
 | |
| 	}
 | |
| 
 | |
| 	if cfg.timeoutMS > 0 {
 | |
| 		ctx, runner.cancelFn = context.WithTimeout(ctx, time.Duration(cfg.timeoutMS)*time.Millisecond)
 | |
| 	}
 | |
| 
 | |
| 	return ctx, runner
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onStart(ctx context.Context, input map[string]any) (context.Context, error) {
 | |
| 	if r.callbackInputConverter != nil {
 | |
| 		convertedInput, err := r.callbackInputConverter(ctx, input)
 | |
| 		if err != nil {
 | |
| 			ctx = callbacks.OnStart(ctx, input)
 | |
| 			return ctx, err
 | |
| 		}
 | |
| 		ctx = callbacks.OnStart(ctx, convertedInput)
 | |
| 	} else {
 | |
| 		ctx = callbacks.OnStart(ctx, input)
 | |
| 	}
 | |
| 
 | |
| 	return ctx, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onStartStream(ctx context.Context, input *schema.StreamReader[map[string]any]) (
 | |
| 	context.Context, *schema.StreamReader[map[string]any], error) {
 | |
| 	if r.callbackInputConverter != nil {
 | |
| 		copied := input.Copy(2)
 | |
| 		realConverter := func(ctx context.Context) func(map[string]any) (map[string]any, error) {
 | |
| 			return func(in map[string]any) (map[string]any, error) {
 | |
| 				return r.callbackInputConverter(ctx, in)
 | |
| 			}
 | |
| 		}
 | |
| 		callbackS := schema.StreamReaderWithConvert(copied[0], realConverter(ctx))
 | |
| 		newCtx, unused := callbacks.OnStartWithStreamInput(ctx, callbackS)
 | |
| 		unused.Close()
 | |
| 		return newCtx, copied[1], nil
 | |
| 	}
 | |
| 
 | |
| 	newCtx, newInput := callbacks.OnStartWithStreamInput(ctx, input)
 | |
| 	return newCtx, newInput, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) preProcess(ctx context.Context, input map[string]any) (_ map[string]any, err error) {
 | |
| 	for _, preProcessor := range r.preProcessors {
 | |
| 		if preProcessor == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		input, err = preProcessor(ctx, input)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return input, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) postProcess(ctx context.Context, output map[string]any) (_ map[string]any, err error) {
 | |
| 	for _, postProcessor := range r.postProcessors {
 | |
| 		if postProcessor == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		output, err = postProcessor(ctx, output)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return output, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) invoke(ctx context.Context, input map[string]any, opts ...O) (output map[string]any, err error) {
 | |
| 	var n int64
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.i(ctx, input, opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) stream(ctx context.Context, input map[string]any, opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	var n int64
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.s(ctx, input, opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) collect(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output map[string]any, err error) {
 | |
| 	if r.maxRetry == 0 {
 | |
| 		return r.c(ctx, input, opts...)
 | |
| 	}
 | |
| 
 | |
| 	copied := input.Copy(int(r.maxRetry))
 | |
| 
 | |
| 	var n int64
 | |
| 	defer func() {
 | |
| 		for i := n + 1; i < r.maxRetry; i++ {
 | |
| 			copied[i].Close()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.c(ctx, copied[n], opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) transform(ctx context.Context, input *schema.StreamReader[map[string]any], opts ...O) (output *schema.StreamReader[map[string]any], err error) {
 | |
| 	if r.maxRetry == 0 {
 | |
| 		return r.t(ctx, input, opts...)
 | |
| 	}
 | |
| 
 | |
| 	copied := input.Copy(int(r.maxRetry))
 | |
| 
 | |
| 	var n int64
 | |
| 	defer func() {
 | |
| 		for i := n + 1; i < r.maxRetry; i++ {
 | |
| 			copied[i].Close()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return nil, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		output, err = r.t(ctx, copied[n], opts...)
 | |
| 		if err != nil {
 | |
| 			if _, ok := compose.IsInterruptRerunError(err); ok { // interrupt, won't retry
 | |
| 				r.interrupted = true
 | |
| 				return nil, err
 | |
| 			}
 | |
| 
 | |
| 			logs.CtxErrorf(ctx, "[invoke] node %s ID %s failed on %d attempt, err: %v", r.nodeName, r.nodeKey, n, err)
 | |
| 			if r.maxRetry > n {
 | |
| 				n++
 | |
| 				if exeCtx := execute.GetExeCtx(ctx); exeCtx != nil && exeCtx.NodeCtx != nil {
 | |
| 					exeCtx.CurrentRetryCount++
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return output, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onEnd(ctx context.Context, output map[string]any) error {
 | |
| 	if r.errProcessType == vo.ErrorProcessTypeExceptionBranch || r.errProcessType == vo.ErrorProcessTypeReturnDefaultData {
 | |
| 		output["isSuccess"] = true
 | |
| 	}
 | |
| 
 | |
| 	if r.callbackOutputConverter != nil {
 | |
| 		convertedOutput, err := r.callbackOutputConverter(ctx, output)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		_ = callbacks.OnEnd(ctx, convertedOutput)
 | |
| 	} else {
 | |
| 		_ = callbacks.OnEnd(ctx, output)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onEndStream(ctx context.Context, output *schema.StreamReader[map[string]any]) (
 | |
| 	*schema.StreamReader[map[string]any], error) {
 | |
| 	if r.errProcessType == vo.ErrorProcessTypeExceptionBranch || r.errProcessType == vo.ErrorProcessTypeReturnDefaultData {
 | |
| 		flag := schema.StreamReaderFromArray([]map[string]any{{"isSuccess": true}})
 | |
| 		output = schema.MergeStreamReaders([]*schema.StreamReader[map[string]any]{flag, output})
 | |
| 	}
 | |
| 
 | |
| 	if r.callbackOutputConverter != nil {
 | |
| 		copied := output.Copy(2)
 | |
| 		realConverter := func(ctx context.Context) func(map[string]any) (*nodes.StructuredCallbackOutput, error) {
 | |
| 			return func(in map[string]any) (*nodes.StructuredCallbackOutput, error) {
 | |
| 				return r.callbackOutputConverter(ctx, in)
 | |
| 			}
 | |
| 		}
 | |
| 		callbackS := schema.StreamReaderWithConvert(copied[0], realConverter(ctx))
 | |
| 		_, unused := callbacks.OnEndWithStreamOutput(ctx, callbackS)
 | |
| 		unused.Close()
 | |
| 
 | |
| 		return copied[1], nil
 | |
| 	}
 | |
| 
 | |
| 	_, newOutput := callbacks.OnEndWithStreamOutput(ctx, output)
 | |
| 	return newOutput, nil
 | |
| }
 | |
| 
 | |
| func (r *nodeRunner[O]) onError(ctx context.Context, err error) (map[string]any, bool) {
 | |
| 	if r.interrupted {
 | |
| 		_ = callbacks.OnError(ctx, err)
 | |
| 		return nil, false
 | |
| 	}
 | |
| 
 | |
| 	var sErr vo.WorkflowError
 | |
| 	if !errors.As(err, &sErr) {
 | |
| 		if errors.Is(err, context.DeadlineExceeded) {
 | |
| 			sErr = vo.NodeTimeoutErr
 | |
| 		} else if errors.Is(err, context.Canceled) {
 | |
| 			sErr = vo.CancelErr
 | |
| 		} else {
 | |
| 			sErr = vo.WrapError(errno.ErrWorkflowExecuteFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	code := int(sErr.Code())
 | |
| 	msg := sErr.Msg()
 | |
| 
 | |
| 	switch r.errProcessType {
 | |
| 	case vo.ErrorProcessTypeReturnDefaultData:
 | |
| 		d := r.dataOnErr(ctx)
 | |
| 		d["errorBody"] = map[string]any{
 | |
| 			"errorMessage": msg,
 | |
| 			"errorCode":    code,
 | |
| 		}
 | |
| 		d["isSuccess"] = false
 | |
| 		sErr = sErr.ChangeErrLevel(vo.LevelWarn)
 | |
| 		sOutput := &nodes.StructuredCallbackOutput{
 | |
| 			Output:    d,
 | |
| 			RawOutput: d,
 | |
| 			Error:     sErr,
 | |
| 		}
 | |
| 		_ = callbacks.OnEnd(ctx, sOutput)
 | |
| 		return d, true
 | |
| 	case vo.ErrorProcessTypeExceptionBranch:
 | |
| 		s := make(map[string]any)
 | |
| 		s["errorBody"] = map[string]any{
 | |
| 			"errorMessage": msg,
 | |
| 			"errorCode":    code,
 | |
| 		}
 | |
| 		s["isSuccess"] = false
 | |
| 		sErr = sErr.ChangeErrLevel(vo.LevelWarn)
 | |
| 		sOutput := &nodes.StructuredCallbackOutput{
 | |
| 			Output:    s,
 | |
| 			RawOutput: s,
 | |
| 			Error:     sErr,
 | |
| 		}
 | |
| 		_ = callbacks.OnEnd(ctx, sOutput)
 | |
| 		return s, true
 | |
| 	default:
 | |
| 		_ = callbacks.OnError(ctx, sErr)
 | |
| 		return nil, false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func parseDefaultOutput(ctx context.Context, data string, schema_ map[string]*vo.TypeInfo) (map[string]any, error) {
 | |
| 	var result map[string]any
 | |
| 
 | |
| 	err := sonic.UnmarshalString(data, &result)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	r, ws, e := nodes.ConvertInputs(ctx, result, schema_)
 | |
| 	if e != nil {
 | |
| 		return nil, e
 | |
| 	}
 | |
| 
 | |
| 	if ws != nil {
 | |
| 		logs.CtxWarnf(ctx, "convert output warnings: %v", *ws)
 | |
| 	}
 | |
| 
 | |
| 	return r, nil
 | |
| }
 | |
| 
 | |
| func parseDefaultOutputOrFallback(ctx context.Context, data string, schema_ map[string]*vo.TypeInfo) map[string]any {
 | |
| 	result, err := parseDefaultOutput(ctx, data, schema_)
 | |
| 	if err != nil {
 | |
| 		fallback := make(map[string]any, len(schema_))
 | |
| 		for k, v := range schema_ {
 | |
| 			if v.Type == vo.DataTypeString {
 | |
| 				fallback[k] = data
 | |
| 				continue
 | |
| 			}
 | |
| 			fallback[k] = v.Zero()
 | |
| 		}
 | |
| 		return fallback
 | |
| 	}
 | |
| 	return result
 | |
| }
 | |
| 
 | |
| func preTypeConverter(inTypes map[string]*vo.TypeInfo) func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 	return func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 		out, ws, err := nodes.ConvertInputs(ctx, in, inTypes)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		if ws != nil {
 | |
| 			logs.CtxWarnf(ctx, "convert inputs warnings: %v", *ws)
 | |
| 		}
 | |
| 
 | |
| 		return out, err
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func trimKeyFinishedMarker(ctx context.Context, in map[string]any) (map[string]any, bool, error) {
 | |
| 	var (
 | |
| 		newIn   map[string]any
 | |
| 		trimmed bool
 | |
| 	)
 | |
| 	for k, v := range in {
 | |
| 		if vStr, ok := v.(string); ok {
 | |
| 			if strings.HasSuffix(vStr, nodes.KeyIsFinished) {
 | |
| 				if newIn == nil {
 | |
| 					newIn = maps.Clone(in)
 | |
| 				}
 | |
| 				vStr = strings.TrimSuffix(vStr, nodes.KeyIsFinished)
 | |
| 				newIn[k] = vStr
 | |
| 				trimmed = true
 | |
| 			}
 | |
| 		} else if vMap, ok := v.(map[string]any); ok {
 | |
| 			newMap, subTrimmed, err := trimKeyFinishedMarker(ctx, vMap)
 | |
| 			if err != nil {
 | |
| 				return nil, false, err
 | |
| 			}
 | |
| 			if subTrimmed {
 | |
| 				if newIn == nil {
 | |
| 					newIn = maps.Clone(in)
 | |
| 				}
 | |
| 				newIn[k] = newMap
 | |
| 				trimmed = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if trimmed {
 | |
| 		return newIn, true, nil
 | |
| 	}
 | |
| 
 | |
| 	return in, false, nil
 | |
| }
 | |
| 
 | |
| func keyFinishedMarkerTrimmer() func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 	return func(ctx context.Context, in map[string]any) (map[string]any, error) {
 | |
| 		out, _, err := trimKeyFinishedMarker(ctx, in)
 | |
| 		return out, err
 | |
| 	}
 | |
| }
 |