From 31621542c8b58a21b143ee9d4113f4b437b36e13 Mon Sep 17 00:00:00 2001 From: "zhuangjie.1125" Date: Mon, 21 Jul 2025 18:30:36 +0800 Subject: [PATCH] feat(backend): batch & loop node fix get input --- .../api/handler/coze/workflow_service_test.go | 49 ++++ .../examples/loop_with_object_input.json | 254 ++++++++++++++++++ .../workflow/internal/nodes/batch/batch.go | 19 +- .../workflow/internal/nodes/loop/loop.go | 19 +- 4 files changed, 317 insertions(+), 24 deletions(-) create mode 100644 backend/domain/workflow/internal/canvas/examples/loop_with_object_input.json diff --git a/backend/api/handler/coze/workflow_service_test.go b/backend/api/handler/coze/workflow_service_test.go index 4527f887..147a6e5d 100644 --- a/backend/api/handler/coze/workflow_service_test.go +++ b/backend/api/handler/coze/workflow_service_test.go @@ -3604,6 +3604,55 @@ func TestNodeDebugLoop(t *testing.T) { result = r.getNodeExeHistory(id, "", "wrong_node_id", ptr.Of(workflow.NodeHistoryScene_TestRunInput)) assert.Equal(t, "", result.Output) }) + + mockey.PatchConvey("test node debug loop", t, func() { + r := newWfTestRunner(t) + defer r.closeFn() + + id := r.load("loop_selector_variable_assign_text_processor.json") + exeID := r.nodeDebug(id, "192046", withNDInput(map[string]string{"input": `["a", "bb", "ccc", "dddd"]`})) + e := r.getProcess(id, exeID, withSpecificNodeID("192046")) + e.assertSuccess() + assert.Equal(t, map[string]any{ + "converted": []any{ + "new_a", + "new_ccc", + }, + "variable_out": "dddd", + }, mustUnmarshalToMap(t, e.output)) + + result := r.getNodeExeHistory(id, exeID, "192046", nil) + assert.Equal(t, mustUnmarshalToMap(t, e.output), mustUnmarshalToMap(t, result.Output)) + + // verify this workflow has not been successfully test ran + result = r.getNodeExeHistory(id, "", "100001", ptr.Of(workflow.NodeHistoryScene_TestRunInput)) + assert.Equal(t, "", result.Output) + + // verify that another node of this workflow is not node debugged + result = r.getNodeExeHistory(id, "", "wrong_node_id", ptr.Of(workflow.NodeHistoryScene_TestRunInput)) + assert.Equal(t, "", result.Output) + }) + + mockey.PatchConvey("test node debug loop", t, func() { + r := newWfTestRunner(t) + defer r.closeFn() + runner := mockcode.NewMockRunner(r.ctrl) + runner.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, request *code.RunRequest) (*code.RunResponse, error) { + return &code.RunResponse{ + Result: request.Params, + }, nil + }).AnyTimes() + + code.SetCodeRunner(runner) + id := r.load("loop_with_object_input.json") + exeID := r.nodeDebug(id, "122149", + withNDInput(map[string]string{"input": `[{"a":"1"},{"a":"2"}]`})) + e := r.getProcess(id, exeID, withSpecificNodeID("122149")) + e.assertSuccess() + assert.Equal(t, `{"output":["1","2"]}`, e.output) + + }) + } func TestCopyWorkflow(t *testing.T) { diff --git a/backend/domain/workflow/internal/canvas/examples/loop_with_object_input.json b/backend/domain/workflow/internal/canvas/examples/loop_with_object_input.json new file mode 100644 index 00000000..63cf8d8b --- /dev/null +++ b/backend/domain/workflow/internal/canvas/examples/loop_with_object_input.json @@ -0,0 +1,254 @@ +{ + "nodes": [ + { + "id": "100001", + "type": "1", + "meta": { + "position": { + "x": 180, + "y": 26.000000000000007 + } + }, + "data": { + "nodeMeta": { + "description": "工作流的起始节点,用于设定启动工作流需要的信息", + "icon": "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Start-v2.jpg", + "subTitle": "", + "title": "开始" + }, + "outputs": [ + { + "type": "list", + "name": "objs", + "schema": { + "type": "object", + "schema": [] + }, + "required": true + } + ], + "trigger_parameters": [ + { + "type": "list", + "name": "objs", + "schema": { + "type": "object", + "schema": [] + }, + "required": true + } + ] + } + }, + { + "id": "900001", + "type": "2", + "meta": { + "position": { + "x": 1300, + "y": 13.000000000000007 + } + }, + "data": { + "nodeMeta": { + "description": "工作流的最终节点,用于返回工作流运行后的结果信息", + "icon": "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-End-v2.jpg", + "subTitle": "", + "title": "结束" + }, + "inputs": { + "terminatePlan": "returnVariables", + "inputParameters": [ + { + "name": "output", + "input": { + "type": "list", + "schema": { + "type": "string" + }, + "value": { + "type": "ref", + "content": { + "source": "block-output", + "blockID": "122149", + "name": "output" + }, + "rawMeta": { + "type": 99 + } + } + } + } + ] + } + } + }, + { + "id": "122149", + "type": "21", + "meta": { + "position": { + "x": 740, + "y": 0 + }, + "canvasPosition": { + "x": 560, + "y": 317.30000000000007 + } + }, + "data": { + "nodeMeta": { + "title": "循环", + "icon": "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Loop-v2.jpg", + "description": "用于通过设定循环次数和逻辑,重复执行一系列任务", + "mainColor": "#00B2B2", + "subTitle": "循环" + }, + "inputs": { + "loopType": "array", + "loopCount": { + "type": "integer", + "value": { + "type": "literal", + "content": "10" + } + }, + "variableParameters": [], + "inputParameters": [ + { + "name": "input", + "input": { + "type": "list", + "schema": { + "type": "object", + "schema": [] + }, + "value": { + "type": "ref", + "content": { + "source": "block-output", + "blockID": "100001", + "name": "objs" + }, + "rawMeta": { + "type": 103 + } + } + } + } + ] + }, + "outputs": [ + { + "name": "output", + "input": { + "type": "list", + "schema": { + "type": "string" + }, + "value": { + "type": "ref", + "content": { + "source": "block-output", + "blockID": "116706", + "name": "input.a" + }, + "rawMeta": { + "type": 1 + } + } + } + } + ] + }, + "blocks": [ + { + "id": "116706", + "type": "5", + "meta": { + "position": { + "x": 180, + "y": 0 + } + }, + "data": { + "nodeMeta": { + "title": "代码", + "icon": "https://lf3-static.bytednsdoc.com/obj/eden-cn/dvsmryvd_avi_dvsm/ljhwZthlaukjlkulzlp/icon/icon-Code-v2.jpg", + "description": "编写代码,处理输入变量来生成返回值", + "mainColor": "#00B2B2", + "subTitle": "代码" + }, + "inputs": { + "inputParameters": [ + { + "name": "input", + "input": { + "type": "object", + "schema": [], + "value": { + "type": "ref", + "content": { + "source": "block-output", + "blockID": "122149", + "name": "input" + }, + "rawMeta": { + "type": 6 + } + } + } + } + ], + "code": "# 在这里,您可以通过 'args' 获取节点中的输入变量,并通过 'ret' 输出结果\n# 'args' 已经被正确地注入到环境中\n# 下面是一个示例,首先获取节点的全部输入参数params,其次获取其中参数名为'input'的值:\n# params = args.params; \n# input = params['input'];\n# 下面是一个示例,输出一个包含多种数据类型的 'ret' 对象:\n# ret: Output = { \"name\": '小明', \"hobbies\": [\"看书\", \"旅游\"] };\nimport json\nasync def main(args: Args) -> Output:\n params = args.params\n return params", + "language": 3, + "settingOnError": { + "processType": 1, + "timeoutMs": 60000, + "retryTimes": 0 + } + }, + "outputs": [ + { + "type": "object", + "name": "input", + "schema": [ + { + "type": "string", + "name": "a" + } + ] + } + ] + } + } + ], + "edges": [ + { + "sourceNodeID": "122149", + "targetNodeID": "116706", + "sourcePortID": "loop-function-inline-output" + }, + { + "sourceNodeID": "116706", + "targetNodeID": "122149", + "targetPortID": "loop-function-inline-input" + } + ] + } + ], + "edges": [ + { + "sourceNodeID": "100001", + "targetNodeID": "122149" + }, + { + "sourceNodeID": "122149", + "targetNodeID": "900001", + "sourcePortID": "loop-output" + } + ], + "versions": { + "loop": "v2" + } +} diff --git a/backend/domain/workflow/internal/nodes/batch/batch.go b/backend/domain/workflow/internal/nodes/batch/batch.go index dac31812..95b96a2a 100644 --- a/backend/domain/workflow/internal/nodes/batch/batch.go +++ b/backend/domain/workflow/internal/nodes/batch/batch.go @@ -169,21 +169,16 @@ func (b *Batch) Execute(ctx context.Context, in map[string]any, opts ...nodes.Ne currentKey := string(b.config.BatchNodeKey) + "#" + arrayKey // Recursively expand map[string]any elements - if m, ok := ele.(map[string]any); ok { - var expand func(prefix string, val interface{}) - expand = func(prefix string, val interface{}) { - if nestedMap, ok := val.(map[string]any); ok { - for k, v := range nestedMap { - expand(prefix+"#"+k, v) - } - } else { - input[prefix] = val + var expand func(prefix string, val interface{}) + expand = func(prefix string, val interface{}) { + input[prefix] = val + if nestedMap, ok := val.(map[string]any); ok { + for k, v := range nestedMap { + expand(prefix+"#"+k, v) } } - expand(currentKey, m) - } else { - input[currentKey] = ele } + expand(currentKey, ele) } return input, items, nil diff --git a/backend/domain/workflow/internal/nodes/loop/loop.go b/backend/domain/workflow/internal/nodes/loop/loop.go index 7f1d3a9c..ab570a46 100644 --- a/backend/domain/workflow/internal/nodes/loop/loop.go +++ b/backend/domain/workflow/internal/nodes/loop/loop.go @@ -199,21 +199,16 @@ func (l *Loop) Execute(ctx context.Context, in map[string]any, opts ...nodes.Nes currentKey := string(l.config.LoopNodeKey) + "#" + arrayKey // Recursively expand map[string]any elements - if m, ok := ele.(map[string]any); ok { - var expand func(prefix string, val interface{}) - expand = func(prefix string, val interface{}) { - if nestedMap, ok := val.(map[string]any); ok { - for k, v := range nestedMap { - expand(prefix+"#"+k, v) - } - } else { - input[prefix] = val + var expand func(prefix string, val interface{}) + expand = func(prefix string, val interface{}) { + input[prefix] = val + if nestedMap, ok := val.(map[string]any); ok { + for k, v := range nestedMap { + expand(prefix+"#"+k, v) } } - expand(currentKey, m) - } else { - input[currentKey] = ele } + expand(currentKey, ele) } return input, items, nil