From 5e22818296b8c7958713edb8507c789739a7a019 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yanli=20=E7=9B=90=E7=B2=92?= Date: Tue, 17 Mar 2026 19:33:49 +0800 Subject: [PATCH] fix: match repeated workflow node finishes by execution id --- .../workflow-stream-handlers.spec.ts | 33 +++++++++++++++++++ .../result/workflow-stream-handlers.ts | 11 ++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts b/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts index 4b61a8ffd9..703d43cf3f 100644 --- a/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts +++ b/web/app/components/share/text-generation/result/__tests__/workflow-stream-handlers.spec.ts @@ -186,6 +186,38 @@ describe('workflow-stream-handlers helpers', () => { })) }) + it('should finish the matching top-level trace when the same node runs again with a new execution id', () => { + const process = createWorkflowProcess() + process.tracing = [ + createTrace({ + id: 'trace-1', + node_id: 'node-1', + status: NodeRunningStatus.Succeeded, + }), + createTrace({ + id: 'trace-2', + node_id: 'node-1', + status: NodeRunningStatus.Running, + }), + ] + + const updatedProcess = finishWorkflowNode(process, createTrace({ + id: 'trace-2', + node_id: 'node-1', + status: NodeRunningStatus.Succeeded, + }))! + + expect(updatedProcess.tracing).toHaveLength(2) + expect(updatedProcess.tracing[0]).toEqual(expect.objectContaining({ + id: 'trace-1', + status: NodeRunningStatus.Succeeded, + })) + expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({ + id: 'trace-2', + status: NodeRunningStatus.Succeeded, + })) + }) + it('should leave tracing unchanged when a parallel next event has no matching trace', () => { const process = createWorkflowProcess() process.tracing = [ @@ -269,6 +301,7 @@ describe('workflow-stream-handlers helpers', () => { loop_id: 'loop-1', })) const unmatchedFinish = finishWorkflowNode(process, createTrace({ + id: 'trace-missing', node_id: 'missing', execution_metadata: { parallel_id: 'missing', diff --git a/web/app/components/share/text-generation/result/workflow-stream-handlers.ts b/web/app/components/share/text-generation/result/workflow-stream-handlers.ts index 7b50faba27..a89cef962a 100644 --- a/web/app/components/share/text-generation/result/workflow-stream-handlers.ts +++ b/web/app/components/share/text-generation/result/workflow-stream-handlers.ts @@ -107,12 +107,21 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac }) } +const findWorkflowNodeTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => { + return tracing.findIndex((trace) => { + if (trace.id && data.id) + return trace.id === data.id + + return matchParallelTrace(trace, data) + }) +} + const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTracing) => { if (data.iteration_id || data.loop_id) return current return updateWorkflowProcess(current, (draft) => { - const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data)) + const currentIndex = findWorkflowNodeTraceIndex(draft.tracing, data) if (currentIndex > -1) { draft.tracing[currentIndex] = { ...(draft.tracing[currentIndex].extras