fix: match repeated workflow node finishes by execution id

This commit is contained in:
Yanli 盐粒 2026-03-17 19:33:49 +08:00
parent 64308c3d0d
commit 5e22818296
2 changed files with 43 additions and 1 deletions

View File

@ -186,6 +186,38 @@ describe('workflow-stream-handlers helpers', () => {
}))
})
it('should finish the matching top-level trace when the same node runs again with a new execution id', () => {
const process = createWorkflowProcess()
process.tracing = [
createTrace({
id: 'trace-1',
node_id: 'node-1',
status: NodeRunningStatus.Succeeded,
}),
createTrace({
id: 'trace-2',
node_id: 'node-1',
status: NodeRunningStatus.Running,
}),
]
const updatedProcess = finishWorkflowNode(process, createTrace({
id: 'trace-2',
node_id: 'node-1',
status: NodeRunningStatus.Succeeded,
}))!
expect(updatedProcess.tracing).toHaveLength(2)
expect(updatedProcess.tracing[0]).toEqual(expect.objectContaining({
id: 'trace-1',
status: NodeRunningStatus.Succeeded,
}))
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
id: 'trace-2',
status: NodeRunningStatus.Succeeded,
}))
})
it('should leave tracing unchanged when a parallel next event has no matching trace', () => {
const process = createWorkflowProcess()
process.tracing = [
@ -269,6 +301,7 @@ describe('workflow-stream-handlers helpers', () => {
loop_id: 'loop-1',
}))
const unmatchedFinish = finishWorkflowNode(process, createTrace({
id: 'trace-missing',
node_id: 'missing',
execution_metadata: {
parallel_id: 'missing',

View File

@ -107,12 +107,21 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
})
}
const findWorkflowNodeTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
return tracing.findIndex((trace) => {
if (trace.id && data.id)
return trace.id === data.id
return matchParallelTrace(trace, data)
})
}
const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTracing) => {
if (data.iteration_id || data.loop_id)
return current
return updateWorkflowProcess(current, (draft) => {
const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data))
const currentIndex = findWorkflowNodeTraceIndex(draft.tracing, data)
if (currentIndex > -1) {
draft.tracing[currentIndex] = {
...(draft.tracing[currentIndex].extras