From 7f3ef542cbe4e0d7ee9b0db6fca4c68e12037b29 Mon Sep 17 00:00:00 2001 From: Chen Yefan Date: Tue, 24 Mar 2026 02:14:32 +0800 Subject: [PATCH] fix: replay workflow history results with first-run rendering --- api/controllers/service_api/app/workflow.py | 2 + api/core/app/workflow/layers/persistence.py | 25 + api/core/app/workflow/result_replay.py | 546 ++++++++++++++++++ ...qlalchemy_workflow_execution_repository.py | 6 + .../workflow/entities/workflow_execution.py | 1 + api/fields/workflow_run_fields.py | 1 + ...6c2a_add_result_replay_to_workflow_runs.py | 26 + api/models/workflow.py | 7 + api/tasks/workflow_execution_tasks.py | 5 + .../core/app/workflow/test_result_replay.py | 290 ++++++++++ ...qlalchemy_workflow_execution_repository.py | 87 +++ .../tasks/test_workflow_execution_tasks.py | 79 +++ web/app/components/workflow/panel/record.tsx | 1 + .../components/workflow/run/index.spec.tsx | 276 +++++++++ web/app/components/workflow/run/index.tsx | 389 ++++++++++++- .../components/workflow/run/result-text.tsx | 2 +- web/models/log.ts | 41 +- 17 files changed, 1771 insertions(+), 13 deletions(-) create mode 100644 api/core/app/workflow/result_replay.py create mode 100644 api/migrations/versions/2026_03_23_2350-b8d7fb4f6c2a_add_result_replay_to_workflow_runs.py create mode 100644 api/tests/unit_tests/core/app/workflow/test_result_replay.py create mode 100644 api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py create mode 100644 api/tests/unit_tests/tasks/test_workflow_execution_tasks.py create mode 100644 web/app/components/workflow/run/index.spec.tsx diff --git a/api/controllers/service_api/app/workflow.py b/api/controllers/service_api/app/workflow.py index 6088b142c2..28c629fd5a 100644 --- a/api/controllers/service_api/app/workflow.py +++ b/api/controllers/service_api/app/workflow.py @@ -85,6 +85,8 @@ workflow_run_fields = { "status": WorkflowRunStatusField, "inputs": fields.Raw, "outputs": WorkflowRunOutputsField, + 
"outputs_as_generation": fields.Boolean, + "result_replay": fields.Raw(attribute="result_replay_dict"), "error": fields.String, "total_steps": fields.Integer, "total_tokens": fields.Integer, diff --git a/api/core/app/workflow/layers/persistence.py b/api/core/app/workflow/layers/persistence.py index 132302efe1..c8f798da06 100644 --- a/api/core/app/workflow/layers/persistence.py +++ b/api/core/app/workflow/layers/persistence.py @@ -15,6 +15,7 @@ from datetime import datetime from typing import Any, Union from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity +from core.app.workflow.result_replay import WorkflowResultReplayBuilder, build_result_replay_from_node_executions from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID @@ -40,6 +41,7 @@ from core.workflow.graph_events import ( NodeRunPauseRequestedEvent, NodeRunRetryEvent, NodeRunStartedEvent, + NodeRunStreamChunkEvent, NodeRunSucceededEvent, ) from core.workflow.node_events import NodeRunResult @@ -94,6 +96,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): self._node_execution_cache: dict[str, WorkflowNodeExecution] = {} self._node_snapshots: dict[str, _NodeRuntimeSnapshot] = {} self._node_sequence: int = 0 + self._result_replay_builder = WorkflowResultReplayBuilder() # ------------------------------------------------------------------ # GraphEngineLayer lifecycle @@ -103,6 +106,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): self._node_execution_cache.clear() self._node_snapshots.clear() self._node_sequence = 0 + self._result_replay_builder = WorkflowResultReplayBuilder() def on_event(self, event: GraphEngineEvent) -> None: if isinstance(event, GraphRunStartedEvent): @@ -129,6 +133,10 @@ class WorkflowPersistenceLayer(GraphEngineLayer): self._handle_graph_run_paused(event) return + if isinstance(event, 
NodeRunStreamChunkEvent): + self._handle_node_stream_chunk(event) + return + if isinstance(event, NodeRunStartedEvent): self._handle_node_started(event) return @@ -176,6 +184,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): def _handle_graph_run_succeeded(self, event: GraphRunSucceededEvent) -> None: execution = self._get_workflow_execution() execution.outputs = event.outputs + execution.result_replay = self._build_result_replay(event.outputs) execution.status = WorkflowExecutionStatus.SUCCEEDED self._populate_completion_statistics(execution) @@ -185,6 +194,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None: execution = self._get_workflow_execution() execution.outputs = event.outputs + execution.result_replay = self._build_result_replay(event.outputs) execution.status = WorkflowExecutionStatus.PARTIAL_SUCCEEDED execution.exceptions_count = event.exceptions_count self._populate_completion_statistics(execution) @@ -194,6 +204,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None: execution = self._get_workflow_execution() + execution.result_replay = self._build_result_replay(execution.outputs) execution.status = WorkflowExecutionStatus.FAILED execution.error_message = event.error execution.exceptions_count = event.exceptions_count @@ -205,6 +216,8 @@ class WorkflowPersistenceLayer(GraphEngineLayer): def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None: execution = self._get_workflow_execution() + execution.outputs = event.outputs + execution.result_replay = self._build_result_replay(event.outputs) execution.status = WorkflowExecutionStatus.STOPPED execution.error_message = event.reason or "Workflow execution aborted" self._populate_completion_statistics(execution) @@ -217,6 +230,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer): execution = self._get_workflow_execution() 
execution.status = WorkflowExecutionStatus.PAUSED execution.outputs = event.outputs + execution.result_replay = self._build_result_replay(event.outputs) self._populate_completion_statistics(execution, update_finished=False) self._workflow_execution_repository.save(execution) @@ -262,6 +276,9 @@ class WorkflowPersistenceLayer(GraphEngineLayer): ) self._node_snapshots[event.id] = snapshot + def _handle_node_stream_chunk(self, event: NodeRunStreamChunkEvent) -> None: + self._result_replay_builder.add_stream_chunk(event) + def _handle_node_retry(self, event: NodeRunRetryEvent) -> None: domain_execution = self._get_node_execution(event.id) domain_execution.status = WorkflowNodeExecutionStatus.RETRY @@ -324,6 +341,14 @@ class WorkflowPersistenceLayer(GraphEngineLayer): handled = WorkflowEntry.handle_special_values(inputs) return handled or {} + def _build_result_replay(self, outputs: Mapping[str, Any] | None) -> Mapping[str, Any] | None: + replay = self._result_replay_builder.build(outputs) + has_structured_items = bool(replay and replay.get("llm_generation_items")) + if has_structured_items: + return replay + + return build_result_replay_from_node_executions(outputs, self._node_execution_cache.values()) or replay + def _get_workflow_execution(self) -> WorkflowExecution: if self._workflow_execution is None: raise ValueError("workflow execution not initialized") diff --git a/api/core/app/workflow/result_replay.py b/api/core/app/workflow/result_replay.py new file mode 100644 index 0000000000..4007d94a5d --- /dev/null +++ b/api/core/app/workflow/result_replay.py @@ -0,0 +1,546 @@ +from __future__ import annotations + +import json +from collections.abc import Iterable, Mapping, Sequence +from copy import deepcopy +from operator import itemgetter +from typing import Any + +from core.file import FILE_MODEL_IDENTITY, File +from core.variables.segments import ArrayFileSegment, FileSegment, Segment +from core.workflow.entities.tool_entities import ToolResultStatus +from 
core.workflow.enums import WorkflowNodeExecutionMetadataKey +from core.workflow.graph_events import ChunkType, NodeRunStreamChunkEvent + + +class WorkflowResultReplayBuilder: + """Build a persisted replay payload for workflow result rendering.""" + + def __init__(self) -> None: + self._text = "" + self._items: list[dict[str, Any]] = [] + self._active_thought_index: int | None = None + self._active_tool_key: str | None = None + self._tool_indexes: dict[str, int] = {} + + def add_stream_chunk(self, event: NodeRunStreamChunkEvent) -> None: + if _is_empty_terminal_stream_event(event): + return + + if event.chunk_type == ChunkType.TEXT: + self._append_text(event.chunk) + return + + if event.chunk_type == ChunkType.THOUGHT_START: + self._close_open_text_item() + self._active_thought_index = len(self._items) + self._items.append({ + "type": "thought", + "thought_output": event.chunk or "", + "thought_completed": False, + }) + return + + if event.chunk_type == ChunkType.THOUGHT: + thought_index = self._ensure_active_thought() + self._items[thought_index]["thought_output"] += event.chunk or "" + return + + if event.chunk_type == ChunkType.THOUGHT_END: + thought_index = self._ensure_active_thought() + self._items[thought_index]["thought_output"] += event.chunk or "" + self._items[thought_index]["thought_completed"] = True + self._active_thought_index = None + return + + if event.chunk_type == ChunkType.TOOL_CALL: + self._close_open_text_item() + tool_call = event.tool_call + tool_key = (tool_call.id if tool_call else None) or f"tool-{len(self._items)}" + tool_index = self._tool_indexes.get(tool_key) + + if tool_index is None: + self._items.append({ + "type": "tool", + "tool_name": tool_call.name if tool_call else None, + "tool_arguments": tool_call.arguments if tool_call else None, + "tool_icon": tool_call.icon if tool_call else None, + "tool_icon_dark": tool_call.icon_dark if tool_call else None, + }) + tool_index = len(self._items) - 1 + self._tool_indexes[tool_key] = 
tool_index + else: + payload = self._items[tool_index] + if tool_call: + if tool_call.name: + payload["tool_name"] = tool_call.name + if tool_call.arguments is not None: + payload["tool_arguments"] = tool_call.arguments + if tool_call.icon is not None: + payload["tool_icon"] = tool_call.icon + if tool_call.icon_dark is not None: + payload["tool_icon_dark"] = tool_call.icon_dark + + self._active_tool_key = tool_key + return + + if event.chunk_type == ChunkType.TOOL_RESULT: + tool_result = event.tool_result + tool_key = (tool_result.id if tool_result else None) or self._active_tool_key or f"tool-{len(self._items)}" + tool_index = self._tool_indexes.get(tool_key) + + if tool_index is None: + self._items.append({ + "type": "tool", + "tool_name": tool_result.name if tool_result else None, + }) + tool_index = len(self._items) - 1 + self._tool_indexes[tool_key] = tool_index + + payload = self._items[tool_index] + if tool_result: + if tool_result.name: + payload["tool_name"] = tool_result.name + if tool_result.output is not None: + payload["tool_output"] = tool_result.output + if tool_result.files: + payload["tool_files"] = [_normalize_file_like(file) for file in tool_result.files] + if tool_result.elapsed_time is not None: + payload["tool_duration"] = tool_result.elapsed_time + if tool_result.icon is not None: + payload["tool_icon"] = tool_result.icon + if tool_result.icon_dark is not None: + payload["tool_icon_dark"] = tool_result.icon_dark + if tool_result.status == ToolResultStatus.ERROR: + payload["tool_error"] = tool_result.output or "error" + elif "tool_error" in payload: + payload.pop("tool_error", None) + + self._active_tool_key = tool_key + + def _append_text(self, chunk: str) -> None: + self._text += chunk or "" + + if self._items and self._items[-1].get("type") == "text" and not self._items[-1].get("text_completed", False): + self._items[-1]["text"] += chunk or "" + return + + self._items.append({ + "type": "text", + "text": chunk or "", + "text_completed": 
False, + }) + + def _close_open_text_item(self) -> None: + if self._items and self._items[-1].get("type") == "text": + self._items[-1]["text_completed"] = True + + def _ensure_active_thought(self) -> int: + if self._active_thought_index is not None: + return self._active_thought_index + + self._close_open_text_item() + self._items.append({ + "type": "thought", + "thought_output": "", + "thought_completed": False, + }) + self._active_thought_index = len(self._items) - 1 + return self._active_thought_index + + def build(self, outputs: Mapping[str, Any] | None = None) -> dict[str, Any] | None: + replay_text = self._text or _get_single_output_text(outputs) + items = deepcopy(self._items) + + if not items: + generation = _get_single_generation_output(outputs) + if generation: + generation_text, generation_items = _build_generation_items_from_payload(generation) + replay_text = replay_text or generation_text + items = generation_items + + for item in items: + if item.get("type") == "text": + item["text_completed"] = True + if item.get("type") == "thought": + item["thought_completed"] = True + + files = _group_files_by_output_var(outputs) + + if not replay_text and not items and not files: + return None + + return { + "text": replay_text, + "llm_generation_items": items, + "files": files, + } + + +def build_result_replay_from_node_executions( + outputs: Mapping[str, Any] | None, + node_executions: Iterable[Any], +) -> dict[str, Any] | None: + preferred_text = _get_single_output_text(outputs) + generation = _get_single_generation_output(outputs) + if not preferred_text and generation: + preferred_text = generation.get("content") if isinstance(generation.get("content"), str) else "" + + files = _group_files_by_output_var(outputs) + candidates: list[tuple[int, int, str, Sequence[Mapping[str, Any]]]] = [] + + for node_execution in node_executions: + metadata = getattr(node_execution, "metadata", None) or {} + if not isinstance(metadata, Mapping): + continue + + llm_trace = 
metadata.get(WorkflowNodeExecutionMetadataKey.LLM_TRACE) or metadata.get( + WorkflowNodeExecutionMetadataKey.LLM_TRACE.value + ) + if not isinstance(llm_trace, Sequence): + continue + + node_outputs = getattr(node_execution, "outputs", None) or {} + if not isinstance(node_outputs, Mapping): + continue + + node_generation = ( + _get_single_generation_output({"generation": node_outputs.get("generation")}) + if node_outputs.get("generation") + else None + ) + node_text = node_outputs.get("text") + if not isinstance(node_text, str) and node_generation: + node_text = node_generation.get("content") + + if not isinstance(node_text, str): + node_text = "" + + score = 0 + if preferred_text and node_text: + if preferred_text == node_text: + score = 3 + elif preferred_text in node_text or node_text in preferred_text: + score = 2 + elif node_text: + score = 1 + + candidates.append((score, int(getattr(node_execution, "index", 0) or 0), node_text, llm_trace)) + + if not candidates: + return None + + candidates.sort(key=itemgetter(0, 1), reverse=True) + + for _, _, node_text, llm_trace in candidates: + replay = _build_result_replay_from_llm_trace( + llm_trace=llm_trace, + preferred_text=preferred_text or node_text, + files=files, + ) + if replay: + return replay + + return None + + +def _get_single_output_text(outputs: Mapping[str, Any] | None) -> str: + if not outputs or len(outputs) != 1: + return "" + + value = next(iter(outputs.values())) + return value if isinstance(value, str) else "" + + +def _get_single_generation_output(outputs: Mapping[str, Any] | None) -> Mapping[str, Any] | None: + if not outputs or len(outputs) != 1: + return None + + value = next(iter(outputs.values())) + return value if isinstance(value, Mapping) else None + + +def _build_generation_items_from_payload(generation: Mapping[str, Any]) -> tuple[str, list[dict[str, Any]]]: + result_text = generation.get("content") if isinstance(generation.get("content"), str) else "" + reasoning_content = ( + 
generation.get("reasoning_content") if isinstance(generation.get("reasoning_content"), list) else [] + ) + tool_calls = generation.get("tool_calls") if isinstance(generation.get("tool_calls"), list) else [] + sequence = generation.get("sequence") if isinstance(generation.get("sequence"), list) else [] + items: list[dict[str, Any]] = [] + + def append_tool(tool_call: Mapping[str, Any], index: int) -> None: + tool_output = tool_call.get("result") if "result" in tool_call else tool_call.get("output") + payload: dict[str, Any] = { + "type": "tool", + "tool_name": tool_call.get("name"), + "tool_arguments": tool_call.get("arguments"), + "tool_output": tool_output, + "tool_duration": tool_call.get("elapsed_time") or tool_call.get("time_cost"), + } + if tool_call.get("icon") is not None or tool_call.get("tool_icon") is not None: + payload["tool_icon"] = tool_call.get("icon") or tool_call.get("tool_icon") + if tool_call.get("icon_dark") is not None or tool_call.get("tool_icon_dark") is not None: + payload["tool_icon_dark"] = tool_call.get("icon_dark") or tool_call.get("tool_icon_dark") + if isinstance(tool_call.get("files"), list): + payload["tool_files"] = [_normalize_file_like(file) for file in tool_call.get("files", [])] + elif isinstance(tool_call.get("tool_files"), list): + payload["tool_files"] = [_normalize_file_like(file) for file in tool_call.get("tool_files", [])] + + if tool_call.get("status") == "error": + payload["tool_error"] = tool_call.get("error") or stringify_copy_value(tool_output) or "error" + + items.append(payload) + + for segment in sequence: + if not isinstance(segment, Mapping): + continue + + if segment.get("type") == "content": + start = segment.get("start", 0) + end = segment.get("end", start) + if isinstance(start, int) and isinstance(end, int): + text = result_text[start:end] + if text.strip(): + items.append({"type": "text", "text": text, "text_completed": True}) + elif segment.get("type") == "reasoning": + index = segment.get("index") + if 
isinstance(index, int) and index < len(reasoning_content) and isinstance(reasoning_content[index], str): + items.append({ + "type": "thought", + "thought_output": reasoning_content[index], + "thought_completed": True, + }) + elif segment.get("type") == "tool_call": + index = segment.get("index") + if isinstance(index, int) and index < len(tool_calls) and isinstance(tool_calls[index], Mapping): + append_tool(tool_calls[index], index) + + if not items and (reasoning_content or tool_calls or result_text): + synthetic_count = max(len(reasoning_content), len(tool_calls)) + for index in range(synthetic_count): + if ( + index < len(reasoning_content) + and isinstance(reasoning_content[index], str) + and reasoning_content[index] + ): + items.append({ + "type": "thought", + "thought_output": reasoning_content[index], + "thought_completed": True, + }) + + if index < len(tool_calls) and isinstance(tool_calls[index], Mapping): + append_tool(tool_calls[index], index) + + if result_text: + items.append({"type": "text", "text": result_text, "text_completed": True}) + + return result_text, items + + +def _build_result_replay_from_llm_trace( + *, + llm_trace: Sequence[Mapping[str, Any]], + preferred_text: str, + files: list[dict[str, Any]], +) -> dict[str, Any] | None: + items: list[dict[str, Any]] = [] + tool_indexes: dict[str, int] = {} + + for segment in llm_trace: + if not isinstance(segment, Mapping): + continue + + segment_type = segment.get("type") + output = segment.get("output") if isinstance(segment.get("output"), Mapping) else {} + + if segment_type == "model": + reasoning = output.get("reasoning") + if isinstance(reasoning, str) and reasoning: + items.append({ + "type": "thought", + "thought_output": reasoning, + "thought_completed": True, + }) + + text = output.get("text") + if isinstance(text, str) and text: + items.append({ + "type": "text", + "text": text, + "text_completed": True, + }) + + tool_calls = output.get("tool_calls") + if isinstance(tool_calls, list): + 
for tool_call in tool_calls: + if not isinstance(tool_call, Mapping): + continue + + tool_id = str(tool_call.get("id") or f"tool-{len(tool_indexes)}") + items.append({ + "type": "tool", + "tool_name": tool_call.get("name"), + "tool_arguments": tool_call.get("arguments"), + }) + tool_indexes[tool_id] = len(items) - 1 + + elif segment_type == "tool": + tool_id = str(output.get("id") or f"tool-{len(tool_indexes)}") + tool_index = tool_indexes.get(tool_id) + if tool_index is None: + items.append({ + "type": "tool", + "tool_name": output.get("name"), + }) + tool_index = len(items) - 1 + tool_indexes[tool_id] = tool_index + + payload = items[tool_index] + payload["tool_name"] = output.get("name") or payload.get("tool_name") + payload["tool_arguments"] = output.get("arguments") or payload.get("tool_arguments") + payload["tool_output"] = output.get("output") + payload["tool_duration"] = segment.get("duration") + if segment.get("icon") is not None: + payload["tool_icon"] = segment.get("icon") + if segment.get("icon_dark") is not None: + payload["tool_icon_dark"] = segment.get("icon_dark") + if isinstance(output.get("files"), list): + payload["tool_files"] = [_normalize_file_like(file) for file in output.get("files", [])] + if segment.get("status") == "error": + payload["tool_error"] = segment.get("error") or stringify_copy_value(output.get("output")) or "error" + + if not items and not preferred_text and not files: + return None + + combined_text = "".join( + item.get("text", "") + for item in items + if item.get("type") == "text" and isinstance(item.get("text"), str) + ) + if ( + preferred_text + and preferred_text != combined_text + and preferred_text not in combined_text + and combined_text not in preferred_text + ): + items.append({ + "type": "text", + "text": preferred_text, + "text_completed": True, + }) + + return { + "text": preferred_text or combined_text, + "llm_generation_items": items, + "files": files, + } + + +def _is_empty_terminal_stream_event(event: 
NodeRunStreamChunkEvent) -> bool: + if not event.is_final: + return False + + if event.chunk_type == ChunkType.TEXT: + return not event.chunk + + if event.chunk_type in {ChunkType.THOUGHT, ChunkType.THOUGHT_START, ChunkType.THOUGHT_END}: + return not event.chunk + + if event.chunk_type == ChunkType.TOOL_CALL: + tool_call = event.tool_call + if not tool_call: + return True + return not tool_call.id and not tool_call.name and not tool_call.arguments and not event.chunk + + if event.chunk_type == ChunkType.TOOL_RESULT: + tool_result = event.tool_result + if not tool_result: + return True + return ( + not tool_result.id + and not tool_result.name + and not tool_result.output + and not tool_result.files + and not event.chunk + ) + + return False + + +def _group_files_by_output_var(outputs: Mapping[str, Any] | None) -> list[dict[str, Any]]: + if not outputs: + return [] + + result: list[dict[str, Any]] = [] + for key, value in outputs.items(): + files = _fetch_files_from_variable_value(value) + if files: + result.append({ + "var_name": key, + "files": list(files), + }) + + return result + + +def _fetch_files_from_variable_value( + value: dict[str, Any] | list[Any] | Segment | File | None, +) -> Sequence[Mapping[str, Any]]: + if not value: + return [] + + files: list[Mapping[str, Any]] = [] + if isinstance(value, FileSegment): + files.append(value.value.to_dict()) + elif isinstance(value, ArrayFileSegment): + files.extend([item.to_dict() for item in value.value]) + elif isinstance(value, File): + files.append(value.to_dict()) + elif isinstance(value, list): + for item in value: + file = _get_file_var_from_value(item) + if file: + files.append(file) + elif isinstance(value, dict): + file = _get_file_var_from_value(value) + if file: + files.append(file) + + return files + + +def _get_file_var_from_value(value: Any) -> Mapping[str, Any] | None: + if not value: + return None + + if isinstance(value, dict) and value.get("dify_model_identity") == FILE_MODEL_IDENTITY: + return 
value + if isinstance(value, File): + return value.to_dict() + + return None + + +def _normalize_file_like(value: Any) -> Any: + if isinstance(value, File): + return value.to_dict() + + if isinstance(value, Mapping): + return dict(value) + + return value + + +def stringify_copy_value(value: Any) -> str: + if isinstance(value, str): + return value + + if value is None: + return "" + + try: + return str(value) if not isinstance(value, (Mapping, list)) else json.dumps(value, ensure_ascii=False) + except Exception: + return str(value) diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index 9091a3190b..2d38f94d6d 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -109,6 +109,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): graph=graph, inputs=inputs, outputs=outputs, + result_replay=db_model.result_replay_dict, status=status, error_message=db_model.error or "", total_tokens=db_model.total_tokens, @@ -155,6 +156,11 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): if domain_model.outputs else None ) + db_model.result_replay = ( + json.dumps(WorkflowRuntimeTypeConverter().to_json_encodable(domain_model.result_replay)) + if domain_model.result_replay is not None + else None + ) db_model.status = domain_model.status db_model.error = domain_model.error_message or None db_model.total_tokens = domain_model.total_tokens diff --git a/api/core/workflow/entities/workflow_execution.py b/api/core/workflow/entities/workflow_execution.py index 1b3fb36f1f..5d88e52268 100644 --- a/api/core/workflow/entities/workflow_execution.py +++ b/api/core/workflow/entities/workflow_execution.py @@ -31,6 +31,7 @@ class WorkflowExecution(BaseModel): inputs: Mapping[str, Any] = Field(...) 
outputs: Mapping[str, Any] | None = None + result_replay: Mapping[str, Any] | None = None status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING error_message: str = Field(default="") diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 6f20dd45c7..10fecaff56 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -95,6 +95,7 @@ workflow_run_detail_fields = { "status": fields.String, "outputs": fields.Raw(attribute="outputs_dict"), "outputs_as_generation": fields.Boolean, + "result_replay": fields.Raw(attribute="result_replay_dict"), "error": fields.String, "elapsed_time": fields.Float, "total_tokens": fields.Integer, diff --git a/api/migrations/versions/2026_03_23_2350-b8d7fb4f6c2a_add_result_replay_to_workflow_runs.py b/api/migrations/versions/2026_03_23_2350-b8d7fb4f6c2a_add_result_replay_to_workflow_runs.py new file mode 100644 index 0000000000..413307f019 --- /dev/null +++ b/api/migrations/versions/2026_03_23_2350-b8d7fb4f6c2a_add_result_replay_to_workflow_runs.py @@ -0,0 +1,26 @@ +"""add result replay to workflow runs + +Revision ID: b8d7fb4f6c2a +Revises: fce013ca180e +Create Date: 2026-03-23 23:50:00.000000 + +""" + +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = "b8d7fb4f6c2a" +down_revision = "fce013ca180e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("workflow_runs", sa.Column("result_replay", models.types.LongText(), nullable=True)) + + +def downgrade(): + op.drop_column("workflow_runs", "result_replay") diff --git a/api/models/workflow.py b/api/models/workflow.py index 23db5002e5..6fe5ef3873 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -652,6 +652,7 @@ class WorkflowRun(Base): nullable=False, ) outputs: Mapped[str | None] = mapped_column(LongText, default="{}") + result_replay: Mapped[str | None] = mapped_column(LongText) error: Mapped[str | None] = mapped_column(LongText) elapsed_time: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default=sa.text("0")) total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0")) @@ -697,6 +698,10 @@ class WorkflowRun(Base): def outputs_dict(self) -> Mapping[str, Any]: return json.loads(self.outputs) if self.outputs else {} + @property + def result_replay_dict(self) -> Mapping[str, Any] | None: + return json.loads(self.result_replay) if self.result_replay else None + @property @deprecated("This method is retained for historical reasons; avoid using it if possible.") def message(self): @@ -729,6 +734,7 @@ class WorkflowRun(Base): "status": self.status, "outputs": self.outputs_dict, "outputs_as_generation": self.outputs_as_generation, + "result_replay": self.result_replay_dict, "error": self.error, "elapsed_time": self.elapsed_time, "total_tokens": self.total_tokens, @@ -754,6 +760,7 @@ class WorkflowRun(Base): inputs=json.dumps(data.get("inputs")), status=data.get("status"), outputs=json.dumps(data.get("outputs")), + result_replay=json.dumps(data.get("result_replay")) if data.get("result_replay") is not None else None, error=data.get("error"), elapsed_time=data.get("elapsed_time"), total_tokens=data.get("total_tokens"), diff --git a/api/tasks/workflow_execution_tasks.py 
b/api/tasks/workflow_execution_tasks.py index 3b3c6e5313..6af259f0c2 100644 --- a/api/tasks/workflow_execution_tasks.py +++ b/api/tasks/workflow_execution_tasks.py @@ -104,6 +104,9 @@ def _create_workflow_run_from_execution( workflow_run.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" ) + workflow_run.result_replay = ( + json.dumps(json_converter.to_json_encodable(execution.result_replay)) if execution.result_replay else None + ) workflow_run.error = execution.error_message workflow_run.elapsed_time = execution.elapsed_time workflow_run.total_tokens = execution.total_tokens @@ -125,6 +128,8 @@ def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: Wo workflow_run.outputs = ( json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}" ) + if execution.result_replay is not None: + workflow_run.result_replay = json.dumps(json_converter.to_json_encodable(execution.result_replay)) workflow_run.error = execution.error_message workflow_run.elapsed_time = execution.elapsed_time workflow_run.total_tokens = execution.total_tokens diff --git a/api/tests/unit_tests/core/app/workflow/test_result_replay.py b/api/tests/unit_tests/core/app/workflow/test_result_replay.py new file mode 100644 index 0000000000..6386df880e --- /dev/null +++ b/api/tests/unit_tests/core/app/workflow/test_result_replay.py @@ -0,0 +1,290 @@ +import json +from datetime import datetime + +from core.app.workflow.result_replay import WorkflowResultReplayBuilder, build_result_replay_from_node_executions +from core.file import FILE_MODEL_IDENTITY +from core.workflow.entities import ToolCall, ToolResult, ToolResultStatus, WorkflowNodeExecution +from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from core.workflow.graph_events import ChunkType, NodeRunStreamChunkEvent + + +def _stream_event( + *, + chunk_type: ChunkType, + chunk: str = "", + 
tool_call: ToolCall | None = None, + tool_result: ToolResult | None = None, +) -> NodeRunStreamChunkEvent: + return NodeRunStreamChunkEvent( + id="execution-1", + node_id="answer-node", + node_type=NodeType.ANSWER, + selector=["answer-node", "answer"], + chunk=chunk, + is_final=False, + chunk_type=chunk_type, + tool_call=tool_call, + tool_result=tool_result, + ) + + +def test_workflow_result_replay_builder_preserves_generation_sequence_and_files() -> None: + builder = WorkflowResultReplayBuilder() + file_payload = { + "dify_model_identity": FILE_MODEL_IDENTITY, + "related_id": "file-1", + "filename": "report.pdf", + "size": 128, + "mime_type": "application/pdf", + "transfer_method": "local_file", + "type": "document", + "url": "https://example.com/report.pdf", + "upload_file_id": "upload-file-1", + "remote_url": "", + } + + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT_START)) + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT, chunk="Need to inspect the workspace.")) + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT_END)) + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="I checked the directory.\n")) + builder.add_stream_chunk( + _stream_event( + chunk_type=ChunkType.TOOL_CALL, + tool_call=ToolCall( + id="tool-call-1", + name="bash", + arguments=json.dumps({"command": "ls"}), + icon="light-icon", + icon_dark="dark-icon", + ), + ) + ) + builder.add_stream_chunk( + _stream_event( + chunk_type=ChunkType.TOOL_RESULT, + chunk="output/", + tool_result=ToolResult( + id="tool-call-1", + name="bash", + output="output/", + files=["https://example.com/report.pdf"], + status=ToolResultStatus.SUCCESS, + elapsed_time=0.3, + icon="light-icon", + icon_dark="dark-icon", + ), + ) + ) + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="Finished.")) + + replay = builder.build(outputs={"files": [file_payload]}) + + assert replay is not None + assert replay["text"] == "I checked 
the directory.\nFinished." + assert replay["files"] == [{"var_name": "files", "files": [file_payload]}] + assert replay["llm_generation_items"] == [ + { + "type": "thought", + "thought_output": "Need to inspect the workspace.", + "thought_completed": True, + }, + { + "type": "text", + "text": "I checked the directory.\n", + "text_completed": True, + }, + { + "type": "tool", + "tool_name": "bash", + "tool_arguments": "{\"command\": \"ls\"}", + "tool_icon": "light-icon", + "tool_icon_dark": "dark-icon", + "tool_output": "output/", + "tool_files": ["https://example.com/report.pdf"], + "tool_duration": 0.3, + }, + { + "type": "text", + "text": "Finished.", + "text_completed": True, + }, + ] + + +def test_workflow_result_replay_builder_synthesizes_items_from_generation_without_sequence() -> None: + builder = WorkflowResultReplayBuilder() + + replay = builder.build( + outputs={ + "generation": { + "content": "Workspace is clean.", + "reasoning_content": [], + "tool_calls": [ + { + "name": "bash", + "arguments": "{\"bash\":\"ls\"}", + "output": "output/", + "status": "success", + "elapsed_time": 0.2, + } + ], + "sequence": [], + } + } + ) + + assert replay is not None + assert replay["text"] == "Workspace is clean." 
+ assert replay["llm_generation_items"] == [ + { + "type": "tool", + "tool_name": "bash", + "tool_arguments": "{\"bash\":\"ls\"}", + "tool_output": "output/", + "tool_duration": 0.2, + }, + { + "type": "text", + "text": "Workspace is clean.", + "text_completed": True, + }, + ] + + +def test_workflow_result_replay_builder_ignores_empty_terminal_stream_events() -> None: + builder = WorkflowResultReplayBuilder() + + builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="Hello")) + builder.add_stream_chunk( + NodeRunStreamChunkEvent( + id="execution-1", + node_id="answer-node", + node_type=NodeType.ANSWER, + selector=["answer-node", "generation", "thought"], + chunk="", + is_final=True, + chunk_type=ChunkType.THOUGHT, + ) + ) + builder.add_stream_chunk( + NodeRunStreamChunkEvent( + id="execution-1", + node_id="answer-node", + node_type=NodeType.ANSWER, + selector=["answer-node", "generation", "tool_calls"], + chunk="", + is_final=True, + chunk_type=ChunkType.TOOL_CALL, + tool_call=ToolCall(id="", name="", arguments=""), + ) + ) + builder.add_stream_chunk( + NodeRunStreamChunkEvent( + id="execution-1", + node_id="answer-node", + node_type=NodeType.ANSWER, + selector=["answer-node", "generation", "tool_results"], + chunk="", + is_final=True, + chunk_type=ChunkType.TOOL_RESULT, + tool_result=ToolResult(id="", name="", output="", files=[], status=ToolResultStatus.SUCCESS), + ) + ) + + replay = builder.build(outputs={"answer": "Hello"}) + + assert replay is not None + assert replay["llm_generation_items"] == [ + { + "type": "text", + "text": "Hello", + "text_completed": True, + }, + ] + + +def test_build_result_replay_from_node_executions_uses_llm_trace_when_outputs_are_flattened() -> None: + node_execution = WorkflowNodeExecution( + id="node-execution-1", + node_execution_id="node-execution-1", + workflow_id="workflow-1", + workflow_execution_id="workflow-run-1", + index=1, + node_id="agent-node", + node_type=NodeType.LLM, + title="Agent", + 
status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs={ + "generation": { + "content": "Workspace is clean.", + "reasoning_content": [], + "tool_calls": [], + "sequence": [], + } + }, + metadata={ + WorkflowNodeExecutionMetadataKey.LLM_TRACE: [ + { + "type": "model", + "output": { + "text": "Let me inspect the workspace:", + "reasoning": None, + "tool_calls": [ + { + "id": "tool-1", + "name": "bash", + "arguments": "{\"bash\":\"ls\"}", + } + ], + }, + }, + { + "type": "tool", + "duration": 0.2, + "status": "success", + "output": { + "id": "tool-1", + "name": "bash", + "arguments": "{\"bash\":\"ls\"}", + "output": "output/", + }, + }, + { + "type": "model", + "output": { + "text": "Workspace is clean.", + "reasoning": None, + "tool_calls": [], + }, + }, + ] + }, + created_at=datetime.utcnow(), + ) + + replay = build_result_replay_from_node_executions( + outputs={"answer": "Workspace is clean."}, + node_executions=[node_execution], + ) + + assert replay is not None + assert replay["text"] == "Workspace is clean." 
+ assert replay["llm_generation_items"] == [ + { + "type": "text", + "text": "Let me inspect the workspace:", + "text_completed": True, + }, + { + "type": "tool", + "tool_name": "bash", + "tool_arguments": "{\"bash\":\"ls\"}", + "tool_output": "output/", + "tool_duration": 0.2, + }, + { + "type": "text", + "text": "Workspace is clean.", + "text_completed": True, + }, + ] diff --git a/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py b/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py new file mode 100644 index 0000000000..2a2a7b4726 --- /dev/null +++ b/api/tests/unit_tests/core/repositories/test_sqlalchemy_workflow_execution_repository.py @@ -0,0 +1,87 @@ +from unittest.mock import MagicMock +from uuid import uuid4 + +from sqlalchemy.orm import sessionmaker + +from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository +from core.workflow.entities.workflow_execution import WorkflowExecution +from core.workflow.enums import WorkflowExecutionStatus, WorkflowType +from libs.datetime_utils import naive_utc_now +from models import Account, CreatorUserRole, WorkflowRun +from models.enums import WorkflowRunTriggeredFrom + + +def _build_repo() -> SQLAlchemyWorkflowExecutionRepository: + session_factory = MagicMock(spec=sessionmaker) + account = MagicMock(spec=Account) + account.id = str(uuid4()) + account.current_tenant_id = str(uuid4()) + + return SQLAlchemyWorkflowExecutionRepository( + session_factory=session_factory, + user=account, + app_id="test-app-id", + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + ) + + +def test_to_db_model_preserves_result_replay() -> None: + repo = _build_repo() + execution = WorkflowExecution.new( + id_=str(uuid4()), + workflow_id=str(uuid4()), + workflow_type=WorkflowType.WORKFLOW, + workflow_version="1.0", + graph={"nodes": [], "edges": []}, + inputs={"query": "hello"}, + started_at=naive_utc_now(), + ) + 
execution.outputs = {"answer": "hello"} + execution.result_replay = { + "text": "hello", + "llm_generation_items": [ + {"type": "text", "text": "hello", "text_completed": True}, + {"type": "tool", "tool_name": "bash", "tool_arguments": "ls", "tool_output": "output"}, + ], + "files": [{"var_name": "files", "files": [{"id": "file-1"}]}], + } + execution.status = WorkflowExecutionStatus.SUCCEEDED + + db_model = repo._to_db_model(execution) + + assert db_model.result_replay_dict == execution.result_replay + + +def test_to_domain_model_preserves_result_replay() -> None: + repo = _build_repo() + workflow_run = WorkflowRun( + id=str(uuid4()), + tenant_id=str(uuid4()), + app_id=str(uuid4()), + workflow_id=str(uuid4()), + type=WorkflowType.WORKFLOW.value, + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING.value, + version="1.0", + graph='{"nodes":[],"edges":[]}', + inputs='{"query":"hello"}', + status=WorkflowExecutionStatus.SUCCEEDED, + outputs='{"answer":"hello"}', + result_replay='{"text":"hello","llm_generation_items":[{"type":"tool","tool_name":"bash"}],"files":[]}', + error=None, + elapsed_time=1.2, + total_tokens=5, + total_steps=1, + exceptions_count=0, + created_by_role=CreatorUserRole.ACCOUNT, + created_by=str(uuid4()), + created_at=naive_utc_now(), + finished_at=naive_utc_now(), + ) + + domain_model = repo._to_domain_model(workflow_run) + + assert domain_model.result_replay == { + "text": "hello", + "llm_generation_items": [{"type": "tool", "tool_name": "bash"}], + "files": [], + } diff --git a/api/tests/unit_tests/tasks/test_workflow_execution_tasks.py b/api/tests/unit_tests/tasks/test_workflow_execution_tasks.py new file mode 100644 index 0000000000..b1661de996 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_workflow_execution_tasks.py @@ -0,0 +1,79 @@ +import json +from uuid import uuid4 + +from core.workflow.entities.workflow_execution import WorkflowExecution +from core.workflow.enums import WorkflowExecutionStatus, WorkflowType +from 
libs.datetime_utils import naive_utc_now +from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom +from tasks.workflow_execution_tasks import _create_workflow_run_from_execution, _update_workflow_run_from_execution + + +def _build_execution() -> WorkflowExecution: + execution = WorkflowExecution.new( + id_=str(uuid4()), + workflow_id=str(uuid4()), + workflow_type=WorkflowType.WORKFLOW, + workflow_version="draft", + graph={"nodes": [], "edges": []}, + inputs={"query": "hello"}, + started_at=naive_utc_now(), + ) + execution.status = WorkflowExecutionStatus.SUCCEEDED + execution.outputs = {"answer": "done"} + execution.result_replay = { + "text": "done", + "llm_generation_items": [ + { + "type": "tool", + "tool_name": "bash", + "tool_arguments": "{\"command\": \"pwd\"}", + "tool_output": "/workspace", + }, + ], + "files": [], + } + execution.finished_at = naive_utc_now() + return execution + + +def test_create_workflow_run_from_execution_persists_result_replay() -> None: + execution = _build_execution() + + workflow_run = _create_workflow_run_from_execution( + execution=execution, + tenant_id=str(uuid4()), + app_id=str(uuid4()), + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + creator_user_id=str(uuid4()), + creator_user_role=CreatorUserRole.ACCOUNT, + ) + + assert json.loads(workflow_run.result_replay or "{}") == execution.result_replay + assert workflow_run.result_replay_dict == execution.result_replay + + +def test_update_workflow_run_from_execution_preserves_existing_replay_until_new_value_arrives() -> None: + execution = _build_execution() + workflow_run = _create_workflow_run_from_execution( + execution=execution, + tenant_id=str(uuid4()), + app_id=str(uuid4()), + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, + creator_user_id=str(uuid4()), + creator_user_role=CreatorUserRole.ACCOUNT, + ) + + previous_result_replay = workflow_run.result_replay + execution.result_replay = None + _update_workflow_run_from_execution(workflow_run, execution) + 
+ assert workflow_run.result_replay == previous_result_replay + + execution.result_replay = { + "text": "updated", + "llm_generation_items": [], + "files": [], + } + _update_workflow_run_from_execution(workflow_run, execution) + + assert workflow_run.result_replay_dict == execution.result_replay diff --git a/web/app/components/workflow/panel/record.tsx b/web/app/components/workflow/panel/record.tsx index ee32cdea2d..c689b3063c 100644 --- a/web/app/components/workflow/panel/record.tsx +++ b/web/app/components/workflow/panel/record.tsx @@ -29,6 +29,7 @@ const Record = () => { runDetailUrl={getWorkflowRunAndTraceUrl(historyWorkflowData?.id).runUrl} tracingListUrl={getWorkflowRunAndTraceUrl(historyWorkflowData?.id).traceUrl} getResultCallback={handleResultCallback} + useFirstRunResultView /> ) diff --git a/web/app/components/workflow/run/index.spec.tsx b/web/app/components/workflow/run/index.spec.tsx new file mode 100644 index 0000000000..517b386fa7 --- /dev/null +++ b/web/app/components/workflow/run/index.spec.tsx @@ -0,0 +1,276 @@ +import type { WorkflowRunDetailResponse } from '@/models/log' +import { render, screen, waitFor } from '@testing-library/react' +import userEvent from '@testing-library/user-event' +import copy from 'copy-to-clipboard' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { ToastContext } from '@/app/components/base/toast' +import Run from './index' + +type MockGenerationItem = { + id: string + toolName?: string + thoughtOutput?: string + text?: string +} + +type MockFileGroup = { + varName: string +} + +const fetchRunDetailMock = vi.fn() +const fetchTracingListMock = vi.fn() + +vi.mock('react-i18next', () => ({ + useTranslation: () => ({ + t: (key: string) => key, + }), +})) + +vi.mock('copy-to-clipboard', () => ({ + default: vi.fn(), +})) + +vi.mock('@/service/log', () => ({ + fetchRunDetail: (...args: unknown[]) => fetchRunDetailMock(...args), + fetchTracingList: (...args: unknown[]) => fetchTracingListMock(...args), 
+})) + +vi.mock('../store', () => ({ + useStore: (selector: (state: { isListening: boolean }) => unknown) => selector({ isListening: false }), +})) + +vi.mock('./output-panel', () => ({ + default: () =>
<div data-testid="output-panel">output-panel</div>
, +})) + +vi.mock('./result-panel', () => ({ + default: () =>
<div data-testid="result-panel">result-panel</div>
, +})) + +vi.mock('./status', () => ({ + default: () =>
<div data-testid="status-panel">status-panel</div>
, +})) + +vi.mock('./tracing-panel', () => ({ + default: ({ list }: { list: unknown[] }) =>
<div data-testid="tracing-panel">{list.length}</div>
, +})) + +vi.mock('./result-text', () => ({ + default: ({ + outputs, + llmGenerationItems, + allFiles, + }: { + outputs?: string + llmGenerationItems?: MockGenerationItem[] + allFiles?: MockFileGroup[] + }) => ( +
+    <div
+      data-testid="result-text"
+      data-output={outputs || ''}
+      data-item-count={(llmGenerationItems || []).length}
+      data-file-count={(allFiles || []).length}
+    >
+      {(llmGenerationItems || []).map(item => (
<div key={item.id}>{item.toolName || item.thoughtOutput || item.text}</div>
+ ))} + {(allFiles || []).map(item => ( +
<div key={item.varName}>{item.varName}</div>
+ ))} +
+ ), +})) + +const baseRunDetail: WorkflowRunDetailResponse = { + id: 'run-1', + version: 'draft', + graph: { + nodes: [], + edges: [], + }, + inputs: {}, + inputs_truncated: false, + status: 'succeeded', + outputs: {}, + outputs_truncated: false, + total_steps: 1, + created_by_role: 'account', + created_by_account: { + id: 'account-1', + name: 'Tester', + email: 'tester@example.com', + }, + created_at: 1, + finished_at: 2, +} + +const renderRun = () => { + const notify = vi.fn() + + render( + + + , + ) + + return { notify } +} + +describe('Run history result replay', () => { + beforeEach(() => { + vi.clearAllMocks() + fetchTracingListMock.mockResolvedValue({ data: [] }) + }) + + it('renders workflow history result from result_replay and copies structured content', async () => { + fetchRunDetailMock.mockResolvedValue({ + ...baseRunDetail, + result_replay: { + text: 'I checked the workspace.', + llm_generation_items: [ + { + type: 'text', + text: 'I checked the workspace.', + text_completed: true, + }, + { + type: 'tool', + tool_name: 'bash', + tool_arguments: '{"command":"ls"}', + tool_output: 'output/', + tool_duration: 0.3, + }, + ], + files: [ + { + var_name: 'files', + files: [ + { + related_id: 'file-1', + extension: 'pdf', + filename: 'report.pdf', + size: 128, + mime_type: 'application/pdf', + transfer_method: 'local_file', + type: 'document', + url: 'https://example.com/report.pdf', + upload_file_id: 'upload-file-1', + remote_url: '', + }, + ], + }, + ], + }, + }) + + renderRun() + + await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument()) + expect(screen.queryByTestId('output-panel')).not.toBeInTheDocument() + expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'I checked the workspace.') + expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '2') + expect(screen.getByTestId('result-text')).toHaveAttribute('data-file-count', '1') + + await 
userEvent.setup().click(screen.getByRole('button', { name: 'operation.copy' })) + + expect(copy).toHaveBeenCalledWith( + expect.stringContaining('[TOOL] bash'), + ) + expect(copy).toHaveBeenCalledWith( + expect.stringContaining('INPUT:\n{"command":"ls"}'), + ) + }) + + it('falls back to generation outputs for old runs without result_replay', async () => { + fetchRunDetailMock.mockResolvedValue({ + ...baseRunDetail, + outputs_as_generation: true, + outputs: { + generation: { + content: 'Hello', + reasoning_content: ['Need to answer'], + tool_calls: [ + { + name: 'bash', + arguments: '{"command":"pwd"}', + result: '/workspace', + elapsed_time: 0.2, + status: 'success', + }, + ], + sequence: [ + { type: 'reasoning', index: 0 }, + { type: 'tool_call', index: 0 }, + { type: 'content', start: 0, end: 5 }, + ], + }, + }, + }) + + renderRun() + + await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument()) + expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'Hello') + expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '3') + expect(screen.getByText('Need to answer')).toBeInTheDocument() + expect(screen.getByText('bash')).toBeInTheDocument() + }) + + it('synthesizes tool items for generation outputs without sequence and does not leak a zero placeholder', async () => { + fetchRunDetailMock.mockResolvedValue({ + ...baseRunDetail, + outputs_as_generation: true, + outputs: { + generation: { + content: 'Workspace is clean.', + reasoning_content: [], + tool_calls: [ + { + name: 'bash', + arguments: '{"command":"ls"}', + output: 'output/', + elapsed_time: 0.2, + status: 'success', + }, + ], + sequence: [], + }, + }, + }) + + renderRun() + + await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument()) + expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'Workspace is clean.') + expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '2') + 
expect(screen.getByText('bash')).toBeInTheDocument() + expect(screen.queryByText(/^0$/)).not.toBeInTheDocument() + }) + + it('keeps default output-panel behavior when history replay mode is disabled', async () => { + fetchRunDetailMock.mockResolvedValue({ + ...baseRunDetail, + outputs: { + answer: 'Hello', + }, + }) + + const notify = vi.fn() + render( + + + , + ) + + await waitFor(() => expect(screen.getByTestId('output-panel')).toBeInTheDocument()) + expect(screen.queryByTestId('result-text')).not.toBeInTheDocument() + }) +}) diff --git a/web/app/components/workflow/run/index.tsx b/web/app/components/workflow/run/index.tsx index a39b695d1f..815b930a4d 100644 --- a/web/app/components/workflow/run/index.tsx +++ b/web/app/components/workflow/run/index.tsx @@ -1,10 +1,17 @@ 'use client' import type { FC } from 'react' -import type { WorkflowRunDetailResponse } from '@/models/log' -import type { NodeTracing } from '@/types/workflow' +import type { + WorkflowRunDetailResponse, + WorkflowRunReplayGenerationItem, + WorkflowRunResultReplay, +} from '@/models/log' +import type { LLMGenerationItem, NodeTracing } from '@/types/workflow' +import copy from 'copy-to-clipboard' import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { useTranslation } from 'react-i18next' import { useContext } from 'use-context-selector' +import Button from '@/app/components/base/button' +import { getFilesInLogs, getProcessedFilesFromResponse } from '@/app/components/base/file-uploader/utils' import Loading from '@/app/components/base/loading' import { ToastContext } from '@/app/components/base/toast' import { WorkflowRunningStatus } from '@/app/components/workflow/types' @@ -13,15 +20,343 @@ import { cn } from '@/utils/classnames' import { useStore } from '../store' import OutputPanel from './output-panel' import ResultPanel from './result-panel' +import ResultText from './result-text' import StatusPanel from './status' import TracingPanel from './tracing-panel' +type 
GenerationToolCall = { + name?: string + arguments?: string + result?: Record | string + output?: Record | string + elapsed_time?: number + time_cost?: number + icon?: string | { background: string, content: string } + icon_dark?: string | { background: string, content: string } + tool_icon?: string | { background: string, content: string } + tool_icon_dark?: string | { background: string, content: string } + files?: unknown[] + tool_files?: unknown[] + error?: string | null + status?: string +} + +type GenerationSequenceSegment = { + type: 'content' | 'reasoning' | 'tool_call' + start?: number + end?: number + index?: number +} + +type GenerationPayload = { + content?: string + reasoning_content?: string[] + tool_calls?: GenerationToolCall[] + sequence?: GenerationSequenceSegment[] +} + +type HistoryResultView = { + resultText?: string + llmGenerationItems: LLMGenerationItem[] + allFiles: ReturnType + copyContent: string +} + export type RunProps = { hideResult?: boolean activeTab?: 'RESULT' | 'DETAIL' | 'TRACING' getResultCallback?: (result: WorkflowRunDetailResponse) => void runDetailUrl: string tracingListUrl: string + useFirstRunResultView?: boolean +} + +const stringifyCopyValue = (value: unknown) => { + if (typeof value === 'string') + return value + + if (value === null || typeof value === 'undefined') + return '' + + try { + return JSON.stringify(value, null, 2) + } + catch { + return String(value) + } +} + +const buildCopyContentFromLLMGenerationItems = (llmGenerationItems?: LLMGenerationItem[]) => { + if (!llmGenerationItems?.length) + return '' + + const hasStructuredItems = llmGenerationItems.some(item => item.type !== 'text') + if (!hasStructuredItems) + return '' + + return llmGenerationItems + .map((item) => { + if (item.type === 'text') + return item.text || '' + + if (item.type === 'thought') + return item.thoughtOutput ? 
`[THOUGHT]\n${item.thoughtOutput}` : '' + + if (item.type === 'tool') { + const sections = [ + `[TOOL] ${item.toolName || ''}`.trim(), + ] + + if (item.toolArguments) + sections.push(`INPUT:\n${stringifyCopyValue(item.toolArguments)}`) + if (typeof item.toolOutput !== 'undefined') + sections.push(`OUTPUT:\n${stringifyCopyValue(item.toolOutput)}`) + if (item.toolError) + sections.push(`ERROR:\n${item.toolError}`) + + return sections.join('\n') + } + + return '' + }) + .filter(Boolean) + .join('\n\n') +} + +const isRecord = (value: unknown): value is Record => { + return !!value && typeof value === 'object' && !Array.isArray(value) +} + +const getSingleStringOutput = (outputs?: WorkflowRunDetailResponse['outputs']) => { + if (!isRecord(outputs)) + return undefined + + const entries = Object.entries(outputs) + if (entries.length !== 1) + return undefined + + const [, value] = entries[0] + return typeof value === 'string' ? value : undefined +} + +const getSingleGenerationOutput = (outputs?: WorkflowRunDetailResponse['outputs']) => { + if (!isRecord(outputs)) + return undefined + + const values = Object.values(outputs) + if (values.length !== 1) + return undefined + + const [value] = values + if (!isRecord(value)) + return undefined + + return value as GenerationPayload +} + +const buildGenerationItemsFromPayload = (generation?: GenerationPayload): { + resultText?: string + llmGenerationItems: LLMGenerationItem[] +} => { + if (!generation) + return { resultText: undefined, llmGenerationItems: [] } + + const resultText = typeof generation.content === 'string' ? generation.content : undefined + const reasoningContent = Array.isArray(generation.reasoning_content) ? generation.reasoning_content : [] + const toolCalls = Array.isArray(generation.tool_calls) ? generation.tool_calls : [] + const sequence = Array.isArray(generation.sequence) ? 
generation.sequence : [] + const llmGenerationItems: LLMGenerationItem[] = [] + const appendSyntheticToolItem = (toolCall: GenerationToolCall, index: number) => { + const toolOutput = typeof toolCall.result !== 'undefined' ? toolCall.result : toolCall.output + + llmGenerationItems.push({ + id: `generation-tool-${index}`, + type: 'tool', + toolName: toolCall.name, + toolArguments: toolCall.arguments, + toolOutput, + toolDuration: toolCall.elapsed_time ?? toolCall.time_cost, + toolIcon: toolCall.icon ?? toolCall.tool_icon, + toolIconDark: toolCall.icon_dark ?? toolCall.tool_icon_dark, + toolFiles: Array.isArray(toolCall.files) ? toolCall.files : toolCall.tool_files, + toolError: toolCall.status === 'error' + ? (toolCall.error || stringifyCopyValue(toolOutput) || 'error') + : undefined, + }) + } + + sequence.forEach((segment, index) => { + if (segment.type === 'content') { + const start = typeof segment.start === 'number' ? segment.start : 0 + const end = typeof segment.end === 'number' ? segment.end : start + const text = resultText?.substring(start, end) + if (text?.trim()) { + llmGenerationItems.push({ + id: `generation-text-${index}`, + type: 'text', + text, + textCompleted: true, + }) + } + return + } + + if (segment.type === 'reasoning') { + const reasoning = typeof segment.index === 'number' ? reasoningContent[segment.index] : undefined + if (reasoning) { + llmGenerationItems.push({ + id: `generation-thought-${index}`, + type: 'thought', + thoughtOutput: reasoning, + thoughtCompleted: true, + }) + } + return + } + + if (segment.type === 'tool_call') { + const toolCall = typeof segment.index === 'number' ? 
toolCalls[segment.index] : undefined + if (!toolCall) + return + + appendSyntheticToolItem(toolCall, index) + } + }) + + if (!llmGenerationItems.length && (reasoningContent.length || toolCalls.length || resultText)) { + const syntheticSegmentCount = Math.max(reasoningContent.length, toolCalls.length) + + for (let i = 0; i < syntheticSegmentCount; i += 1) { + const reasoning = reasoningContent[i] + if (reasoning) { + llmGenerationItems.push({ + id: `generation-thought-${i}`, + type: 'thought', + thoughtOutput: reasoning, + thoughtCompleted: true, + }) + } + + const toolCall = toolCalls[i] + if (toolCall) + appendSyntheticToolItem(toolCall, i) + } + + if (resultText) { + llmGenerationItems.push({ + id: 'generation-text-final', + type: 'text', + text: resultText, + textCompleted: true, + }) + } + } + + return { + resultText, + llmGenerationItems, + } +} + +const buildGenerationItemsFromReplay = (items?: WorkflowRunReplayGenerationItem[]) => { + if (!items?.length) + return [] + + return items + .filter((item) => { + if (item.type === 'thought') + return !!item.thought_output + + if (item.type === 'tool') + return !!(item.tool_name || item.tool_arguments || item.tool_output || item.tool_error || item.tool_files?.length) + + if (item.type === 'text') + return typeof item.text === 'string' && item.text.length > 0 + + return true + }) + .map((item, index) => ({ + id: `replay-item-${index}`, + type: item.type, + text: item.text, + textCompleted: item.text_completed, + thoughtOutput: item.thought_output, + thoughtCompleted: item.thought_completed, + toolName: item.tool_name, + toolArguments: item.tool_arguments, + toolOutput: item.tool_output, + toolFiles: item.tool_files, + toolError: item.tool_error, + toolDuration: item.tool_duration, + toolIcon: item.tool_icon, + toolIconDark: item.tool_icon_dark, + } satisfies LLMGenerationItem)) +} + +const buildAllFilesFromReplay = (resultReplay?: WorkflowRunResultReplay | null) => { + if (!resultReplay?.files?.length) + return [] + 
+ return resultReplay.files + .map(fileGroup => ({ + varName: fileGroup.var_name, + list: getProcessedFilesFromResponse(fileGroup.files || []), + })) + .filter(fileGroup => fileGroup.list.length > 0) +} + +const buildHistoryResultView = (runDetail?: WorkflowRunDetailResponse): HistoryResultView => { + if (!runDetail) { + return { + resultText: undefined, + llmGenerationItems: [], + allFiles: [], + copyContent: '', + } + } + + if (runDetail.result_replay) { + const llmGenerationItems = buildGenerationItemsFromReplay(runDetail.result_replay.llm_generation_items) + const resultText = runDetail.result_replay.text + const copyContent = buildCopyContentFromLLMGenerationItems(llmGenerationItems) || resultText || '' + + return { + resultText, + llmGenerationItems, + allFiles: buildAllFilesFromReplay(runDetail.result_replay), + copyContent, + } + } + + const allFiles = isRecord(runDetail.outputs) ? getFilesInLogs(runDetail.outputs) : [] + + const singleTextOutput = getSingleStringOutput(runDetail.outputs) + if (singleTextOutput) { + return { + resultText: singleTextOutput, + llmGenerationItems: [], + allFiles, + copyContent: singleTextOutput, + } + } + + if (runDetail.outputs_as_generation) { + const generationPayload = getSingleGenerationOutput(runDetail.outputs) + const { resultText, llmGenerationItems } = buildGenerationItemsFromPayload(generationPayload) + return { + resultText, + llmGenerationItems, + allFiles, + copyContent: buildCopyContentFromLLMGenerationItems(llmGenerationItems) || resultText || '', + } + } + + return { + resultText: undefined, + llmGenerationItems: [], + allFiles, + copyContent: '', + } } const RunPanel: FC = ({ @@ -30,6 +365,7 @@ const RunPanel: FC = ({ getResultCallback, runDetailUrl, tracingListUrl, + useFirstRunResultView = false, }) => { const { t } = useTranslation() const { notify } = useContext(ToastContext) @@ -47,6 +383,11 @@ const RunPanel: FC = ({ return 'N/A' }, [runDetail]) + const historyResultView = useMemo( + () => 
buildHistoryResultView(runDetail), + [runDetail], + ) + const getResult = useCallback(async () => { try { const res = await fetchRunDetail(runDetailUrl) @@ -100,10 +441,9 @@ const RunPanel: FC = ({ }, [isListening]) useEffect(() => { - // fetch data if (runDetailUrl && tracingListUrl) getData() - }, [runDetailUrl, tracingListUrl]) + }, [getData, runDetailUrl, tracingListUrl]) const [height, setHeight] = useState(0) const ref = useRef(null) @@ -119,7 +459,6 @@ const RunPanel: FC = ({ return (
- {/* tab */}
{!hideResult && (
= ({ {t('tracing', { ns: 'runLog' })}
- {/* panel detail */} -
+
{loading && (
)} - {!loading && currentTab === 'RESULT' && runDetail && ( + {!loading && currentTab === 'RESULT' && runDetail && !useFirstRunResultView && ( )} + {!loading && currentTab === 'RESULT' && runDetail && useFirstRunResultView && ( +
+ { void switchTab('DETAIL') }} + /> + {runDetail.status !== WorkflowRunningStatus.Running && historyResultView.copyContent && ( + + )} +
+ )} {!loading && currentTab === 'DETAIL' && runDetail && ( = ({ )} {!loading && currentTab === 'TRACING' && ( )} diff --git a/web/app/components/workflow/run/result-text.tsx b/web/app/components/workflow/run/result-text.tsx index ef736848de..c3a15b5428 100644 --- a/web/app/components/workflow/run/result-text.tsx +++ b/web/app/components/workflow/run/result-text.tsx @@ -29,7 +29,7 @@ const ResultText: FC = ({ allFiles, }) => { const { t } = useTranslation() - const generationContentRenderIsUsed = llmGenerationItems?.length && llmGenerationItems.some((item) => { + const generationContentRenderIsUsed = !!llmGenerationItems?.length && llmGenerationItems.some((item) => { return item.type === 'tool' || item.type === 'thought' }) diff --git a/web/models/log.ts b/web/models/log.ts index ab1282b8af..2b14e68e71 100644 --- a/web/models/log.ts +++ b/web/models/log.ts @@ -286,6 +286,39 @@ export type WorkflowLogsRequest = { limit: number // The default value is 20 and the range is 1-100 } +export type WorkflowRunReplayFileGroup = { + var_name: string + files: FileResponse[] +} + +export type WorkflowRunReplayGenerationItem = { + type: 'text' | 'thought' | 'tool' + text?: string + text_completed?: boolean + thought_output?: string + thought_completed?: boolean + tool_name?: string + tool_arguments?: string + tool_output?: Record | string + tool_files?: unknown[] + tool_error?: string + tool_duration?: number + tool_icon?: string | { + background: string + content: string + } + tool_icon_dark?: string | { + background: string + content: string + } +} + +export type WorkflowRunResultReplay = { + text?: string + llm_generation_items?: WorkflowRunReplayGenerationItem[] + files?: WorkflowRunReplayFileGroup[] +} + export type WorkflowRunDetailResponse = { id: string version: string @@ -294,10 +327,12 @@ export type WorkflowRunDetailResponse = { edges: Edge[] viewport?: Viewport } - inputs: string + inputs: Record | string inputs_truncated: boolean - status: 'running' | 'succeeded' | 
'failed' | 'stopped' - outputs?: string + status: 'running' | 'succeeded' | 'failed' | 'stopped' | 'paused' | 'partial-succeeded' + outputs?: Record | string + outputs_as_generation?: boolean + result_replay?: WorkflowRunResultReplay | null outputs_truncated: boolean outputs_full_content?: { download_url: string