mirror of https://github.com/langgenius/dify.git
Merge a17f6f62bf into a813b9f103
This commit is contained in:
commit
55145f8799
|
|
@ -588,6 +588,66 @@ describe('useChat', () => {
|
||||||
expect(lastResponse.workflowProcess?.status).toBe('failed')
|
expect(lastResponse.workflowProcess?.status).toBe('failed')
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should keep separate iteration traces for repeated executions of the same iteration node', async () => {
|
||||||
|
let callbacks: HookCallbacks
|
||||||
|
|
||||||
|
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||||
|
callbacks = options as HookCallbacks
|
||||||
|
})
|
||||||
|
|
||||||
|
const { result } = renderHook(() => useChat())
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
result.current.handleSend('test-url', { query: 'iteration trace test' }, {})
|
||||||
|
})
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||||
|
callbacks.onIterationStart({ data: { id: 'iter-run-1', node_id: 'iter-1' } })
|
||||||
|
callbacks.onIterationStart({ data: { id: 'iter-run-2', node_id: 'iter-1' } })
|
||||||
|
callbacks.onIterationFinish({ data: { id: 'iter-run-1', node_id: 'iter-1', status: 'succeeded' } })
|
||||||
|
callbacks.onIterationFinish({ data: { id: 'iter-run-2', node_id: 'iter-1', status: 'succeeded' } })
|
||||||
|
})
|
||||||
|
|
||||||
|
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||||
|
|
||||||
|
expect(tracing).toHaveLength(2)
|
||||||
|
expect(tracing).toEqual(expect.arrayContaining([
|
||||||
|
expect.objectContaining({ id: 'iter-run-1', status: 'succeeded' }),
|
||||||
|
expect.objectContaining({ id: 'iter-run-2', status: 'succeeded' }),
|
||||||
|
]))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should keep separate top-level traces for repeated executions of the same node', async () => {
|
||||||
|
let callbacks: HookCallbacks
|
||||||
|
|
||||||
|
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
|
||||||
|
callbacks = options as HookCallbacks
|
||||||
|
})
|
||||||
|
|
||||||
|
const { result } = renderHook(() => useChat())
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
result.current.handleSend('test-url', { query: 'top-level trace test' }, {})
|
||||||
|
})
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
callbacks.onWorkflowStarted({ workflow_run_id: 'wr-1', task_id: 't-1' })
|
||||||
|
callbacks.onNodeStarted({ data: { id: 'node-run-1', node_id: 'node-1', title: 'Node 1' } })
|
||||||
|
callbacks.onNodeStarted({ data: { id: 'node-run-2', node_id: 'node-1', title: 'Node 1 retry' } })
|
||||||
|
callbacks.onNodeFinished({ data: { id: 'node-run-1', node_id: 'node-1', status: 'succeeded' } })
|
||||||
|
callbacks.onNodeFinished({ data: { id: 'node-run-2', node_id: 'node-1', status: 'succeeded' } })
|
||||||
|
})
|
||||||
|
|
||||||
|
const tracing = result.current.chatList[1].workflowProcess?.tracing ?? []
|
||||||
|
|
||||||
|
expect(tracing).toHaveLength(2)
|
||||||
|
expect(tracing).toEqual(expect.arrayContaining([
|
||||||
|
expect.objectContaining({ id: 'node-run-1', status: 'succeeded' }),
|
||||||
|
expect.objectContaining({ id: 'node-run-2', status: 'succeeded' }),
|
||||||
|
]))
|
||||||
|
})
|
||||||
|
|
||||||
it('should handle early exits in tracing events during iteration or loop', async () => {
|
it('should handle early exits in tracing events during iteration or loop', async () => {
|
||||||
let callbacks: HookCallbacks
|
let callbacks: HookCallbacks
|
||||||
|
|
||||||
|
|
@ -623,7 +683,7 @@ describe('useChat', () => {
|
||||||
callbacks.onNodeFinished({ data: { id: 'n-1', iteration_id: 'iter-1' } })
|
callbacks.onNodeFinished({ data: { id: 'n-1', iteration_id: 'iter-1' } })
|
||||||
})
|
})
|
||||||
|
|
||||||
const traceLen1 = result.current.chatList[result.current.chatList.length - 1].workflowProcess?.tracing?.length
|
const traceLen1 = result.current.chatList.at(-1)!.workflowProcess?.tracing?.length
|
||||||
expect(traceLen1).toBe(0) // None added due to iteration early hits
|
expect(traceLen1).toBe(0) // None added due to iteration early hits
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -707,7 +767,7 @@ describe('useChat', () => {
|
||||||
|
|
||||||
expect(result.current.chatList.some(item => item.id === 'question-m-child')).toBe(true)
|
expect(result.current.chatList.some(item => item.id === 'question-m-child')).toBe(true)
|
||||||
expect(result.current.chatList.some(item => item.id === 'm-child')).toBe(true)
|
expect(result.current.chatList.some(item => item.id === 'm-child')).toBe(true)
|
||||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('child answer')
|
expect(result.current.chatList.at(-1)!.content).toBe('child answer')
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should strip local file urls before sending payload', () => {
|
it('should strip local file urls before sending payload', () => {
|
||||||
|
|
@ -805,7 +865,7 @@ describe('useChat', () => {
|
||||||
})
|
})
|
||||||
|
|
||||||
expect(onGetConversationMessages).toHaveBeenCalled()
|
expect(onGetConversationMessages).toHaveBeenCalled()
|
||||||
expect(result.current.chatList[result.current.chatList.length - 1].content).toBe('streamed content')
|
expect(result.current.chatList.at(-1)!.content).toBe('streamed content')
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should clear suggested questions when suggestion fetch fails after completion', async () => {
|
it('should clear suggested questions when suggestion fetch fails after completion', async () => {
|
||||||
|
|
@ -851,7 +911,7 @@ describe('useChat', () => {
|
||||||
callbacks.onNodeFinished({ data: { node_id: 'n-loop', id: 'n-loop' } })
|
callbacks.onNodeFinished({ data: { node_id: 'n-loop', id: 'n-loop' } })
|
||||||
})
|
})
|
||||||
|
|
||||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
const latestResponse = result.current.chatList.at(-1)!
|
||||||
expect(latestResponse.workflowProcess?.tracing).toHaveLength(0)
|
expect(latestResponse.workflowProcess?.tracing).toHaveLength(0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -878,7 +938,7 @@ describe('useChat', () => {
|
||||||
callbacks.onTTSChunk('m-th-bind', '')
|
callbacks.onTTSChunk('m-th-bind', '')
|
||||||
})
|
})
|
||||||
|
|
||||||
const latestResponse = result.current.chatList[result.current.chatList.length - 1]
|
const latestResponse = result.current.chatList.at(-1)!
|
||||||
expect(latestResponse.id).toBe('m-th-bind')
|
expect(latestResponse.id).toBe('m-th-bind')
|
||||||
expect(latestResponse.conversationId).toBe('c-th-bind')
|
expect(latestResponse.conversationId).toBe('c-th-bind')
|
||||||
expect(latestResponse.workflowProcess?.status).toBe('succeeded')
|
expect(latestResponse.workflowProcess?.status).toBe('succeeded')
|
||||||
|
|
@ -971,7 +1031,7 @@ describe('useChat', () => {
|
||||||
callbacks.onCompleted()
|
callbacks.onCompleted()
|
||||||
})
|
})
|
||||||
|
|
||||||
const lastResponse = result.current.chatList[result.current.chatList.length - 1]
|
const lastResponse = result.current.chatList.at(-1)!
|
||||||
expect(lastResponse.agent_thoughts![0].thought).toContain('resumed')
|
expect(lastResponse.agent_thoughts![0].thought).toContain('resumed')
|
||||||
|
|
||||||
expect(lastResponse.workflowProcess?.tracing?.length).toBeGreaterThan(0)
|
expect(lastResponse.workflowProcess?.tracing?.length).toBeGreaterThan(0)
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import type {
|
||||||
IOnDataMoreInfo,
|
IOnDataMoreInfo,
|
||||||
IOtherOptions,
|
IOtherOptions,
|
||||||
} from '@/service/base'
|
} from '@/service/base'
|
||||||
|
import type { NodeTracing } from '@/types/workflow'
|
||||||
import { uniqBy } from 'es-toolkit/compat'
|
import { uniqBy } from 'es-toolkit/compat'
|
||||||
import { noop } from 'es-toolkit/function'
|
import { noop } from 'es-toolkit/function'
|
||||||
import { produce, setAutoFreeze } from 'immer'
|
import { produce, setAutoFreeze } from 'immer'
|
||||||
|
|
@ -31,6 +32,7 @@ import {
|
||||||
} from '@/app/components/base/file-uploader/utils'
|
} from '@/app/components/base/file-uploader/utils'
|
||||||
import { useToastContext } from '@/app/components/base/toast/context'
|
import { useToastContext } from '@/app/components/base/toast/context'
|
||||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||||
|
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||||
import useTimestamp from '@/hooks/use-timestamp'
|
import useTimestamp from '@/hooks/use-timestamp'
|
||||||
import { useParams, usePathname } from '@/next/navigation'
|
import { useParams, usePathname } from '@/next/navigation'
|
||||||
import {
|
import {
|
||||||
|
|
@ -52,6 +54,39 @@ type SendCallback = {
|
||||||
isPublicAPI?: boolean
|
isPublicAPI?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ParallelTraceLike = Pick<NodeTracing, 'id' | 'node_id' | 'parallel_id' | 'execution_metadata'>
|
||||||
|
|
||||||
|
const findParallelTraceIndex = (
|
||||||
|
tracing: ParallelTraceLike[],
|
||||||
|
data: Partial<ParallelTraceLike>,
|
||||||
|
) => {
|
||||||
|
const incomingParallelId = data.execution_metadata?.parallel_id ?? data.parallel_id
|
||||||
|
|
||||||
|
if (data.id) {
|
||||||
|
const matchedByIdIndex = tracing.findIndex((item) => {
|
||||||
|
if (item.id !== data.id)
|
||||||
|
return false
|
||||||
|
|
||||||
|
const existingParallelId = item.execution_metadata?.parallel_id ?? item.parallel_id
|
||||||
|
if (!existingParallelId || !incomingParallelId)
|
||||||
|
return true
|
||||||
|
|
||||||
|
return existingParallelId === incomingParallelId
|
||||||
|
})
|
||||||
|
|
||||||
|
if (matchedByIdIndex > -1)
|
||||||
|
return matchedByIdIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
return tracing.findIndex((item) => {
|
||||||
|
if (item.node_id !== data.node_id)
|
||||||
|
return false
|
||||||
|
|
||||||
|
const existingParallelId = item.execution_metadata?.parallel_id ?? item.parallel_id
|
||||||
|
return existingParallelId === incomingParallelId
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
export const useChat = (
|
export const useChat = (
|
||||||
config?: ChatConfig,
|
config?: ChatConfig,
|
||||||
formSettings?: {
|
formSettings?: {
|
||||||
|
|
@ -419,8 +454,7 @@ export const useChat = (
|
||||||
if (!responseItem.workflowProcess?.tracing)
|
if (!responseItem.workflowProcess?.tracing)
|
||||||
return
|
return
|
||||||
const tracing = responseItem.workflowProcess.tracing
|
const tracing = responseItem.workflowProcess.tracing
|
||||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
|
||||||
if (iterationIndex > -1) {
|
if (iterationIndex > -1) {
|
||||||
tracing[iterationIndex] = {
|
tracing[iterationIndex] = {
|
||||||
...tracing[iterationIndex],
|
...tracing[iterationIndex],
|
||||||
|
|
@ -432,38 +466,34 @@ export const useChat = (
|
||||||
},
|
},
|
||||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||||
updateChatTreeNode(messageId, (responseItem) => {
|
updateChatTreeNode(messageId, (responseItem) => {
|
||||||
|
if (params.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
if (!responseItem.workflowProcess)
|
if (!responseItem.workflowProcess)
|
||||||
return
|
return
|
||||||
if (!responseItem.workflowProcess.tracing)
|
if (!responseItem.workflowProcess.tracing)
|
||||||
responseItem.workflowProcess.tracing = []
|
responseItem.workflowProcess.tracing = []
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||||
// if the node is already started, update the node
|
...nodeStartedData,
|
||||||
if (currentIndex > -1) {
|
status: NodeRunningStatus.Running,
|
||||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
})
|
||||||
...nodeStartedData,
|
|
||||||
status: NodeRunningStatus.Running,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (nodeStartedData.iteration_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
responseItem.workflowProcess.tracing.push({
|
|
||||||
...nodeStartedData,
|
|
||||||
status: WorkflowRunningStatus.Running,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||||
updateChatTreeNode(messageId, (responseItem) => {
|
updateChatTreeNode(messageId, (responseItem) => {
|
||||||
|
if (params.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
if (!responseItem.workflowProcess?.tracing)
|
if (!responseItem.workflowProcess?.tracing)
|
||||||
return
|
return
|
||||||
|
|
||||||
if (nodeFinishedData.iteration_id)
|
if (nodeFinishedData.iteration_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if (nodeFinishedData.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||||
if (!item.execution_metadata?.parallel_id)
|
if (!item.execution_metadata?.parallel_id)
|
||||||
return item.id === nodeFinishedData.id
|
return item.id === nodeFinishedData.id
|
||||||
|
|
@ -505,8 +535,7 @@ export const useChat = (
|
||||||
if (!responseItem.workflowProcess?.tracing)
|
if (!responseItem.workflowProcess?.tracing)
|
||||||
return
|
return
|
||||||
const tracing = responseItem.workflowProcess.tracing
|
const tracing = responseItem.workflowProcess.tracing
|
||||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
|
||||||
if (loopIndex > -1) {
|
if (loopIndex > -1) {
|
||||||
tracing[loopIndex] = {
|
tracing[loopIndex] = {
|
||||||
...tracing[loopIndex],
|
...tracing[loopIndex],
|
||||||
|
|
@ -582,7 +611,7 @@ export const useChat = (
|
||||||
{},
|
{},
|
||||||
otherOptions,
|
otherOptions,
|
||||||
)
|
)
|
||||||
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer])
|
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager, config?.suggested_questions_after_answer, params.loop_id])
|
||||||
|
|
||||||
const updateCurrentQAOnTree = useCallback(({
|
const updateCurrentQAOnTree = useCallback(({
|
||||||
parentId,
|
parentId,
|
||||||
|
|
@ -972,12 +1001,13 @@ export const useChat = (
|
||||||
},
|
},
|
||||||
onIterationFinish: ({ data: iterationFinishedData }) => {
|
onIterationFinish: ({ data: iterationFinishedData }) => {
|
||||||
const tracing = responseItem.workflowProcess!.tracing!
|
const tracing = responseItem.workflowProcess!.tracing!
|
||||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
const iterationIndex = findParallelTraceIndex(tracing, iterationFinishedData)
|
||||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
if (iterationIndex > -1) {
|
||||||
tracing[iterationIndex] = {
|
tracing[iterationIndex] = {
|
||||||
...tracing[iterationIndex],
|
...tracing[iterationIndex],
|
||||||
...iterationFinishedData,
|
...iterationFinishedData,
|
||||||
status: WorkflowRunningStatus.Succeeded,
|
status: WorkflowRunningStatus.Succeeded,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updateCurrentQAOnTree({
|
updateCurrentQAOnTree({
|
||||||
|
|
@ -988,30 +1018,19 @@ export const useChat = (
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onNodeStarted: ({ data: nodeStartedData }) => {
|
onNodeStarted: ({ data: nodeStartedData }) => {
|
||||||
|
// `data` is the outer send payload for this request; loop child runs should not emit top-level node traces here.
|
||||||
|
if (data.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
if (!responseItem.workflowProcess)
|
if (!responseItem.workflowProcess)
|
||||||
return
|
return
|
||||||
if (!responseItem.workflowProcess.tracing)
|
if (!responseItem.workflowProcess.tracing)
|
||||||
responseItem.workflowProcess.tracing = []
|
responseItem.workflowProcess.tracing = []
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||||
if (currentIndex > -1) {
|
...nodeStartedData,
|
||||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
status: NodeRunningStatus.Running,
|
||||||
...nodeStartedData,
|
})
|
||||||
status: NodeRunningStatus.Running,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (nodeStartedData.iteration_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
if (data.loop_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
responseItem.workflowProcess.tracing.push({
|
|
||||||
...nodeStartedData,
|
|
||||||
status: WorkflowRunningStatus.Running,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
updateCurrentQAOnTree({
|
updateCurrentQAOnTree({
|
||||||
placeholderQuestionId,
|
placeholderQuestionId,
|
||||||
questionItem,
|
questionItem,
|
||||||
|
|
@ -1020,10 +1039,14 @@ export const useChat = (
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||||
|
// Use the outer request payload here as well so loop child runs skip top-level finish handling entirely.
|
||||||
|
if (data.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
if (nodeFinishedData.iteration_id)
|
if (nodeFinishedData.iteration_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
if (data.loop_id)
|
if (nodeFinishedData.loop_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
|
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex((item) => {
|
||||||
|
|
@ -1069,12 +1092,13 @@ export const useChat = (
|
||||||
},
|
},
|
||||||
onLoopFinish: ({ data: loopFinishedData }) => {
|
onLoopFinish: ({ data: loopFinishedData }) => {
|
||||||
const tracing = responseItem.workflowProcess!.tracing!
|
const tracing = responseItem.workflowProcess!.tracing!
|
||||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
const loopIndex = findParallelTraceIndex(tracing, loopFinishedData)
|
||||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
if (loopIndex > -1) {
|
||||||
tracing[loopIndex] = {
|
tracing[loopIndex] = {
|
||||||
...tracing[loopIndex],
|
...tracing[loopIndex],
|
||||||
...loopFinishedData,
|
...loopFinishedData,
|
||||||
status: WorkflowRunningStatus.Succeeded,
|
status: WorkflowRunningStatus.Succeeded,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
updateCurrentQAOnTree({
|
updateCurrentQAOnTree({
|
||||||
|
|
|
||||||
|
|
@ -264,7 +264,7 @@ describe('UrlInput', () => {
|
||||||
|
|
||||||
render(<UrlInput {...props} />)
|
render(<UrlInput {...props} />)
|
||||||
const input = screen.getByRole('textbox')
|
const input = screen.getByRole('textbox')
|
||||||
await userEvent.type(input, longUrl)
|
fireEvent.change(input, { target: { value: longUrl } })
|
||||||
|
|
||||||
expect(input).toHaveValue(longUrl)
|
expect(input).toHaveValue(longUrl)
|
||||||
})
|
})
|
||||||
|
|
@ -275,7 +275,7 @@ describe('UrlInput', () => {
|
||||||
|
|
||||||
render(<UrlInput {...props} />)
|
render(<UrlInput {...props} />)
|
||||||
const input = screen.getByRole('textbox')
|
const input = screen.getByRole('textbox')
|
||||||
await userEvent.type(input, unicodeUrl)
|
fireEvent.change(input, { target: { value: unicodeUrl } })
|
||||||
|
|
||||||
expect(input).toHaveValue(unicodeUrl)
|
expect(input).toHaveValue(unicodeUrl)
|
||||||
})
|
})
|
||||||
|
|
@ -285,7 +285,7 @@ describe('UrlInput', () => {
|
||||||
|
|
||||||
render(<UrlInput {...props} />)
|
render(<UrlInput {...props} />)
|
||||||
const input = screen.getByRole('textbox')
|
const input = screen.getByRole('textbox')
|
||||||
await userEvent.type(input, 'https://rapid.com', { delay: 1 })
|
fireEvent.change(input, { target: { value: 'https://rapid.com' } })
|
||||||
|
|
||||||
expect(input).toHaveValue('https://rapid.com')
|
expect(input).toHaveValue('https://rapid.com')
|
||||||
})
|
})
|
||||||
|
|
@ -297,7 +297,7 @@ describe('UrlInput', () => {
|
||||||
|
|
||||||
render(<UrlInput {...props} />)
|
render(<UrlInput {...props} />)
|
||||||
const input = screen.getByRole('textbox')
|
const input = screen.getByRole('textbox')
|
||||||
await userEvent.type(input, 'https://enter.com')
|
fireEvent.change(input, { target: { value: 'https://enter.com' } })
|
||||||
|
|
||||||
// Focus button and press enter
|
// Focus button and press enter
|
||||||
const button = screen.getByRole('button', { name: /run/i })
|
const button = screen.getByRole('button', { name: /run/i })
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ describe('useDatasetCardState', () => {
|
||||||
expect(result.current.modalState.showRenameModal).toBe(false)
|
expect(result.current.modalState.showRenameModal).toBe(false)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('should close confirm delete modal when closeConfirmDelete is called', () => {
|
it('should close confirm delete modal when closeConfirmDelete is called', async () => {
|
||||||
const dataset = createMockDataset()
|
const dataset = createMockDataset()
|
||||||
const { result } = renderHook(() =>
|
const { result } = renderHook(() =>
|
||||||
useDatasetCardState({ dataset, onSuccess: vi.fn() }),
|
useDatasetCardState({ dataset, onSuccess: vi.fn() }),
|
||||||
|
|
@ -168,7 +168,7 @@ describe('useDatasetCardState', () => {
|
||||||
result.current.detectIsUsedByApp()
|
result.current.detectIsUsedByApp()
|
||||||
})
|
})
|
||||||
|
|
||||||
waitFor(() => {
|
await waitFor(() => {
|
||||||
expect(result.current.modalState.showConfirmDelete).toBe(true)
|
expect(result.current.modalState.showConfirmDelete).toBe(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,7 @@ const createHumanInput = (overrides: Partial<HumanInputFormData> = {}): HumanInp
|
||||||
describe('workflow-stream-handlers helpers', () => {
|
describe('workflow-stream-handlers helpers', () => {
|
||||||
it('should update tracing, result text, and human input state', () => {
|
it('should update tracing, result text, and human input state', () => {
|
||||||
const parallelTrace = createTrace({
|
const parallelTrace = createTrace({
|
||||||
|
id: 'parallel-trace-1',
|
||||||
node_id: 'parallel-node',
|
node_id: 'parallel-node',
|
||||||
execution_metadata: { parallel_id: 'parallel-1' },
|
execution_metadata: { parallel_id: 'parallel-1' },
|
||||||
details: [[]],
|
details: [[]],
|
||||||
|
|
@ -109,11 +110,13 @@ describe('workflow-stream-handlers helpers', () => {
|
||||||
let workflowProcessData = appendParallelStart(undefined, parallelTrace)
|
let workflowProcessData = appendParallelStart(undefined, parallelTrace)
|
||||||
workflowProcessData = appendParallelNext(workflowProcessData, parallelTrace)
|
workflowProcessData = appendParallelNext(workflowProcessData, parallelTrace)
|
||||||
workflowProcessData = finishParallelTrace(workflowProcessData, createTrace({
|
workflowProcessData = finishParallelTrace(workflowProcessData, createTrace({
|
||||||
|
id: 'parallel-trace-1',
|
||||||
node_id: 'parallel-node',
|
node_id: 'parallel-node',
|
||||||
execution_metadata: { parallel_id: 'parallel-1' },
|
execution_metadata: { parallel_id: 'parallel-1' },
|
||||||
error: 'failed',
|
error: 'failed',
|
||||||
}))
|
}))
|
||||||
workflowProcessData = upsertWorkflowNode(workflowProcessData, createTrace({
|
workflowProcessData = upsertWorkflowNode(workflowProcessData, createTrace({
|
||||||
|
id: 'node-trace-1',
|
||||||
node_id: 'node-1',
|
node_id: 'node-1',
|
||||||
execution_metadata: { parallel_id: 'parallel-2' },
|
execution_metadata: { parallel_id: 'parallel-2' },
|
||||||
}))!
|
}))!
|
||||||
|
|
@ -160,6 +163,129 @@ describe('workflow-stream-handlers helpers', () => {
|
||||||
expect(nextProcess.tracing[0]?.details).toEqual([[], []])
|
expect(nextProcess.tracing[0]?.details).toEqual([[], []])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should keep separate iteration and loop traces for repeated executions with different ids', () => {
|
||||||
|
const process = createWorkflowProcess()
|
||||||
|
process.tracing = [
|
||||||
|
createTrace({
|
||||||
|
id: 'iter-trace-1',
|
||||||
|
node_id: 'iter-1',
|
||||||
|
details: [[]],
|
||||||
|
}),
|
||||||
|
createTrace({
|
||||||
|
id: 'iter-trace-2',
|
||||||
|
node_id: 'iter-1',
|
||||||
|
details: [[]],
|
||||||
|
}),
|
||||||
|
createTrace({
|
||||||
|
id: 'loop-trace-1',
|
||||||
|
node_id: 'loop-1',
|
||||||
|
details: [[]],
|
||||||
|
}),
|
||||||
|
createTrace({
|
||||||
|
id: 'loop-trace-2',
|
||||||
|
node_id: 'loop-1',
|
||||||
|
details: [[]],
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
|
||||||
|
const iterNextProcess = appendParallelNext(process, createTrace({
|
||||||
|
id: 'iter-trace-2',
|
||||||
|
node_id: 'iter-1',
|
||||||
|
}))
|
||||||
|
const iterFinishedProcess = finishParallelTrace(iterNextProcess, createTrace({
|
||||||
|
id: 'iter-trace-2',
|
||||||
|
node_id: 'iter-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
details: undefined,
|
||||||
|
}))
|
||||||
|
const loopNextProcess = appendParallelNext(iterFinishedProcess, createTrace({
|
||||||
|
id: 'loop-trace-2',
|
||||||
|
node_id: 'loop-1',
|
||||||
|
}))
|
||||||
|
const loopFinishedProcess = finishParallelTrace(loopNextProcess, createTrace({
|
||||||
|
id: 'loop-trace-2',
|
||||||
|
node_id: 'loop-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
details: undefined,
|
||||||
|
}))
|
||||||
|
|
||||||
|
expect(loopFinishedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||||
|
id: 'iter-trace-1',
|
||||||
|
details: [[]],
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
}))
|
||||||
|
expect(loopFinishedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||||
|
id: 'iter-trace-2',
|
||||||
|
details: [[], []],
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}))
|
||||||
|
expect(loopFinishedProcess.tracing[2]).toEqual(expect.objectContaining({
|
||||||
|
id: 'loop-trace-1',
|
||||||
|
details: [[]],
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
}))
|
||||||
|
expect(loopFinishedProcess.tracing[3]).toEqual(expect.objectContaining({
|
||||||
|
id: 'loop-trace-2',
|
||||||
|
details: [[], []],
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should append a new top-level trace when the same node starts with a different execution id', () => {
|
||||||
|
const process = createWorkflowProcess()
|
||||||
|
process.tracing = [
|
||||||
|
createTrace({
|
||||||
|
id: 'trace-1',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
|
||||||
|
const updatedProcess = upsertWorkflowNode(process, createTrace({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-1',
|
||||||
|
}))!
|
||||||
|
|
||||||
|
expect(updatedProcess.tracing).toHaveLength(2)
|
||||||
|
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should finish the matching top-level trace when the same node runs again with a new execution id', () => {
|
||||||
|
const process = createWorkflowProcess()
|
||||||
|
process.tracing = [
|
||||||
|
createTrace({
|
||||||
|
id: 'trace-1',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}),
|
||||||
|
createTrace({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
|
||||||
|
const updatedProcess = finishWorkflowNode(process, createTrace({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}))!
|
||||||
|
|
||||||
|
expect(updatedProcess.tracing).toHaveLength(2)
|
||||||
|
expect(updatedProcess.tracing[0]).toEqual(expect.objectContaining({
|
||||||
|
id: 'trace-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}))
|
||||||
|
expect(updatedProcess.tracing[1]).toEqual(expect.objectContaining({
|
||||||
|
id: 'trace-2',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
it('should leave tracing unchanged when a parallel next event has no matching trace', () => {
|
it('should leave tracing unchanged when a parallel next event has no matching trace', () => {
|
||||||
const process = createWorkflowProcess()
|
const process = createWorkflowProcess()
|
||||||
process.tracing = [
|
process.tracing = [
|
||||||
|
|
@ -171,6 +297,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||||
]
|
]
|
||||||
|
|
||||||
const nextProcess = appendParallelNext(process, createTrace({
|
const nextProcess = appendParallelNext(process, createTrace({
|
||||||
|
id: 'trace-missing',
|
||||||
node_id: 'missing-node',
|
node_id: 'missing-node',
|
||||||
execution_metadata: { parallel_id: 'parallel-2' },
|
execution_metadata: { parallel_id: 'parallel-2' },
|
||||||
}))
|
}))
|
||||||
|
|
@ -228,6 +355,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||||
},
|
},
|
||||||
}))
|
}))
|
||||||
const notFinished = finishParallelTrace(process, createTrace({
|
const notFinished = finishParallelTrace(process, createTrace({
|
||||||
|
id: 'trace-missing',
|
||||||
node_id: 'missing',
|
node_id: 'missing',
|
||||||
execution_metadata: {
|
execution_metadata: {
|
||||||
parallel_id: 'parallel-missing',
|
parallel_id: 'parallel-missing',
|
||||||
|
|
@ -243,6 +371,7 @@ describe('workflow-stream-handlers helpers', () => {
|
||||||
loop_id: 'loop-1',
|
loop_id: 'loop-1',
|
||||||
}))
|
}))
|
||||||
const unmatchedFinish = finishWorkflowNode(process, createTrace({
|
const unmatchedFinish = finishWorkflowNode(process, createTrace({
|
||||||
|
id: 'trace-missing',
|
||||||
node_id: 'missing',
|
node_id: 'missing',
|
||||||
execution_metadata: {
|
execution_metadata: {
|
||||||
parallel_id: 'missing',
|
parallel_id: 'missing',
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import type { HumanInputFormTimeoutData, NodeTracing, WorkflowFinishedResponse }
|
||||||
import { produce } from 'immer'
|
import { produce } from 'immer'
|
||||||
import { getFilesInLogs } from '@/app/components/base/file-uploader/utils'
|
import { getFilesInLogs } from '@/app/components/base/file-uploader/utils'
|
||||||
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
import { NodeRunningStatus, WorkflowRunningStatus } from '@/app/components/workflow/types'
|
||||||
|
import { upsertTopLevelTracingNodeOnStart } from '@/app/components/workflow/utils/top-level-tracing'
|
||||||
import { sseGet } from '@/service/base'
|
import { sseGet } from '@/service/base'
|
||||||
|
|
||||||
type Notify = (payload: { type: 'error' | 'warning', message: string }) => void
|
type Notify = (payload: { type: 'error' | 'warning', message: string }) => void
|
||||||
|
|
@ -49,6 +50,15 @@ const matchParallelTrace = (trace: WorkflowProcess['tracing'][number], data: Nod
|
||||||
|| trace.parallel_id === data.execution_metadata?.parallel_id)
|
|| trace.parallel_id === data.execution_metadata?.parallel_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const findParallelTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||||
|
return tracing.findIndex((trace) => {
|
||||||
|
if (trace.id && data.id)
|
||||||
|
return trace.id === data.id
|
||||||
|
|
||||||
|
return matchParallelTrace(trace, data)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
const ensureParallelTraceDetails = (details?: NodeTracing['details']) => {
|
const ensureParallelTraceDetails = (details?: NodeTracing['details']) => {
|
||||||
return details?.length ? details : [[]]
|
return details?.length ? details : [[]]
|
||||||
}
|
}
|
||||||
|
|
@ -68,7 +78,8 @@ const appendParallelStart = (current: WorkflowProcess | undefined, data: NodeTra
|
||||||
const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||||
return updateWorkflowProcess(current, (draft) => {
|
return updateWorkflowProcess(current, (draft) => {
|
||||||
draft.expand = true
|
draft.expand = true
|
||||||
const trace = draft.tracing.find(item => matchParallelTrace(item, data))
|
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||||
|
const trace = draft.tracing[traceIndex]
|
||||||
if (!trace)
|
if (!trace)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
@ -80,10 +91,13 @@ const appendParallelNext = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||||
const finishParallelTrace = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
const finishParallelTrace = (current: WorkflowProcess | undefined, data: NodeTracing) => {
|
||||||
return updateWorkflowProcess(current, (draft) => {
|
return updateWorkflowProcess(current, (draft) => {
|
||||||
draft.expand = true
|
draft.expand = true
|
||||||
const traceIndex = draft.tracing.findIndex(item => matchParallelTrace(item, data))
|
const traceIndex = findParallelTraceIndex(draft.tracing, data)
|
||||||
if (traceIndex > -1) {
|
if (traceIndex > -1) {
|
||||||
|
const currentTrace = draft.tracing[traceIndex]
|
||||||
draft.tracing[traceIndex] = {
|
draft.tracing[traceIndex] = {
|
||||||
|
...currentTrace,
|
||||||
...data,
|
...data,
|
||||||
|
details: data.details ?? currentTrace.details,
|
||||||
expand: !!data.error,
|
expand: !!data.error,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -96,17 +110,22 @@ const upsertWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||||
|
|
||||||
return updateWorkflowProcess(current, (draft) => {
|
return updateWorkflowProcess(current, (draft) => {
|
||||||
draft.expand = true
|
draft.expand = true
|
||||||
const currentIndex = draft.tracing.findIndex(item => item.node_id === data.node_id)
|
|
||||||
const nextTrace = {
|
const nextTrace = {
|
||||||
...data,
|
...data,
|
||||||
status: NodeRunningStatus.Running,
|
status: NodeRunningStatus.Running,
|
||||||
expand: true,
|
expand: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentIndex > -1)
|
upsertTopLevelTracingNodeOnStart(draft.tracing, nextTrace)
|
||||||
draft.tracing[currentIndex] = nextTrace
|
})
|
||||||
else
|
}
|
||||||
draft.tracing.push(nextTrace)
|
|
||||||
|
const findWorkflowNodeTraceIndex = (tracing: WorkflowProcess['tracing'], data: NodeTracing) => {
|
||||||
|
return tracing.findIndex((trace) => {
|
||||||
|
if (trace.id && data.id)
|
||||||
|
return trace.id === data.id
|
||||||
|
|
||||||
|
return matchParallelTrace(trace, data)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,7 +134,7 @@ const finishWorkflowNode = (current: WorkflowProcess | undefined, data: NodeTrac
|
||||||
return current
|
return current
|
||||||
|
|
||||||
return updateWorkflowProcess(current, (draft) => {
|
return updateWorkflowProcess(current, (draft) => {
|
||||||
const currentIndex = draft.tracing.findIndex(trace => matchParallelTrace(trace, data))
|
const currentIndex = findWorkflowNodeTraceIndex(draft.tracing, data)
|
||||||
if (currentIndex > -1) {
|
if (currentIndex > -1) {
|
||||||
draft.tracing[currentIndex] = {
|
draft.tracing[currentIndex] = {
|
||||||
...(draft.tracing[currentIndex].extras
|
...(draft.tracing[currentIndex].extras
|
||||||
|
|
|
||||||
|
|
@ -109,13 +109,13 @@ describe('useWorkflowAgentLog', () => {
|
||||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||||
initialStoreState: {
|
initialStoreState: {
|
||||||
workflowRunningData: baseRunningData({
|
workflowRunningData: baseRunningData({
|
||||||
tracing: [{ node_id: 'n1', execution_metadata: {} }],
|
tracing: [{ id: 'trace-1', node_id: 'n1', execution_metadata: {} }],
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
result.current.handleWorkflowAgentLog({
|
result.current.handleWorkflowAgentLog({
|
||||||
data: { node_id: 'n1', message_id: 'm1' },
|
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||||
} as AgentLogResponse)
|
} as AgentLogResponse)
|
||||||
|
|
||||||
const trace = store.getState().workflowRunningData!.tracing![0]
|
const trace = store.getState().workflowRunningData!.tracing![0]
|
||||||
|
|
@ -128,6 +128,7 @@ describe('useWorkflowAgentLog', () => {
|
||||||
initialStoreState: {
|
initialStoreState: {
|
||||||
workflowRunningData: baseRunningData({
|
workflowRunningData: baseRunningData({
|
||||||
tracing: [{
|
tracing: [{
|
||||||
|
id: 'trace-1',
|
||||||
node_id: 'n1',
|
node_id: 'n1',
|
||||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'log1' }] },
|
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'log1' }] },
|
||||||
}],
|
}],
|
||||||
|
|
@ -136,7 +137,7 @@ describe('useWorkflowAgentLog', () => {
|
||||||
})
|
})
|
||||||
|
|
||||||
result.current.handleWorkflowAgentLog({
|
result.current.handleWorkflowAgentLog({
|
||||||
data: { node_id: 'n1', message_id: 'm2' },
|
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm2' },
|
||||||
} as AgentLogResponse)
|
} as AgentLogResponse)
|
||||||
|
|
||||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(2)
|
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(2)
|
||||||
|
|
@ -147,6 +148,7 @@ describe('useWorkflowAgentLog', () => {
|
||||||
initialStoreState: {
|
initialStoreState: {
|
||||||
workflowRunningData: baseRunningData({
|
workflowRunningData: baseRunningData({
|
||||||
tracing: [{
|
tracing: [{
|
||||||
|
id: 'trace-1',
|
||||||
node_id: 'n1',
|
node_id: 'n1',
|
||||||
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'old' }] },
|
execution_metadata: { agent_log: [{ message_id: 'm1', text: 'old' }] },
|
||||||
}],
|
}],
|
||||||
|
|
@ -155,7 +157,7 @@ describe('useWorkflowAgentLog', () => {
|
||||||
})
|
})
|
||||||
|
|
||||||
result.current.handleWorkflowAgentLog({
|
result.current.handleWorkflowAgentLog({
|
||||||
data: { node_id: 'n1', message_id: 'm1', text: 'new' },
|
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1', text: 'new' },
|
||||||
} as unknown as AgentLogResponse)
|
} as unknown as AgentLogResponse)
|
||||||
|
|
||||||
const log = store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log!
|
const log = store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log!
|
||||||
|
|
@ -167,17 +169,39 @@ describe('useWorkflowAgentLog', () => {
|
||||||
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||||
initialStoreState: {
|
initialStoreState: {
|
||||||
workflowRunningData: baseRunningData({
|
workflowRunningData: baseRunningData({
|
||||||
tracing: [{ node_id: 'n1' }],
|
tracing: [{ id: 'trace-1', node_id: 'n1' }],
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
result.current.handleWorkflowAgentLog({
|
result.current.handleWorkflowAgentLog({
|
||||||
data: { node_id: 'n1', message_id: 'm1' },
|
data: { node_id: 'n1', node_execution_id: 'trace-1', message_id: 'm1' },
|
||||||
} as AgentLogResponse)
|
} as AgentLogResponse)
|
||||||
|
|
||||||
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(1)
|
expect(store.getState().workflowRunningData!.tracing![0].execution_metadata!.agent_log).toHaveLength(1)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should attach the log to the matching execution id when a node runs multiple times', () => {
|
||||||
|
const { result, store } = renderWorkflowHook(() => useWorkflowAgentLog(), {
|
||||||
|
initialStoreState: {
|
||||||
|
workflowRunningData: baseRunningData({
|
||||||
|
tracing: [
|
||||||
|
{ id: 'trace-1', node_id: 'n1', execution_metadata: {} },
|
||||||
|
{ id: 'trace-2', node_id: 'n1', execution_metadata: {} },
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
result.current.handleWorkflowAgentLog({
|
||||||
|
data: { node_id: 'n1', node_execution_id: 'trace-2', message_id: 'm2' },
|
||||||
|
} as AgentLogResponse)
|
||||||
|
|
||||||
|
const tracing = store.getState().workflowRunningData!.tracing!
|
||||||
|
expect(tracing[0].execution_metadata!.agent_log).toBeUndefined()
|
||||||
|
expect(tracing[1].execution_metadata!.agent_log).toHaveLength(1)
|
||||||
|
expect(tracing[1].execution_metadata!.agent_log![0].message_id).toBe('m2')
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('useWorkflowNodeHumanInputFormFilled', () => {
|
describe('useWorkflowNodeHumanInputFormFilled', () => {
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||||
|
|
||||||
act(() => {
|
act(() => {
|
||||||
result.current.handleWorkflowNodeStarted(
|
result.current.handleWorkflowNodeStarted(
|
||||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
{ data: { id: 'trace-n1', node_id: 'n1' } } as NodeStartedResponse,
|
||||||
containerParams,
|
containerParams,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
@ -138,7 +138,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||||
|
|
||||||
act(() => {
|
act(() => {
|
||||||
result.current.handleWorkflowNodeStarted(
|
result.current.handleWorkflowNodeStarted(
|
||||||
{ data: { node_id: 'n2' } } as NodeStartedResponse,
|
{ data: { id: 'trace-n2', node_id: 'n2' } } as NodeStartedResponse,
|
||||||
containerParams,
|
containerParams,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
@ -157,8 +157,8 @@ describe('useWorkflowNodeStarted', () => {
|
||||||
initialStoreState: {
|
initialStoreState: {
|
||||||
workflowRunningData: baseRunningData({
|
workflowRunningData: baseRunningData({
|
||||||
tracing: [
|
tracing: [
|
||||||
{ node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||||
{ node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||||
],
|
],
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
|
|
@ -166,7 +166,7 @@ describe('useWorkflowNodeStarted', () => {
|
||||||
|
|
||||||
act(() => {
|
act(() => {
|
||||||
result.current.handleWorkflowNodeStarted(
|
result.current.handleWorkflowNodeStarted(
|
||||||
{ data: { node_id: 'n1' } } as NodeStartedResponse,
|
{ data: { id: 'trace-1', node_id: 'n1' } } as NodeStartedResponse,
|
||||||
containerParams,
|
containerParams,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
@ -175,6 +175,32 @@ describe('useWorkflowNodeStarted', () => {
|
||||||
expect(tracing).toHaveLength(2)
|
expect(tracing).toHaveLength(2)
|
||||||
expect(tracing[1].status).toBe(NodeRunningStatus.Running)
|
expect(tracing[1].status).toBe(NodeRunningStatus.Running)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should append a new tracing entry when the same node starts a new execution id', () => {
|
||||||
|
const { result, store } = renderViewportHook(() => useWorkflowNodeStarted(), {
|
||||||
|
initialStoreState: {
|
||||||
|
workflowRunningData: baseRunningData({
|
||||||
|
tracing: [
|
||||||
|
{ id: 'trace-0', node_id: 'n0', status: NodeRunningStatus.Succeeded },
|
||||||
|
{ id: 'trace-1', node_id: 'n1', status: NodeRunningStatus.Succeeded },
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
result.current.handleWorkflowNodeStarted(
|
||||||
|
{ data: { id: 'trace-2', node_id: 'n1' } } as NodeStartedResponse,
|
||||||
|
containerParams,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
const tracing = store.getState().workflowRunningData!.tracing!
|
||||||
|
expect(tracing).toHaveLength(3)
|
||||||
|
expect(tracing[2].id).toBe('trace-2')
|
||||||
|
expect(tracing[2].node_id).toBe('n1')
|
||||||
|
expect(tracing[2].status).toBe(NodeRunningStatus.Running)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
describe('useWorkflowNodeIterationStarted', () => {
|
describe('useWorkflowNodeIterationStarted', () => {
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ export const useWorkflowAgentLog = () => {
|
||||||
} = workflowStore.getState()
|
} = workflowStore.getState()
|
||||||
|
|
||||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||||
const currentIndex = draft.tracing!.findIndex(item => item.node_id === data.node_id)
|
const currentIndex = draft.tracing!.findIndex(item => item.id === data.node_execution_id)
|
||||||
if (currentIndex > -1) {
|
if (currentIndex > -1) {
|
||||||
const current = draft.tracing![currentIndex]
|
const current = draft.tracing![currentIndex]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,8 +33,8 @@ export const useWorkflowNodeStarted = () => {
|
||||||
transform,
|
transform,
|
||||||
} = store.getState()
|
} = store.getState()
|
||||||
const nodes = getNodes()
|
const nodes = getNodes()
|
||||||
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.node_id === data.node_id)
|
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.id === data.id)
|
||||||
if (currentIndex && currentIndex > -1) {
|
if (currentIndex !== undefined && currentIndex > -1) {
|
||||||
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
|
||||||
draft.tracing![currentIndex] = {
|
draft.tracing![currentIndex] = {
|
||||||
...data,
|
...data,
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ import {
|
||||||
import { useHooksStore } from '../../hooks-store'
|
import { useHooksStore } from '../../hooks-store'
|
||||||
import { useWorkflowStore } from '../../store'
|
import { useWorkflowStore } from '../../store'
|
||||||
import { NodeRunningStatus, WorkflowRunningStatus } from '../../types'
|
import { NodeRunningStatus, WorkflowRunningStatus } from '../../types'
|
||||||
|
import { upsertTopLevelTracingNodeOnStart } from '../../utils/top-level-tracing'
|
||||||
|
|
||||||
type GetAbortController = (abortController: AbortController) => void
|
type GetAbortController = (abortController: AbortController) => void
|
||||||
type SendCallback = {
|
type SendCallback = {
|
||||||
|
|
@ -509,19 +510,13 @@ export const useChat = (
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
onNodeStarted: ({ data }) => {
|
onNodeStarted: ({ data }) => {
|
||||||
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
if (params.loop_id)
|
||||||
if (currentIndex > -1) {
|
return
|
||||||
responseItem.workflowProcess!.tracing![currentIndex] = {
|
|
||||||
...data,
|
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess!.tracing!, {
|
||||||
status: NodeRunningStatus.Running,
|
...data,
|
||||||
}
|
status: NodeRunningStatus.Running,
|
||||||
}
|
})
|
||||||
else {
|
|
||||||
responseItem.workflowProcess!.tracing!.push({
|
|
||||||
...data,
|
|
||||||
status: NodeRunningStatus.Running,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
updateCurrentQAOnTree({
|
updateCurrentQAOnTree({
|
||||||
placeholderQuestionId,
|
placeholderQuestionId,
|
||||||
questionItem,
|
questionItem,
|
||||||
|
|
@ -540,6 +535,9 @@ export const useChat = (
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onNodeFinished: ({ data }) => {
|
onNodeFinished: ({ data }) => {
|
||||||
|
if (params.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||||
if (currentTracingIndex > -1) {
|
if (currentTracingIndex > -1) {
|
||||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||||
|
|
@ -555,7 +553,7 @@ export const useChat = (
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
onAgentLog: ({ data }) => {
|
onAgentLog: ({ data }) => {
|
||||||
const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
|
const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.node_execution_id)
|
||||||
if (currentNodeIndex > -1) {
|
if (currentNodeIndex > -1) {
|
||||||
const current = responseItem.workflowProcess!.tracing![currentNodeIndex]
|
const current = responseItem.workflowProcess!.tracing![currentNodeIndex]
|
||||||
|
|
||||||
|
|
@ -781,8 +779,7 @@ export const useChat = (
|
||||||
if (!responseItem.workflowProcess?.tracing)
|
if (!responseItem.workflowProcess?.tracing)
|
||||||
return
|
return
|
||||||
const tracing = responseItem.workflowProcess.tracing
|
const tracing = responseItem.workflowProcess.tracing
|
||||||
const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
|
const iterationIndex = tracing.findIndex(item => item.id === iterationFinishedData.id)!
|
||||||
&& (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
|
|
||||||
if (iterationIndex > -1) {
|
if (iterationIndex > -1) {
|
||||||
tracing[iterationIndex] = {
|
tracing[iterationIndex] = {
|
||||||
...tracing[iterationIndex],
|
...tracing[iterationIndex],
|
||||||
|
|
@ -799,22 +796,10 @@ export const useChat = (
|
||||||
if (!responseItem.workflowProcess.tracing)
|
if (!responseItem.workflowProcess.tracing)
|
||||||
responseItem.workflowProcess.tracing = []
|
responseItem.workflowProcess.tracing = []
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
|
upsertTopLevelTracingNodeOnStart(responseItem.workflowProcess.tracing, {
|
||||||
if (currentIndex > -1) {
|
...nodeStartedData,
|
||||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
status: NodeRunningStatus.Running,
|
||||||
...nodeStartedData,
|
})
|
||||||
status: NodeRunningStatus.Running,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (nodeStartedData.iteration_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
responseItem.workflowProcess.tracing.push({
|
|
||||||
...nodeStartedData,
|
|
||||||
status: WorkflowRunningStatus.Running,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onNodeFinished: ({ data: nodeFinishedData }) => {
|
onNodeFinished: ({ data: nodeFinishedData }) => {
|
||||||
|
|
@ -825,6 +810,9 @@ export const useChat = (
|
||||||
if (nodeFinishedData.iteration_id)
|
if (nodeFinishedData.iteration_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if (nodeFinishedData.loop_id)
|
||||||
|
return
|
||||||
|
|
||||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||||
if (!item.execution_metadata?.parallel_id)
|
if (!item.execution_metadata?.parallel_id)
|
||||||
return item.id === nodeFinishedData.id
|
return item.id === nodeFinishedData.id
|
||||||
|
|
@ -852,8 +840,7 @@ export const useChat = (
|
||||||
if (!responseItem.workflowProcess?.tracing)
|
if (!responseItem.workflowProcess?.tracing)
|
||||||
return
|
return
|
||||||
const tracing = responseItem.workflowProcess.tracing
|
const tracing = responseItem.workflowProcess.tracing
|
||||||
const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
|
const loopIndex = tracing.findIndex(item => item.id === loopFinishedData.id)!
|
||||||
&& (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
|
|
||||||
if (loopIndex > -1) {
|
if (loopIndex > -1) {
|
||||||
tracing[loopIndex] = {
|
tracing[loopIndex] = {
|
||||||
...tracing[loopIndex],
|
...tracing[loopIndex],
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,133 @@
|
||||||
|
import type { NodeTracing } from '@/types/workflow'
|
||||||
|
import { NodeRunningStatus } from '@/app/components/workflow/types'
|
||||||
|
import { upsertTopLevelTracingNodeOnStart } from './top-level-tracing'
|
||||||
|
|
||||||
|
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||||
|
id: 'trace-1',
|
||||||
|
index: 0,
|
||||||
|
predecessor_node_id: '',
|
||||||
|
node_id: 'node-1',
|
||||||
|
node_type: 'llm' as NodeTracing['node_type'],
|
||||||
|
title: 'Node 1',
|
||||||
|
inputs: {},
|
||||||
|
inputs_truncated: false,
|
||||||
|
process_data: {},
|
||||||
|
process_data_truncated: false,
|
||||||
|
outputs: {},
|
||||||
|
outputs_truncated: false,
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
elapsed_time: 0,
|
||||||
|
metadata: {
|
||||||
|
iterator_length: 0,
|
||||||
|
iterator_index: 0,
|
||||||
|
loop_length: 0,
|
||||||
|
loop_index: 0,
|
||||||
|
},
|
||||||
|
created_at: 0,
|
||||||
|
created_by: {
|
||||||
|
id: 'user-1',
|
||||||
|
name: 'User',
|
||||||
|
email: 'user@example.com',
|
||||||
|
},
|
||||||
|
finished_at: 0,
|
||||||
|
...overrides,
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('upsertTopLevelTracingNodeOnStart', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should append a new top-level node when no matching trace exists', () => {
|
||||||
|
const tracing: NodeTracing[] = []
|
||||||
|
const startedNode = createTrace({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-2',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
})
|
||||||
|
|
||||||
|
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||||
|
|
||||||
|
expect(updated).toBe(true)
|
||||||
|
expect(tracing).toEqual([startedNode])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should update an existing top-level node when the execution id matches', () => {
|
||||||
|
const tracing: NodeTracing[] = [
|
||||||
|
createTrace({
|
||||||
|
id: 'trace-1',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
const startedNode = createTrace({
|
||||||
|
id: 'trace-1',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
})
|
||||||
|
|
||||||
|
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||||
|
|
||||||
|
expect(updated).toBe(true)
|
||||||
|
expect(tracing).toEqual([startedNode])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should append a new top-level node when the same node starts with a new execution id', () => {
|
||||||
|
const existingTrace = createTrace({
|
||||||
|
id: 'trace-1',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
})
|
||||||
|
const tracing: NodeTracing[] = [existingTrace]
|
||||||
|
const startedNode = createTrace({
|
||||||
|
id: 'trace-2',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
})
|
||||||
|
|
||||||
|
const updated = upsertTopLevelTracingNodeOnStart(tracing, startedNode)
|
||||||
|
|
||||||
|
expect(updated).toBe(true)
|
||||||
|
expect(tracing).toEqual([existingTrace, startedNode])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should ignore nested iteration node starts even when the node id matches a top-level trace', () => {
|
||||||
|
const existingTrace = createTrace({
|
||||||
|
id: 'top-level-trace',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
})
|
||||||
|
const tracing: NodeTracing[] = [existingTrace]
|
||||||
|
const nestedIterationTrace = createTrace({
|
||||||
|
id: 'iteration-trace',
|
||||||
|
node_id: 'node-1',
|
||||||
|
iteration_id: 'iteration-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
})
|
||||||
|
|
||||||
|
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedIterationTrace)
|
||||||
|
|
||||||
|
expect(updated).toBe(false)
|
||||||
|
expect(tracing).toEqual([existingTrace])
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should ignore nested loop node starts even when the node id matches a top-level trace', () => {
|
||||||
|
const existingTrace = createTrace({
|
||||||
|
id: 'top-level-trace',
|
||||||
|
node_id: 'node-1',
|
||||||
|
status: NodeRunningStatus.Succeeded,
|
||||||
|
})
|
||||||
|
const tracing: NodeTracing[] = [existingTrace]
|
||||||
|
const nestedLoopTrace = createTrace({
|
||||||
|
id: 'loop-trace',
|
||||||
|
node_id: 'node-1',
|
||||||
|
loop_id: 'loop-1',
|
||||||
|
status: NodeRunningStatus.Running,
|
||||||
|
})
|
||||||
|
|
||||||
|
const updated = upsertTopLevelTracingNodeOnStart(tracing, nestedLoopTrace)
|
||||||
|
|
||||||
|
expect(updated).toBe(false)
|
||||||
|
expect(tracing).toEqual([existingTrace])
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
import type { NodeTracing } from '@/types/workflow'
|
||||||
|
|
||||||
|
const isNestedTracingNode = (trace: Pick<NodeTracing, 'iteration_id' | 'loop_id'>) => {
|
||||||
|
return Boolean(trace.iteration_id || trace.loop_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const upsertTopLevelTracingNodeOnStart = (
|
||||||
|
tracing: NodeTracing[],
|
||||||
|
startedNode: NodeTracing,
|
||||||
|
) => {
|
||||||
|
if (isNestedTracingNode(startedNode))
|
||||||
|
return false
|
||||||
|
|
||||||
|
const currentIndex = tracing.findIndex(item => item.id === startedNode.id)
|
||||||
|
if (currentIndex > -1)
|
||||||
|
// Started events are the authoritative snapshot for an execution; merging would retain stale client-side fields.
|
||||||
|
tracing[currentIndex] = startedNode
|
||||||
|
else
|
||||||
|
tracing.push(startedNode)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue