fix: replay workflow history results with first-run rendering

This commit is contained in:
Chen Yefan 2026-03-24 02:14:32 +08:00
parent 2cbc8da9cb
commit 7f3ef542cb
17 changed files with 1771 additions and 13 deletions

View File

@ -85,6 +85,8 @@ workflow_run_fields = {
"status": WorkflowRunStatusField,
"inputs": fields.Raw,
"outputs": WorkflowRunOutputsField,
"outputs_as_generation": fields.Boolean,
"result_replay": fields.Raw(attribute="result_replay_dict"),
"error": fields.String,
"total_steps": fields.Integer,
"total_tokens": fields.Integer,

View File

@ -15,6 +15,7 @@ from datetime import datetime
from typing import Any, Union
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
from core.app.workflow.result_replay import WorkflowResultReplayBuilder, build_result_replay_from_node_executions
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
@ -40,6 +41,7 @@ from core.workflow.graph_events import (
NodeRunPauseRequestedEvent,
NodeRunRetryEvent,
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
NodeRunSucceededEvent,
)
from core.workflow.node_events import NodeRunResult
@ -94,6 +96,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._node_execution_cache: dict[str, WorkflowNodeExecution] = {}
self._node_snapshots: dict[str, _NodeRuntimeSnapshot] = {}
self._node_sequence: int = 0
self._result_replay_builder = WorkflowResultReplayBuilder()
# ------------------------------------------------------------------
# GraphEngineLayer lifecycle
@ -103,6 +106,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._node_execution_cache.clear()
self._node_snapshots.clear()
self._node_sequence = 0
self._result_replay_builder = WorkflowResultReplayBuilder()
def on_event(self, event: GraphEngineEvent) -> None:
if isinstance(event, GraphRunStartedEvent):
@ -129,6 +133,10 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
self._handle_graph_run_paused(event)
return
if isinstance(event, NodeRunStreamChunkEvent):
self._handle_node_stream_chunk(event)
return
if isinstance(event, NodeRunStartedEvent):
self._handle_node_started(event)
return
@ -176,6 +184,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
def _handle_graph_run_succeeded(self, event: GraphRunSucceededEvent) -> None:
execution = self._get_workflow_execution()
execution.outputs = event.outputs
execution.result_replay = self._build_result_replay(event.outputs)
execution.status = WorkflowExecutionStatus.SUCCEEDED
self._populate_completion_statistics(execution)
@ -185,6 +194,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
def _handle_graph_run_partial_succeeded(self, event: GraphRunPartialSucceededEvent) -> None:
execution = self._get_workflow_execution()
execution.outputs = event.outputs
execution.result_replay = self._build_result_replay(event.outputs)
execution.status = WorkflowExecutionStatus.PARTIAL_SUCCEEDED
execution.exceptions_count = event.exceptions_count
self._populate_completion_statistics(execution)
@ -194,6 +204,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
def _handle_graph_run_failed(self, event: GraphRunFailedEvent) -> None:
execution = self._get_workflow_execution()
execution.result_replay = self._build_result_replay(execution.outputs)
execution.status = WorkflowExecutionStatus.FAILED
execution.error_message = event.error
execution.exceptions_count = event.exceptions_count
@ -205,6 +216,8 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
def _handle_graph_run_aborted(self, event: GraphRunAbortedEvent) -> None:
execution = self._get_workflow_execution()
execution.outputs = event.outputs
execution.result_replay = self._build_result_replay(event.outputs)
execution.status = WorkflowExecutionStatus.STOPPED
execution.error_message = event.reason or "Workflow execution aborted"
self._populate_completion_statistics(execution)
@ -217,6 +230,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
execution = self._get_workflow_execution()
execution.status = WorkflowExecutionStatus.PAUSED
execution.outputs = event.outputs
execution.result_replay = self._build_result_replay(event.outputs)
self._populate_completion_statistics(execution, update_finished=False)
self._workflow_execution_repository.save(execution)
@ -262,6 +276,9 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
)
self._node_snapshots[event.id] = snapshot
def _handle_node_stream_chunk(self, event: NodeRunStreamChunkEvent) -> None:
    """Forward every stream chunk to the replay builder so the persisted
    replay mirrors the first-run streaming order."""
    self._result_replay_builder.add_stream_chunk(event)
def _handle_node_retry(self, event: NodeRunRetryEvent) -> None:
domain_execution = self._get_node_execution(event.id)
domain_execution.status = WorkflowNodeExecutionStatus.RETRY
@ -324,6 +341,14 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
handled = WorkflowEntry.handle_special_values(inputs)
return handled or {}
def _build_result_replay(self, outputs: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
    """Assemble the replay payload for the finished run.

    Prefers the replay built from live stream chunks; when it carries no
    structured generation items, falls back to reconstructing one from the
    cached node executions, keeping the stream-built replay as a last resort.
    """
    replay = self._result_replay_builder.build(outputs)
    has_structured_items = bool(replay and replay.get("llm_generation_items"))
    if has_structured_items:
        return replay
    return build_result_replay_from_node_executions(outputs, self._node_execution_cache.values()) or replay
def _get_workflow_execution(self) -> WorkflowExecution:
if self._workflow_execution is None:
raise ValueError("workflow execution not initialized")

View File

@ -0,0 +1,546 @@
from __future__ import annotations
import json
from collections.abc import Iterable, Mapping, Sequence
from copy import deepcopy
from operator import itemgetter
from typing import Any
from core.file import FILE_MODEL_IDENTITY, File
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
from core.workflow.entities.tool_entities import ToolResultStatus
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_events import ChunkType, NodeRunStreamChunkEvent
class WorkflowResultReplayBuilder:
    """Build a persisted replay payload for workflow result rendering.

    Stream chunks are folded, in arrival order, into a flat list of replay
    items (``text`` / ``thought`` / ``tool`` dicts); :meth:`build` then
    finalizes that list into the payload stored with the workflow run.
    """

    def __init__(self) -> None:
        # Concatenation of every TEXT chunk seen so far.
        self._text = ""
        # Ordered replay items accumulated from stream chunks.
        self._items: list[dict[str, Any]] = []
        # Index into _items of the thought currently being streamed, if any.
        self._active_thought_index: int | None = None
        # Key of the most recent tool call, used when a result carries no id.
        self._active_tool_key: str | None = None
        # tool key -> index into _items, so results merge into their call item.
        self._tool_indexes: dict[str, int] = {}

    def add_stream_chunk(self, event: NodeRunStreamChunkEvent) -> None:
        """Fold one streaming event into the replay item sequence."""
        # Terminal flush events that carry no payload add nothing renderable.
        if _is_empty_terminal_stream_event(event):
            return
        if event.chunk_type == ChunkType.TEXT:
            self._append_text(event.chunk)
            return
        if event.chunk_type == ChunkType.THOUGHT_START:
            self._close_open_text_item()
            self._active_thought_index = len(self._items)
            self._items.append({
                "type": "thought",
                "thought_output": event.chunk or "",
                "thought_completed": False,
            })
            return
        if event.chunk_type == ChunkType.THOUGHT:
            # THOUGHT without a prior THOUGHT_START opens a thought implicitly.
            thought_index = self._ensure_active_thought()
            self._items[thought_index]["thought_output"] += event.chunk or ""
            return
        if event.chunk_type == ChunkType.THOUGHT_END:
            thought_index = self._ensure_active_thought()
            self._items[thought_index]["thought_output"] += event.chunk or ""
            self._items[thought_index]["thought_completed"] = True
            self._active_thought_index = None
            return
        if event.chunk_type == ChunkType.TOOL_CALL:
            self._close_open_text_item()
            tool_call = event.tool_call
            # Fall back to a positional key when the call carries no id.
            tool_key = (tool_call.id if tool_call else None) or f"tool-{len(self._items)}"
            tool_index = self._tool_indexes.get(tool_key)
            if tool_index is None:
                self._items.append({
                    "type": "tool",
                    "tool_name": tool_call.name if tool_call else None,
                    "tool_arguments": tool_call.arguments if tool_call else None,
                    "tool_icon": tool_call.icon if tool_call else None,
                    "tool_icon_dark": tool_call.icon_dark if tool_call else None,
                })
                tool_index = len(self._items) - 1
                self._tool_indexes[tool_key] = tool_index
            else:
                # Repeated TOOL_CALL chunks refine the existing item in place.
                payload = self._items[tool_index]
                if tool_call:
                    if tool_call.name:
                        payload["tool_name"] = tool_call.name
                    if tool_call.arguments is not None:
                        payload["tool_arguments"] = tool_call.arguments
                    if tool_call.icon is not None:
                        payload["tool_icon"] = tool_call.icon
                    if tool_call.icon_dark is not None:
                        payload["tool_icon_dark"] = tool_call.icon_dark
            self._active_tool_key = tool_key
            return
        if event.chunk_type == ChunkType.TOOL_RESULT:
            tool_result = event.tool_result
            # Prefer the result's own id, then the last call seen, then position.
            tool_key = (tool_result.id if tool_result else None) or self._active_tool_key or f"tool-{len(self._items)}"
            tool_index = self._tool_indexes.get(tool_key)
            if tool_index is None:
                # Result arrived without a matching call: create a bare item.
                self._items.append({
                    "type": "tool",
                    "tool_name": tool_result.name if tool_result else None,
                })
                tool_index = len(self._items) - 1
                self._tool_indexes[tool_key] = tool_index
            payload = self._items[tool_index]
            if tool_result:
                if tool_result.name:
                    payload["tool_name"] = tool_result.name
                if tool_result.output is not None:
                    payload["tool_output"] = tool_result.output
                if tool_result.files:
                    payload["tool_files"] = [_normalize_file_like(file) for file in tool_result.files]
                if tool_result.elapsed_time is not None:
                    payload["tool_duration"] = tool_result.elapsed_time
                if tool_result.icon is not None:
                    payload["tool_icon"] = tool_result.icon
                if tool_result.icon_dark is not None:
                    payload["tool_icon_dark"] = tool_result.icon_dark
                if tool_result.status == ToolResultStatus.ERROR:
                    payload["tool_error"] = tool_result.output or "error"
                elif "tool_error" in payload:
                    # A later successful result clears a previous error flag.
                    payload.pop("tool_error", None)
            self._active_tool_key = tool_key

    def _append_text(self, chunk: str) -> None:
        """Append a TEXT chunk, extending the trailing open text item if any."""
        self._text += chunk or ""
        if self._items and self._items[-1].get("type") == "text" and not self._items[-1].get("text_completed", False):
            self._items[-1]["text"] += chunk or ""
            return
        self._items.append({
            "type": "text",
            "text": chunk or "",
            "text_completed": False,
        })

    def _close_open_text_item(self) -> None:
        """Mark the trailing text item completed so later TEXT starts anew."""
        if self._items and self._items[-1].get("type") == "text":
            self._items[-1]["text_completed"] = True

    def _ensure_active_thought(self) -> int:
        """Return the active thought's index, opening a new thought if needed."""
        if self._active_thought_index is not None:
            return self._active_thought_index
        self._close_open_text_item()
        self._items.append({
            "type": "thought",
            "thought_output": "",
            "thought_completed": False,
        })
        self._active_thought_index = len(self._items) - 1
        return self._active_thought_index

    def build(self, outputs: Mapping[str, Any] | None = None) -> dict[str, Any] | None:
        """Finalize accumulated items into the replay payload.

        When no chunks were streamed, synthesizes items from a single
        ``generation`` output payload. Returns None when there is nothing
        to replay at all.
        """
        replay_text = self._text or _get_single_output_text(outputs)
        # Deep copy so finalization does not mutate builder state.
        items = deepcopy(self._items)
        if not items:
            generation = _get_single_generation_output(outputs)
            if generation:
                generation_text, generation_items = _build_generation_items_from_payload(generation)
                replay_text = replay_text or generation_text
                items = generation_items
        # The run is over: every item is complete regardless of stream state.
        for item in items:
            if item.get("type") == "text":
                item["text_completed"] = True
            if item.get("type") == "thought":
                item["thought_completed"] = True
        files = _group_files_by_output_var(outputs)
        if not replay_text and not items and not files:
            return None
        return {
            "text": replay_text,
            "llm_generation_items": items,
            "files": files,
        }
def build_result_replay_from_node_executions(
    outputs: Mapping[str, Any] | None,
    node_executions: Iterable[Any],
) -> dict[str, Any] | None:
    """Reconstruct a replay payload from persisted node executions.

    Fallback path for runs without captured stream chunks (e.g. replaying
    history): scans node executions for an LLM trace in their metadata,
    scores each node's text against the workflow outputs, and builds the
    replay from the best-matching trace.
    """
    preferred_text = _get_single_output_text(outputs)
    generation = _get_single_generation_output(outputs)
    if not preferred_text and generation:
        preferred_text = generation.get("content") if isinstance(generation.get("content"), str) else ""
    files = _group_files_by_output_var(outputs)
    # Each candidate is (match score, node index, node text, llm trace).
    candidates: list[tuple[int, int, str, Sequence[Mapping[str, Any]]]] = []
    for node_execution in node_executions:
        metadata = getattr(node_execution, "metadata", None) or {}
        if not isinstance(metadata, Mapping):
            continue
        # Metadata may be keyed by the enum member or its string value.
        llm_trace = metadata.get(WorkflowNodeExecutionMetadataKey.LLM_TRACE) or metadata.get(
            WorkflowNodeExecutionMetadataKey.LLM_TRACE.value
        )
        if not isinstance(llm_trace, Sequence):
            continue
        node_outputs = getattr(node_execution, "outputs", None) or {}
        if not isinstance(node_outputs, Mapping):
            continue
        node_generation = (
            _get_single_generation_output({"generation": node_outputs.get("generation")})
            if node_outputs.get("generation")
            else None
        )
        node_text = node_outputs.get("text")
        if not isinstance(node_text, str) and node_generation:
            node_text = node_generation.get("content")
        if not isinstance(node_text, str):
            node_text = ""
        # 3 = exact text match, 2 = substring match, 1 = has text, 0 = none.
        score = 0
        if preferred_text and node_text:
            if preferred_text == node_text:
                score = 3
            elif preferred_text in node_text or node_text in preferred_text:
                score = 2
        elif node_text:
            score = 1
        candidates.append((score, int(getattr(node_execution, "index", 0) or 0), node_text, llm_trace))
    if not candidates:
        return None
    # Best score first; ties broken by the highest (latest) node index.
    candidates.sort(key=itemgetter(0, 1), reverse=True)
    for _, _, node_text, llm_trace in candidates:
        replay = _build_result_replay_from_llm_trace(
            llm_trace=llm_trace,
            preferred_text=preferred_text or node_text,
            files=files,
        )
        if replay:
            return replay
    return None
def _get_single_output_text(outputs: Mapping[str, Any] | None) -> str:
if not outputs or len(outputs) != 1:
return ""
value = next(iter(outputs.values()))
return value if isinstance(value, str) else ""
def _get_single_generation_output(outputs: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
if not outputs or len(outputs) != 1:
return None
value = next(iter(outputs.values()))
return value if isinstance(value, Mapping) else None
def _build_generation_items_from_payload(generation: Mapping[str, Any]) -> tuple[str, list[dict[str, Any]]]:
    """Turn a structured ``generation`` output payload into replay items.

    Follows the payload's ``sequence`` (content/reasoning/tool_call markers)
    when present; otherwise synthesizes an interleaved thought/tool/text
    ordering. Returns ``(content text, items)``.
    """
    result_text = generation.get("content") if isinstance(generation.get("content"), str) else ""
    reasoning_content = (
        generation.get("reasoning_content") if isinstance(generation.get("reasoning_content"), list) else []
    )
    tool_calls = generation.get("tool_calls") if isinstance(generation.get("tool_calls"), list) else []
    sequence = generation.get("sequence") if isinstance(generation.get("sequence"), list) else []
    items: list[dict[str, Any]] = []

    # NOTE(review): *index* is currently unused inside the helper — confirm
    # whether it was meant to disambiguate duplicate tool names.
    def append_tool(tool_call: Mapping[str, Any], index: int) -> None:
        # Older payloads use "result"; newer ones use "output".
        tool_output = tool_call.get("result") if "result" in tool_call else tool_call.get("output")
        payload: dict[str, Any] = {
            "type": "tool",
            "tool_name": tool_call.get("name"),
            "tool_arguments": tool_call.get("arguments"),
            "tool_output": tool_output,
            "tool_duration": tool_call.get("elapsed_time") or tool_call.get("time_cost"),
        }
        if tool_call.get("icon") is not None or tool_call.get("tool_icon") is not None:
            payload["tool_icon"] = tool_call.get("icon") or tool_call.get("tool_icon")
        if tool_call.get("icon_dark") is not None or tool_call.get("tool_icon_dark") is not None:
            payload["tool_icon_dark"] = tool_call.get("icon_dark") or tool_call.get("tool_icon_dark")
        if isinstance(tool_call.get("files"), list):
            payload["tool_files"] = [_normalize_file_like(file) for file in tool_call.get("files", [])]
        elif isinstance(tool_call.get("tool_files"), list):
            payload["tool_files"] = [_normalize_file_like(file) for file in tool_call.get("tool_files", [])]
        if tool_call.get("status") == "error":
            payload["tool_error"] = tool_call.get("error") or stringify_copy_value(tool_output) or "error"
        items.append(payload)

    for segment in sequence:
        if not isinstance(segment, Mapping):
            continue
        if segment.get("type") == "content":
            # Content segments are (start, end) slices into result_text.
            start = segment.get("start", 0)
            end = segment.get("end", start)
            if isinstance(start, int) and isinstance(end, int):
                text = result_text[start:end]
                if text.strip():
                    items.append({"type": "text", "text": text, "text_completed": True})
        elif segment.get("type") == "reasoning":
            index = segment.get("index")
            if isinstance(index, int) and index < len(reasoning_content) and isinstance(reasoning_content[index], str):
                items.append({
                    "type": "thought",
                    "thought_output": reasoning_content[index],
                    "thought_completed": True,
                })
        elif segment.get("type") == "tool_call":
            index = segment.get("index")
            if isinstance(index, int) and index < len(tool_calls) and isinstance(tool_calls[index], Mapping):
                append_tool(tool_calls[index], index)
    # No usable sequence: synthesize a plausible order from the raw lists.
    if not items and (reasoning_content or tool_calls or result_text):
        synthetic_count = max(len(reasoning_content), len(tool_calls))
        for index in range(synthetic_count):
            if (
                index < len(reasoning_content)
                and isinstance(reasoning_content[index], str)
                and reasoning_content[index]
            ):
                items.append({
                    "type": "thought",
                    "thought_output": reasoning_content[index],
                    "thought_completed": True,
                })
            if index < len(tool_calls) and isinstance(tool_calls[index], Mapping):
                append_tool(tool_calls[index], index)
        if result_text:
            items.append({"type": "text", "text": result_text, "text_completed": True})
    return result_text, items
def _build_result_replay_from_llm_trace(
    *,
    llm_trace: Sequence[Mapping[str, Any]],
    preferred_text: str,
    files: list[dict[str, Any]],
) -> dict[str, Any] | None:
    """Convert a persisted LLM trace into a replay payload.

    ``model`` segments contribute thought/text items and open tool items;
    ``tool`` segments merge their results into the matching tool item (by
    id). Appends *preferred_text* as a final text item when the trace's own
    text does not already contain it. Returns None when nothing renderable
    exists.
    """
    items: list[dict[str, Any]] = []
    # tool id -> index into items, so a tool result merges into its call.
    tool_indexes: dict[str, int] = {}
    for segment in llm_trace:
        if not isinstance(segment, Mapping):
            continue
        segment_type = segment.get("type")
        output = segment.get("output") if isinstance(segment.get("output"), Mapping) else {}
        if segment_type == "model":
            reasoning = output.get("reasoning")
            if isinstance(reasoning, str) and reasoning:
                items.append({
                    "type": "thought",
                    "thought_output": reasoning,
                    "thought_completed": True,
                })
            text = output.get("text")
            if isinstance(text, str) and text:
                items.append({
                    "type": "text",
                    "text": text,
                    "text_completed": True,
                })
            tool_calls = output.get("tool_calls")
            if isinstance(tool_calls, list):
                for tool_call in tool_calls:
                    if not isinstance(tool_call, Mapping):
                        continue
                    # Positional fallback key when the call has no id.
                    tool_id = str(tool_call.get("id") or f"tool-{len(tool_indexes)}")
                    items.append({
                        "type": "tool",
                        "tool_name": tool_call.get("name"),
                        "tool_arguments": tool_call.get("arguments"),
                    })
                    tool_indexes[tool_id] = len(items) - 1
        elif segment_type == "tool":
            tool_id = str(output.get("id") or f"tool-{len(tool_indexes)}")
            tool_index = tool_indexes.get(tool_id)
            if tool_index is None:
                # Result without a recorded call: create a bare tool item.
                items.append({
                    "type": "tool",
                    "tool_name": output.get("name"),
                })
                tool_index = len(items) - 1
                tool_indexes[tool_id] = tool_index
            payload = items[tool_index]
            payload["tool_name"] = output.get("name") or payload.get("tool_name")
            payload["tool_arguments"] = output.get("arguments") or payload.get("tool_arguments")
            payload["tool_output"] = output.get("output")
            payload["tool_duration"] = segment.get("duration")
            if segment.get("icon") is not None:
                payload["tool_icon"] = segment.get("icon")
            if segment.get("icon_dark") is not None:
                payload["tool_icon_dark"] = segment.get("icon_dark")
            if isinstance(output.get("files"), list):
                payload["tool_files"] = [_normalize_file_like(file) for file in output.get("files", [])]
            if segment.get("status") == "error":
                payload["tool_error"] = segment.get("error") or stringify_copy_value(output.get("output")) or "error"
    if not items and not preferred_text and not files:
        return None
    combined_text = "".join(
        item.get("text", "")
        for item in items
        if item.get("type") == "text" and isinstance(item.get("text"), str)
    )
    # Append the authoritative output text when the trace did not produce it.
    if (
        preferred_text
        and preferred_text != combined_text
        and preferred_text not in combined_text
        and combined_text not in preferred_text
    ):
        items.append({
            "type": "text",
            "text": preferred_text,
            "text_completed": True,
        })
    return {
        "text": preferred_text or combined_text,
        "llm_generation_items": items,
        "files": files,
    }
def _is_empty_terminal_stream_event(event: NodeRunStreamChunkEvent) -> bool:
    """Tell whether *event* is a final stream event that carries no payload.

    Such events are pure end-of-stream markers and should not contribute
    anything to the replay. Non-final events are never considered empty.
    """
    if not event.is_final:
        return False
    kind = event.chunk_type
    # Text-like chunk types are empty iff the chunk itself is empty.
    if kind in (ChunkType.TEXT, ChunkType.THOUGHT, ChunkType.THOUGHT_START, ChunkType.THOUGHT_END):
        return not event.chunk
    if kind == ChunkType.TOOL_CALL:
        call = event.tool_call
        if not call:
            return True
        return not (call.id or call.name or call.arguments or event.chunk)
    if kind == ChunkType.TOOL_RESULT:
        result = event.tool_result
        if not result:
            return True
        return not (result.id or result.name or result.output or result.files or event.chunk)
    # Unknown chunk types are never treated as empty terminals.
    return False
def _group_files_by_output_var(outputs: Mapping[str, Any] | None) -> list[dict[str, Any]]:
    """Collect file payloads from *outputs*, grouped per output variable.

    Variables whose values contain no files are omitted entirely.
    """
    if not outputs:
        return []
    return [
        {"var_name": var_name, "files": list(found)}
        for var_name, found in (
            (name, _fetch_files_from_variable_value(value)) for name, value in outputs.items()
        )
        if found
    ]
def _fetch_files_from_variable_value(
    value: dict[str, Any] | list[Any] | Segment | File | None,
) -> Sequence[Mapping[str, Any]]:
    """Extract file dicts from a single output variable value.

    Handles file segments, bare File objects, plain file dicts, and lists
    of either; anything else yields an empty sequence.
    """
    if not value:
        return []
    if isinstance(value, FileSegment):
        return [value.value.to_dict()]
    if isinstance(value, ArrayFileSegment):
        return [element.to_dict() for element in value.value]
    if isinstance(value, File):
        return [value.to_dict()]
    if isinstance(value, list):
        collected: list[Mapping[str, Any]] = []
        for element in value:
            candidate = _get_file_var_from_value(element)
            if candidate:
                collected.append(candidate)
        return collected
    if isinstance(value, dict):
        candidate = _get_file_var_from_value(value)
        return [candidate] if candidate else []
    return []
def _get_file_var_from_value(value: Any) -> Mapping[str, Any] | None:
    """Return *value* as a file dict when it represents a file, else None.

    Accepts either a dict tagged with the Dify file model identity or a
    File instance (converted via ``to_dict``).
    """
    if not value:
        return None
    if isinstance(value, File):
        return value.to_dict()
    if isinstance(value, dict) and value.get("dify_model_identity") == FILE_MODEL_IDENTITY:
        return value
    return None
def _normalize_file_like(value: Any) -> Any:
    """Coerce a file-ish value into a JSON-friendly form.

    File objects become dicts, mappings are shallow-copied into plain
    dicts, and anything else passes through untouched.
    """
    if isinstance(value, File):
        return value.to_dict()
    return dict(value) if isinstance(value, Mapping) else value
def stringify_copy_value(value: Any) -> str:
    """Render *value* as copyable display text.

    Strings pass through, None becomes an empty string, mappings and lists
    are JSON-encoded (non-ASCII preserved), everything else is str()'d.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    try:
        if isinstance(value, (Mapping, list)):
            return json.dumps(value, ensure_ascii=False)
        return str(value)
    except Exception:
        # Best-effort fallback for values json cannot encode.
        return str(value)

View File

@ -109,6 +109,7 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
graph=graph,
inputs=inputs,
outputs=outputs,
result_replay=db_model.result_replay_dict,
status=status,
error_message=db_model.error or "",
total_tokens=db_model.total_tokens,
@ -155,6 +156,11 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
if domain_model.outputs
else None
)
db_model.result_replay = (
json.dumps(WorkflowRuntimeTypeConverter().to_json_encodable(domain_model.result_replay))
if domain_model.result_replay is not None
else None
)
db_model.status = domain_model.status
db_model.error = domain_model.error_message or None
db_model.total_tokens = domain_model.total_tokens

View File

@ -31,6 +31,7 @@ class WorkflowExecution(BaseModel):
inputs: Mapping[str, Any] = Field(...)
outputs: Mapping[str, Any] | None = None
result_replay: Mapping[str, Any] | None = None
status: WorkflowExecutionStatus = WorkflowExecutionStatus.RUNNING
error_message: str = Field(default="")

View File

@ -95,6 +95,7 @@ workflow_run_detail_fields = {
"status": fields.String,
"outputs": fields.Raw(attribute="outputs_dict"),
"outputs_as_generation": fields.Boolean,
"result_replay": fields.Raw(attribute="result_replay_dict"),
"error": fields.String,
"elapsed_time": fields.Float,
"total_tokens": fields.Integer,

View File

@ -0,0 +1,26 @@
"""add result replay to workflow runs
Revision ID: b8d7fb4f6c2a
Revises: fce013ca180e
Create Date: 2026-03-23 23:50:00.000000
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "b8d7fb4f6c2a"
down_revision = "fce013ca180e"
branch_labels = None
depends_on = None
def upgrade():
    # Add the serialized replay payload (JSON text) used to re-render
    # workflow results; nullable so pre-existing runs stay valid.
    op.add_column("workflow_runs", sa.Column("result_replay", models.types.LongText(), nullable=True))
def downgrade():
    # Reverse of upgrade(): drop the replay payload column.
    op.drop_column("workflow_runs", "result_replay")

View File

@ -652,6 +652,7 @@ class WorkflowRun(Base):
nullable=False,
)
outputs: Mapped[str | None] = mapped_column(LongText, default="{}")
result_replay: Mapped[str | None] = mapped_column(LongText)
error: Mapped[str | None] = mapped_column(LongText)
elapsed_time: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default=sa.text("0"))
total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0"))
@ -697,6 +698,10 @@ class WorkflowRun(Base):
def outputs_dict(self) -> Mapping[str, Any]:
return json.loads(self.outputs) if self.outputs else {}
@property
def result_replay_dict(self) -> Mapping[str, Any] | None:
    """Deserialize the stored JSON replay payload; None when absent."""
    return json.loads(self.result_replay) if self.result_replay else None
@property
@deprecated("This method is retained for historical reasons; avoid using it if possible.")
def message(self):
@ -729,6 +734,7 @@ class WorkflowRun(Base):
"status": self.status,
"outputs": self.outputs_dict,
"outputs_as_generation": self.outputs_as_generation,
"result_replay": self.result_replay_dict,
"error": self.error,
"elapsed_time": self.elapsed_time,
"total_tokens": self.total_tokens,
@ -754,6 +760,7 @@ class WorkflowRun(Base):
inputs=json.dumps(data.get("inputs")),
status=data.get("status"),
outputs=json.dumps(data.get("outputs")),
result_replay=json.dumps(data.get("result_replay")) if data.get("result_replay") is not None else None,
error=data.get("error"),
elapsed_time=data.get("elapsed_time"),
total_tokens=data.get("total_tokens"),

View File

@ -104,6 +104,9 @@ def _create_workflow_run_from_execution(
workflow_run.outputs = (
json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
)
workflow_run.result_replay = (
json.dumps(json_converter.to_json_encodable(execution.result_replay)) if execution.result_replay else None
)
workflow_run.error = execution.error_message
workflow_run.elapsed_time = execution.elapsed_time
workflow_run.total_tokens = execution.total_tokens
@ -125,6 +128,8 @@ def _update_workflow_run_from_execution(workflow_run: WorkflowRun, execution: Wo
workflow_run.outputs = (
json.dumps(json_converter.to_json_encodable(execution.outputs)) if execution.outputs else "{}"
)
if execution.result_replay is not None:
workflow_run.result_replay = json.dumps(json_converter.to_json_encodable(execution.result_replay))
workflow_run.error = execution.error_message
workflow_run.elapsed_time = execution.elapsed_time
workflow_run.total_tokens = execution.total_tokens

View File

@ -0,0 +1,290 @@
import json
from datetime import datetime
from core.app.workflow.result_replay import WorkflowResultReplayBuilder, build_result_replay_from_node_executions
from core.file import FILE_MODEL_IDENTITY
from core.workflow.entities import ToolCall, ToolResult, ToolResultStatus, WorkflowNodeExecution
from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph_events import ChunkType, NodeRunStreamChunkEvent
def _stream_event(
    *,
    chunk_type: ChunkType,
    chunk: str = "",
    tool_call: ToolCall | None = None,
    tool_result: ToolResult | None = None,
) -> NodeRunStreamChunkEvent:
    """Build a non-final stream chunk event with fixed node identifiers."""
    return NodeRunStreamChunkEvent(
        id="execution-1",
        node_id="answer-node",
        node_type=NodeType.ANSWER,
        selector=["answer-node", "answer"],
        chunk=chunk,
        is_final=False,
        chunk_type=chunk_type,
        tool_call=tool_call,
        tool_result=tool_result,
    )
def test_workflow_result_replay_builder_preserves_generation_sequence_and_files() -> None:
    """Streamed thought/text/tool chunks must replay in arrival order, with
    tool results merged into their call item and output files grouped."""
    builder = WorkflowResultReplayBuilder()
    file_payload = {
        "dify_model_identity": FILE_MODEL_IDENTITY,
        "related_id": "file-1",
        "filename": "report.pdf",
        "size": 128,
        "mime_type": "application/pdf",
        "transfer_method": "local_file",
        "type": "document",
        "url": "https://example.com/report.pdf",
        "upload_file_id": "upload-file-1",
        "remote_url": "",
    }
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT_START))
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT, chunk="Need to inspect the workspace."))
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.THOUGHT_END))
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="I checked the directory.\n"))
    builder.add_stream_chunk(
        _stream_event(
            chunk_type=ChunkType.TOOL_CALL,
            tool_call=ToolCall(
                id="tool-call-1",
                name="bash",
                arguments=json.dumps({"command": "ls"}),
                icon="light-icon",
                icon_dark="dark-icon",
            ),
        )
    )
    # Result shares id "tool-call-1", so it merges into the call item above.
    builder.add_stream_chunk(
        _stream_event(
            chunk_type=ChunkType.TOOL_RESULT,
            chunk="output/",
            tool_result=ToolResult(
                id="tool-call-1",
                name="bash",
                output="output/",
                files=["https://example.com/report.pdf"],
                status=ToolResultStatus.SUCCESS,
                elapsed_time=0.3,
                icon="light-icon",
                icon_dark="dark-icon",
            ),
        )
    )
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="Finished."))
    replay = builder.build(outputs={"files": [file_payload]})
    assert replay is not None
    assert replay["text"] == "I checked the directory.\nFinished."
    assert replay["files"] == [{"var_name": "files", "files": [file_payload]}]
    assert replay["llm_generation_items"] == [
        {
            "type": "thought",
            "thought_output": "Need to inspect the workspace.",
            "thought_completed": True,
        },
        {
            "type": "text",
            "text": "I checked the directory.\n",
            "text_completed": True,
        },
        {
            "type": "tool",
            "tool_name": "bash",
            "tool_arguments": "{\"command\": \"ls\"}",
            "tool_icon": "light-icon",
            "tool_icon_dark": "dark-icon",
            "tool_output": "output/",
            "tool_files": ["https://example.com/report.pdf"],
            "tool_duration": 0.3,
        },
        {
            "type": "text",
            "text": "Finished.",
            "text_completed": True,
        },
    ]
def test_workflow_result_replay_builder_synthesizes_items_from_generation_without_sequence() -> None:
    """With no stream chunks and an empty ``sequence``, build() must
    synthesize tool-then-text items from the generation payload."""
    builder = WorkflowResultReplayBuilder()
    replay = builder.build(
        outputs={
            "generation": {
                "content": "Workspace is clean.",
                "reasoning_content": [],
                "tool_calls": [
                    {
                        "name": "bash",
                        "arguments": "{\"bash\":\"ls\"}",
                        "output": "output/",
                        "status": "success",
                        "elapsed_time": 0.2,
                    }
                ],
                "sequence": [],
            }
        }
    )
    assert replay is not None
    assert replay["text"] == "Workspace is clean."
    assert replay["llm_generation_items"] == [
        {
            "type": "tool",
            "tool_name": "bash",
            "tool_arguments": "{\"bash\":\"ls\"}",
            "tool_output": "output/",
            "tool_duration": 0.2,
        },
        {
            "type": "text",
            "text": "Workspace is clean.",
            "text_completed": True,
        },
    ]
def test_workflow_result_replay_builder_ignores_empty_terminal_stream_events() -> None:
    """Final events with empty payloads are end-of-stream markers and must
    not add replay items."""
    builder = WorkflowResultReplayBuilder()
    builder.add_stream_chunk(_stream_event(chunk_type=ChunkType.TEXT, chunk="Hello"))
    # Empty, final THOUGHT / TOOL_CALL / TOOL_RESULT events follow.
    builder.add_stream_chunk(
        NodeRunStreamChunkEvent(
            id="execution-1",
            node_id="answer-node",
            node_type=NodeType.ANSWER,
            selector=["answer-node", "generation", "thought"],
            chunk="",
            is_final=True,
            chunk_type=ChunkType.THOUGHT,
        )
    )
    builder.add_stream_chunk(
        NodeRunStreamChunkEvent(
            id="execution-1",
            node_id="answer-node",
            node_type=NodeType.ANSWER,
            selector=["answer-node", "generation", "tool_calls"],
            chunk="",
            is_final=True,
            chunk_type=ChunkType.TOOL_CALL,
            tool_call=ToolCall(id="", name="", arguments=""),
        )
    )
    builder.add_stream_chunk(
        NodeRunStreamChunkEvent(
            id="execution-1",
            node_id="answer-node",
            node_type=NodeType.ANSWER,
            selector=["answer-node", "generation", "tool_results"],
            chunk="",
            is_final=True,
            chunk_type=ChunkType.TOOL_RESULT,
            tool_result=ToolResult(id="", name="", output="", files=[], status=ToolResultStatus.SUCCESS),
        )
    )
    replay = builder.build(outputs={"answer": "Hello"})
    assert replay is not None
    assert replay["llm_generation_items"] == [
        {
            "type": "text",
            "text": "Hello",
            "text_completed": True,
        },
    ]
def test_build_result_replay_from_node_executions_uses_llm_trace_when_outputs_are_flattened() -> None:
    """When outputs carry only flat text, the replay must be reconstructed
    from the node execution's persisted LLM trace metadata."""
    node_execution = WorkflowNodeExecution(
        id="node-execution-1",
        node_execution_id="node-execution-1",
        workflow_id="workflow-1",
        workflow_execution_id="workflow-run-1",
        index=1,
        node_id="agent-node",
        node_type=NodeType.LLM,
        title="Agent",
        status=WorkflowNodeExecutionStatus.SUCCEEDED,
        outputs={
            "generation": {
                "content": "Workspace is clean.",
                "reasoning_content": [],
                "tool_calls": [],
                "sequence": [],
            }
        },
        metadata={
            WorkflowNodeExecutionMetadataKey.LLM_TRACE: [
                {
                    "type": "model",
                    "output": {
                        "text": "Let me inspect the workspace:",
                        "reasoning": None,
                        "tool_calls": [
                            {
                                "id": "tool-1",
                                "name": "bash",
                                "arguments": "{\"bash\":\"ls\"}",
                            }
                        ],
                    },
                },
                {
                    "type": "tool",
                    "duration": 0.2,
                    "status": "success",
                    "output": {
                        "id": "tool-1",
                        "name": "bash",
                        "arguments": "{\"bash\":\"ls\"}",
                        "output": "output/",
                    },
                },
                {
                    "type": "model",
                    "output": {
                        "text": "Workspace is clean.",
                        "reasoning": None,
                        "tool_calls": [],
                    },
                },
            ]
        },
        # NOTE(review): datetime.utcnow() is deprecated in 3.12 — consider
        # the project's naive_utc_now() helper instead.
        created_at=datetime.utcnow(),
    )
    replay = build_result_replay_from_node_executions(
        outputs={"answer": "Workspace is clean."},
        node_executions=[node_execution],
    )
    assert replay is not None
    assert replay["text"] == "Workspace is clean."
    assert replay["llm_generation_items"] == [
        {
            "type": "text",
            "text": "Let me inspect the workspace:",
            "text_completed": True,
        },
        {
            "type": "tool",
            "tool_name": "bash",
            "tool_arguments": "{\"bash\":\"ls\"}",
            "tool_output": "output/",
            "tool_duration": 0.2,
        },
        {
            "type": "text",
            "text": "Workspace is clean.",
            "text_completed": True,
        },
    ]

View File

@ -0,0 +1,87 @@
from unittest.mock import MagicMock
from uuid import uuid4
from sqlalchemy.orm import sessionmaker
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.enums import WorkflowExecutionStatus, WorkflowType
from libs.datetime_utils import naive_utc_now
from models import Account, CreatorUserRole, WorkflowRun
from models.enums import WorkflowRunTriggeredFrom
def _build_repo() -> SQLAlchemyWorkflowExecutionRepository:
    """Create a repository wired to a mocked session factory and account."""
    mock_account = MagicMock(spec=Account)
    mock_account.id = str(uuid4())
    mock_account.current_tenant_id = str(uuid4())
    mock_session_factory = MagicMock(spec=sessionmaker)
    return SQLAlchemyWorkflowExecutionRepository(
        session_factory=mock_session_factory,
        user=mock_account,
        app_id="test-app-id",
        triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
    )
def test_to_db_model_preserves_result_replay() -> None:
    """Converting a domain execution to its DB row must carry result_replay
    through unchanged (round-trippable via result_replay_dict)."""
    repo = _build_repo()
    replay_payload = {
        "text": "hello",
        "llm_generation_items": [
            {"type": "text", "text": "hello", "text_completed": True},
            {"type": "tool", "tool_name": "bash", "tool_arguments": "ls", "tool_output": "output"},
        ],
        "files": [{"var_name": "files", "files": [{"id": "file-1"}]}],
    }

    execution = WorkflowExecution.new(
        id_=str(uuid4()),
        workflow_id=str(uuid4()),
        workflow_type=WorkflowType.WORKFLOW,
        workflow_version="1.0",
        graph={"nodes": [], "edges": []},
        inputs={"query": "hello"},
        started_at=naive_utc_now(),
    )
    execution.outputs = {"answer": "hello"}
    execution.result_replay = replay_payload
    execution.status = WorkflowExecutionStatus.SUCCEEDED

    db_row = repo._to_db_model(execution)
    assert db_row.result_replay_dict == replay_payload
def test_to_domain_model_preserves_result_replay() -> None:
    """A JSON result_replay column must deserialize into the domain model's
    result_replay dict."""
    repo = _build_repo()
    run_row = WorkflowRun(
        id=str(uuid4()),
        tenant_id=str(uuid4()),
        app_id=str(uuid4()),
        workflow_id=str(uuid4()),
        type=WorkflowType.WORKFLOW.value,
        triggered_from=WorkflowRunTriggeredFrom.DEBUGGING.value,
        version="1.0",
        graph='{"nodes":[],"edges":[]}',
        inputs='{"query":"hello"}',
        status=WorkflowExecutionStatus.SUCCEEDED,
        outputs='{"answer":"hello"}',
        result_replay='{"text":"hello","llm_generation_items":[{"type":"tool","tool_name":"bash"}],"files":[]}',
        error=None,
        elapsed_time=1.2,
        total_tokens=5,
        total_steps=1,
        exceptions_count=0,
        created_by_role=CreatorUserRole.ACCOUNT,
        created_by=str(uuid4()),
        created_at=naive_utc_now(),
        finished_at=naive_utc_now(),
    )

    expected_replay = {
        "text": "hello",
        "llm_generation_items": [{"type": "tool", "tool_name": "bash"}],
        "files": [],
    }
    assert repo._to_domain_model(run_row).result_replay == expected_replay

View File

@ -0,0 +1,79 @@
import json
from uuid import uuid4
from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.enums import WorkflowExecutionStatus, WorkflowType
from libs.datetime_utils import naive_utc_now
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from tasks.workflow_execution_tasks import _create_workflow_run_from_execution, _update_workflow_run_from_execution
def _build_execution() -> WorkflowExecution:
    """Build a succeeded workflow execution carrying a tool-call result replay."""
    replay = {
        "text": "done",
        "llm_generation_items": [
            {
                "type": "tool",
                "tool_name": "bash",
                "tool_arguments": "{\"command\": \"pwd\"}",
                "tool_output": "/workspace",
            },
        ],
        "files": [],
    }

    execution = WorkflowExecution.new(
        id_=str(uuid4()),
        workflow_id=str(uuid4()),
        workflow_type=WorkflowType.WORKFLOW,
        workflow_version="draft",
        graph={"nodes": [], "edges": []},
        inputs={"query": "hello"},
        started_at=naive_utc_now(),
    )
    execution.status = WorkflowExecutionStatus.SUCCEEDED
    execution.outputs = {"answer": "done"}
    execution.result_replay = replay
    execution.finished_at = naive_utc_now()
    return execution
def test_create_workflow_run_from_execution_persists_result_replay() -> None:
    """Creating a WorkflowRun row must serialize result_replay to JSON and
    expose the same structure via result_replay_dict."""
    execution = _build_execution()
    run_row = _create_workflow_run_from_execution(
        execution=execution,
        tenant_id=str(uuid4()),
        app_id=str(uuid4()),
        triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
        creator_user_id=str(uuid4()),
        creator_user_role=CreatorUserRole.ACCOUNT,
    )

    # The raw column holds JSON; the dict accessor must round-trip it.
    assert json.loads(run_row.result_replay or "{}") == execution.result_replay
    assert run_row.result_replay_dict == execution.result_replay
def test_update_workflow_run_from_execution_preserves_existing_replay_until_new_value_arrives() -> None:
    """Updating a run with a None replay must keep the stored value; a new
    replay must overwrite it."""
    execution = _build_execution()
    run_row = _create_workflow_run_from_execution(
        execution=execution,
        tenant_id=str(uuid4()),
        app_id=str(uuid4()),
        triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
        creator_user_id=str(uuid4()),
        creator_user_role=CreatorUserRole.ACCOUNT,
    )
    stored_replay = run_row.result_replay

    # A None replay on the execution must not clobber the persisted value.
    execution.result_replay = None
    _update_workflow_run_from_execution(run_row, execution)
    assert run_row.result_replay == stored_replay

    # A fresh replay value replaces the stored one.
    updated_replay = {
        "text": "updated",
        "llm_generation_items": [],
        "files": [],
    }
    execution.result_replay = updated_replay
    _update_workflow_run_from_execution(run_row, execution)
    assert run_row.result_replay_dict == updated_replay

View File

@ -29,6 +29,7 @@ const Record = () => {
runDetailUrl={getWorkflowRunAndTraceUrl(historyWorkflowData?.id).runUrl}
tracingListUrl={getWorkflowRunAndTraceUrl(historyWorkflowData?.id).traceUrl}
getResultCallback={handleResultCallback}
useFirstRunResultView
/>
</div>
)

View File

@ -0,0 +1,276 @@
import type { WorkflowRunDetailResponse } from '@/models/log'
import { render, screen, waitFor } from '@testing-library/react'
import userEvent from '@testing-library/user-event'
import copy from 'copy-to-clipboard'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { ToastContext } from '@/app/components/base/toast'
import Run from './index'
// Minimal prop shapes used by the mocked ResultText stub below.
type MockGenerationItem = {
  id: string
  toolName?: string
  thoughtOutput?: string
  text?: string
}

type MockFileGroup = {
  varName: string
}

// Spies for the data layer; configured per test.
const fetchRunDetailMock = vi.fn()
const fetchTracingListMock = vi.fn()

// i18n stub: t() echoes the key so assertions can match on key names.
vi.mock('react-i18next', () => ({
  useTranslation: () => ({
    t: (key: string) => key,
  }),
}))

vi.mock('copy-to-clipboard', () => ({
  default: vi.fn(),
}))

vi.mock('@/service/log', () => ({
  fetchRunDetail: (...args: unknown[]) => fetchRunDetailMock(...args),
  fetchTracingList: (...args: unknown[]) => fetchTracingListMock(...args),
}))

vi.mock('../store', () => ({
  useStore: (selector: (state: { isListening: boolean }) => unknown) => selector({ isListening: false }),
}))

// Child panels replaced by testid markers so tests can assert which panel rendered.
vi.mock('./output-panel', () => ({
  default: () => <div data-testid="output-panel">output-panel</div>,
}))

vi.mock('./result-panel', () => ({
  default: () => <div data-testid="result-panel">result-panel</div>,
}))

vi.mock('./status', () => ({
  default: () => <div data-testid="status-panel">status-panel</div>,
}))

vi.mock('./tracing-panel', () => ({
  default: ({ list }: { list: unknown[] }) => <div data-testid="tracing-panel">{list.length}</div>,
}))

// ResultText stub: surfaces its props as data-* attributes and child nodes so
// tests can assert on what the real component would receive.
vi.mock('./result-text', () => ({
  default: ({
    outputs,
    llmGenerationItems,
    allFiles,
  }: {
    outputs?: string
    llmGenerationItems?: MockGenerationItem[]
    allFiles?: MockFileGroup[]
  }) => (
    <div
      data-testid="result-text"
      data-output={outputs || ''}
      data-item-count={llmGenerationItems?.length || 0}
      data-file-count={allFiles?.length || 0}
    >
      {outputs || ''}
      {(llmGenerationItems || []).map(item => (
        <div key={item.id}>{item.toolName || item.thoughtOutput || item.text}</div>
      ))}
      {(allFiles || []).map(item => (
        <div key={item.varName}>{item.varName}</div>
      ))}
    </div>
  ),
}))

// Baseline succeeded run; tests spread replay/generation fields on top of it.
const baseRunDetail: WorkflowRunDetailResponse = {
  id: 'run-1',
  version: 'draft',
  graph: {
    nodes: [],
    edges: [],
  },
  inputs: {},
  inputs_truncated: false,
  status: 'succeeded',
  outputs: {},
  outputs_truncated: false,
  total_steps: 1,
  created_by_role: 'account',
  created_by_account: {
    id: 'account-1',
    name: 'Tester',
    email: 'tester@example.com',
  },
  created_at: 1,
  finished_at: 2,
}
// Mount the Run panel in history-replay mode (useFirstRunResultView) with a
// stubbed toast context; returns the notify spy for toast assertions.
const renderRun = () => {
  const notify = vi.fn()
  render(
    <ToastContext.Provider value={{ notify, close: vi.fn() }}>
      <Run
        runDetailUrl="/apps/app-1/workflow-runs/run-1"
        tracingListUrl="/apps/app-1/workflow-runs/run-1/node-executions"
        useFirstRunResultView
      />
    </ToastContext.Provider>,
  )
  return { notify }
}
describe('Run history result replay', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    fetchTracingListMock.mockResolvedValue({ data: [] })
  })

  // result_replay present: ResultText must render from the replay payload
  // (not OutputPanel) and copy must emit structured [TOOL] sections.
  it('renders workflow history result from result_replay and copies structured content', async () => {
    fetchRunDetailMock.mockResolvedValue({
      ...baseRunDetail,
      result_replay: {
        text: 'I checked the workspace.',
        llm_generation_items: [
          {
            type: 'text',
            text: 'I checked the workspace.',
            text_completed: true,
          },
          {
            type: 'tool',
            tool_name: 'bash',
            tool_arguments: '{"command":"ls"}',
            tool_output: 'output/',
            tool_duration: 0.3,
          },
        ],
        files: [
          {
            var_name: 'files',
            files: [
              {
                related_id: 'file-1',
                extension: 'pdf',
                filename: 'report.pdf',
                size: 128,
                mime_type: 'application/pdf',
                transfer_method: 'local_file',
                type: 'document',
                url: 'https://example.com/report.pdf',
                upload_file_id: 'upload-file-1',
                remote_url: '',
              },
            ],
          },
        ],
      },
    })
    renderRun()
    await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument())
    expect(screen.queryByTestId('output-panel')).not.toBeInTheDocument()
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'I checked the workspace.')
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '2')
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-file-count', '1')
    await userEvent.setup().click(screen.getByRole('button', { name: 'operation.copy' }))
    expect(copy).toHaveBeenCalledWith(
      expect.stringContaining('[TOOL] bash'),
    )
    expect(copy).toHaveBeenCalledWith(
      expect.stringContaining('INPUT:\n{"command":"ls"}'),
    )
  })

  // Legacy runs without result_replay: items are rebuilt from the generation
  // payload's sequence (reasoning + tool_call + content => 3 items).
  it('falls back to generation outputs for old runs without result_replay', async () => {
    fetchRunDetailMock.mockResolvedValue({
      ...baseRunDetail,
      outputs_as_generation: true,
      outputs: {
        generation: {
          content: 'Hello',
          reasoning_content: ['Need to answer'],
          tool_calls: [
            {
              name: 'bash',
              arguments: '{"command":"pwd"}',
              result: '/workspace',
              elapsed_time: 0.2,
              status: 'success',
            },
          ],
          sequence: [
            { type: 'reasoning', index: 0 },
            { type: 'tool_call', index: 0 },
            { type: 'content', start: 0, end: 5 },
          ],
        },
      },
    })
    renderRun()
    await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument())
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'Hello')
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '3')
    expect(screen.getByText('Need to answer')).toBeInTheDocument()
    expect(screen.getByText('bash')).toBeInTheDocument()
  })

  // Empty sequence: tool items are synthesized from tool_calls, and no stray
  // "0" text node leaks from falsy-rendering in JSX.
  it('synthesizes tool items for generation outputs without sequence and does not leak a zero placeholder', async () => {
    fetchRunDetailMock.mockResolvedValue({
      ...baseRunDetail,
      outputs_as_generation: true,
      outputs: {
        generation: {
          content: 'Workspace is clean.',
          reasoning_content: [],
          tool_calls: [
            {
              name: 'bash',
              arguments: '{"command":"ls"}',
              output: 'output/',
              elapsed_time: 0.2,
              status: 'success',
            },
          ],
          sequence: [],
        },
      },
    })
    renderRun()
    await waitFor(() => expect(screen.getByTestId('result-text')).toBeInTheDocument())
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-output', 'Workspace is clean.')
    expect(screen.getByTestId('result-text')).toHaveAttribute('data-item-count', '2')
    expect(screen.getByText('bash')).toBeInTheDocument()
    expect(screen.queryByText(/^0$/)).not.toBeInTheDocument()
  })

  // Without useFirstRunResultView the legacy OutputPanel path is unchanged.
  it('keeps default output-panel behavior when history replay mode is disabled', async () => {
    fetchRunDetailMock.mockResolvedValue({
      ...baseRunDetail,
      outputs: {
        answer: 'Hello',
      },
    })
    const notify = vi.fn()
    render(
      <ToastContext.Provider value={{ notify, close: vi.fn() }}>
        <Run
          runDetailUrl="/apps/app-1/workflow-runs/run-1"
          tracingListUrl="/apps/app-1/workflow-runs/run-1/node-executions"
        />
      </ToastContext.Provider>,
    )
    await waitFor(() => expect(screen.getByTestId('output-panel')).toBeInTheDocument())
    expect(screen.queryByTestId('result-text')).not.toBeInTheDocument()
  })
})

View File

@ -1,10 +1,17 @@
'use client'
import type { FC } from 'react'
import type { WorkflowRunDetailResponse } from '@/models/log'
import type { NodeTracing } from '@/types/workflow'
import type {
WorkflowRunDetailResponse,
WorkflowRunReplayGenerationItem,
WorkflowRunResultReplay,
} from '@/models/log'
import type { LLMGenerationItem, NodeTracing } from '@/types/workflow'
import copy from 'copy-to-clipboard'
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { useTranslation } from 'react-i18next'
import { useContext } from 'use-context-selector'
import Button from '@/app/components/base/button'
import { getFilesInLogs, getProcessedFilesFromResponse } from '@/app/components/base/file-uploader/utils'
import Loading from '@/app/components/base/loading'
import { ToastContext } from '@/app/components/base/toast'
import { WorkflowRunningStatus } from '@/app/components/workflow/types'
@ -13,15 +20,343 @@ import { cn } from '@/utils/classnames'
import { useStore } from '../store'
import OutputPanel from './output-panel'
import ResultPanel from './result-panel'
import ResultText from './result-text'
import StatusPanel from './status'
import TracingPanel from './tracing-panel'
// Tool-call entry inside a persisted generation payload. Paired aliases
// (result/output, elapsed_time/time_cost, icon/tool_icon, files/tool_files)
// cover the different key names read by buildGenerationItemsFromPayload.
type GenerationToolCall = {
  name?: string
  arguments?: string
  result?: Record<string, unknown> | string
  output?: Record<string, unknown> | string
  elapsed_time?: number
  time_cost?: number
  icon?: string | { background: string, content: string }
  icon_dark?: string | { background: string, content: string }
  tool_icon?: string | { background: string, content: string }
  tool_icon_dark?: string | { background: string, content: string }
  files?: unknown[]
  tool_files?: unknown[]
  error?: string | null
  status?: string
}

// One ordered segment of a generation: content slices use start/end offsets
// into the content string; reasoning/tool_call segments index into their lists.
type GenerationSequenceSegment = {
  type: 'content' | 'reasoning' | 'tool_call'
  start?: number
  end?: number
  index?: number
}

// Flattened generation output persisted by older runs (no result_replay).
type GenerationPayload = {
  content?: string
  reasoning_content?: string[]
  tool_calls?: GenerationToolCall[]
  sequence?: GenerationSequenceSegment[]
}

// Everything the RESULT tab needs to render a historical run like a first run.
type HistoryResultView = {
  resultText?: string
  llmGenerationItems: LLMGenerationItem[]
  allFiles: ReturnType<typeof getFilesInLogs>
  copyContent: string
}

export type RunProps = {
  hideResult?: boolean
  activeTab?: 'RESULT' | 'DETAIL' | 'TRACING'
  getResultCallback?: (result: WorkflowRunDetailResponse) => void
  runDetailUrl: string
  tracingListUrl: string
  // When set, the RESULT tab renders ResultText from replay/generation data
  // instead of the raw OutputPanel.
  useFirstRunResultView?: boolean
}
// Coerce an arbitrary value into copy-friendly text: strings pass through,
// null/undefined become '', anything else is pretty-printed JSON with a
// String() fallback for values JSON.stringify rejects (e.g. circular refs).
const stringifyCopyValue = (value: unknown) => {
  if (value === null || value === undefined)
    return ''
  if (typeof value === 'string')
    return value
  try {
    return JSON.stringify(value, null, 2)
  }
  catch {
    return String(value)
  }
}
// Serialize generation items into clipboard text. Returns '' when there are
// no items or only plain text items (callers then copy the raw result text).
const buildCopyContentFromLLMGenerationItems = (llmGenerationItems?: LLMGenerationItem[]) => {
  const items = llmGenerationItems ?? []
  if (!items.length || items.every(item => item.type === 'text'))
    return ''

  const renderItem = (item: LLMGenerationItem): string => {
    switch (item.type) {
      case 'text':
        return item.text || ''
      case 'thought':
        return item.thoughtOutput ? `[THOUGHT]\n${item.thoughtOutput}` : ''
      case 'tool': {
        const sections = [`[TOOL] ${item.toolName || ''}`.trim()]
        if (item.toolArguments)
          sections.push(`INPUT:\n${stringifyCopyValue(item.toolArguments)}`)
        if (typeof item.toolOutput !== 'undefined')
          sections.push(`OUTPUT:\n${stringifyCopyValue(item.toolOutput)}`)
        if (item.toolError)
          sections.push(`ERROR:\n${item.toolError}`)
        return sections.join('\n')
      }
      default:
        return ''
    }
  }

  return items.map(renderItem).filter(Boolean).join('\n\n')
}
// Narrow an unknown value to a plain object (null and arrays excluded).
const isRecord = (value: unknown): value is Record<string, unknown> => {
  return typeof value === 'object' && value !== null && !Array.isArray(value)
}
// When the run has exactly one output and it is a string, return that string;
// otherwise undefined.
const getSingleStringOutput = (outputs?: WorkflowRunDetailResponse['outputs']) => {
  if (!isRecord(outputs))
    return undefined
  const values = Object.values(outputs)
  if (values.length !== 1)
    return undefined
  const only = values[0]
  return typeof only === 'string' ? only : undefined
}
// When the run has exactly one output and it is a plain object, treat it as a
// generation payload; otherwise undefined.
const getSingleGenerationOutput = (outputs?: WorkflowRunDetailResponse['outputs']) => {
  if (!isRecord(outputs))
    return undefined
  const values = Object.values(outputs)
  if (values.length !== 1)
    return undefined
  const candidate = values[0]
  return isRecord(candidate) ? candidate as GenerationPayload : undefined
}
// Rebuild renderable generation items from a persisted generation payload.
// Primary path walks `sequence` (content slices, reasoning indices, tool_call
// indices); fallback path (empty/absent sequence) interleaves reasoning and
// tool calls by position and appends the final content text.
const buildGenerationItemsFromPayload = (generation?: GenerationPayload): {
  resultText?: string
  llmGenerationItems: LLMGenerationItem[]
} => {
  if (!generation)
    return { resultText: undefined, llmGenerationItems: [] }
  const resultText = typeof generation.content === 'string' ? generation.content : undefined
  // Defensive normalization: older payloads may omit or mistype these fields.
  const reasoningContent = Array.isArray(generation.reasoning_content) ? generation.reasoning_content : []
  const toolCalls = Array.isArray(generation.tool_calls) ? generation.tool_calls : []
  const sequence = Array.isArray(generation.sequence) ? generation.sequence : []
  const llmGenerationItems: LLMGenerationItem[] = []
  // Shared tool-item builder; `result` wins over `output` when both exist,
  // and alias pairs (elapsed_time/time_cost, icon/tool_icon, ...) are merged.
  const appendSyntheticToolItem = (toolCall: GenerationToolCall, index: number) => {
    const toolOutput = typeof toolCall.result !== 'undefined' ? toolCall.result : toolCall.output
    llmGenerationItems.push({
      id: `generation-tool-${index}`,
      type: 'tool',
      toolName: toolCall.name,
      toolArguments: toolCall.arguments,
      toolOutput,
      toolDuration: toolCall.elapsed_time ?? toolCall.time_cost,
      toolIcon: toolCall.icon ?? toolCall.tool_icon,
      toolIconDark: toolCall.icon_dark ?? toolCall.tool_icon_dark,
      toolFiles: Array.isArray(toolCall.files) ? toolCall.files : toolCall.tool_files,
      // Failed tools surface an error string; fall back to the raw output,
      // then a literal 'error'.
      toolError: toolCall.status === 'error'
        ? (toolCall.error || stringifyCopyValue(toolOutput) || 'error')
        : undefined,
    })
  }
  sequence.forEach((segment, index) => {
    if (segment.type === 'content') {
      // Content segments reference a [start, end) slice of the full text;
      // whitespace-only slices are dropped.
      const start = typeof segment.start === 'number' ? segment.start : 0
      const end = typeof segment.end === 'number' ? segment.end : start
      const text = resultText?.substring(start, end)
      if (text?.trim()) {
        llmGenerationItems.push({
          id: `generation-text-${index}`,
          type: 'text',
          text,
          textCompleted: true,
        })
      }
      return
    }
    if (segment.type === 'reasoning') {
      const reasoning = typeof segment.index === 'number' ? reasoningContent[segment.index] : undefined
      if (reasoning) {
        llmGenerationItems.push({
          id: `generation-thought-${index}`,
          type: 'thought',
          thoughtOutput: reasoning,
          thoughtCompleted: true,
        })
      }
      return
    }
    if (segment.type === 'tool_call') {
      const toolCall = typeof segment.index === 'number' ? toolCalls[segment.index] : undefined
      if (!toolCall)
        return
      appendSyntheticToolItem(toolCall, index)
    }
  })
  // Fallback: sequence produced nothing but the payload still has content —
  // synthesize an ordering (reasoning[i], tool[i]) then the final text.
  if (!llmGenerationItems.length && (reasoningContent.length || toolCalls.length || resultText)) {
    const syntheticSegmentCount = Math.max(reasoningContent.length, toolCalls.length)
    for (let i = 0; i < syntheticSegmentCount; i += 1) {
      const reasoning = reasoningContent[i]
      if (reasoning) {
        llmGenerationItems.push({
          id: `generation-thought-${i}`,
          type: 'thought',
          thoughtOutput: reasoning,
          thoughtCompleted: true,
        })
      }
      const toolCall = toolCalls[i]
      if (toolCall)
        appendSyntheticToolItem(toolCall, i)
    }
    if (resultText) {
      llmGenerationItems.push({
        id: 'generation-text-final',
        type: 'text',
        text: resultText,
        textCompleted: true,
      })
    }
  }
  return {
    resultText,
    llmGenerationItems,
  }
}
// Convert persisted replay items into the renderer's LLMGenerationItem shape,
// dropping entries with no visible content; unknown types pass through.
const buildGenerationItemsFromReplay = (items?: WorkflowRunReplayGenerationItem[]) => {
  if (!items?.length)
    return []

  const hasVisibleContent = (item: WorkflowRunReplayGenerationItem) => {
    switch (item.type) {
      case 'thought':
        return !!item.thought_output
      case 'tool':
        return !!(item.tool_name || item.tool_arguments || item.tool_output || item.tool_error || item.tool_files?.length)
      case 'text':
        return typeof item.text === 'string' && item.text.length > 0
      default:
        return true
    }
  }

  return items
    .filter(hasVisibleContent)
    .map((item, index) => ({
      id: `replay-item-${index}`,
      type: item.type,
      text: item.text,
      textCompleted: item.text_completed,
      thoughtOutput: item.thought_output,
      thoughtCompleted: item.thought_completed,
      toolName: item.tool_name,
      toolArguments: item.tool_arguments,
      toolOutput: item.tool_output,
      toolFiles: item.tool_files,
      toolError: item.tool_error,
      toolDuration: item.tool_duration,
      toolIcon: item.tool_icon,
      toolIconDark: item.tool_icon_dark,
    } satisfies LLMGenerationItem))
}
// Map replay file groups to the viewer's { varName, list } shape, skipping
// groups whose files resolve to nothing displayable.
const buildAllFilesFromReplay = (resultReplay?: WorkflowRunResultReplay | null) => {
  const groups = resultReplay?.files ?? []
  const mapped = groups.map(group => ({
    varName: group.var_name,
    list: getProcessedFilesFromResponse(group.files || []),
  }))
  return mapped.filter(group => group.list.length > 0)
}
// Derive the RESULT-tab view model for a historical run. Branch priority:
// 1) result_replay (new runs) → replay items + replay files;
// 2) a single string output → plain text view;
// 3) outputs_as_generation → items rebuilt from the generation payload;
// 4) otherwise files only, no text.
const buildHistoryResultView = (runDetail?: WorkflowRunDetailResponse): HistoryResultView => {
  if (!runDetail) {
    return {
      resultText: undefined,
      llmGenerationItems: [],
      allFiles: [],
      copyContent: '',
    }
  }
  if (runDetail.result_replay) {
    const llmGenerationItems = buildGenerationItemsFromReplay(runDetail.result_replay.llm_generation_items)
    const resultText = runDetail.result_replay.text
    // Structured copy text when tool/thought items exist; raw text otherwise.
    const copyContent = buildCopyContentFromLLMGenerationItems(llmGenerationItems) || resultText || ''
    return {
      resultText,
      llmGenerationItems,
      allFiles: buildAllFilesFromReplay(runDetail.result_replay),
      copyContent,
    }
  }
  // Legacy runs: files are extracted from the raw outputs object.
  const allFiles = isRecord(runDetail.outputs) ? getFilesInLogs(runDetail.outputs) : []
  const singleTextOutput = getSingleStringOutput(runDetail.outputs)
  if (singleTextOutput) {
    return {
      resultText: singleTextOutput,
      llmGenerationItems: [],
      allFiles,
      copyContent: singleTextOutput,
    }
  }
  if (runDetail.outputs_as_generation) {
    const generationPayload = getSingleGenerationOutput(runDetail.outputs)
    const { resultText, llmGenerationItems } = buildGenerationItemsFromPayload(generationPayload)
    return {
      resultText,
      llmGenerationItems,
      allFiles,
      copyContent: buildCopyContentFromLLMGenerationItems(llmGenerationItems) || resultText || '',
    }
  }
  return {
    resultText: undefined,
    llmGenerationItems: [],
    allFiles,
    copyContent: '',
  }
}
const RunPanel: FC<RunProps> = ({
@ -30,6 +365,7 @@ const RunPanel: FC<RunProps> = ({
getResultCallback,
runDetailUrl,
tracingListUrl,
useFirstRunResultView = false,
}) => {
const { t } = useTranslation()
const { notify } = useContext(ToastContext)
@ -47,6 +383,11 @@ const RunPanel: FC<RunProps> = ({
return 'N/A'
}, [runDetail])
const historyResultView = useMemo(
() => buildHistoryResultView(runDetail),
[runDetail],
)
const getResult = useCallback(async () => {
try {
const res = await fetchRunDetail(runDetailUrl)
@ -100,10 +441,9 @@ const RunPanel: FC<RunProps> = ({
}, [isListening])
useEffect(() => {
// fetch data
if (runDetailUrl && tracingListUrl)
getData()
}, [runDetailUrl, tracingListUrl])
}, [getData, runDetailUrl, tracingListUrl])
const [height, setHeight] = useState(0)
const ref = useRef<HTMLDivElement>(null)
@ -119,7 +459,6 @@ const RunPanel: FC<RunProps> = ({
return (
<div className="relative flex grow flex-col">
{/* tab */}
<div className="flex shrink-0 items-center border-b-[0.5px] border-divider-subtle px-4">
{!hideResult && (
<div
@ -151,20 +490,52 @@ const RunPanel: FC<RunProps> = ({
{t('tracing', { ns: 'runLog' })}
</div>
</div>
{/* panel detail */}
<div ref={ref} className={cn('relative h-0 grow overflow-y-auto rounded-b-xl bg-background-section')}>
<div
ref={ref}
className={cn(
'relative h-0 grow overflow-y-auto rounded-b-xl bg-background-section',
useFirstRunResultView && (currentTab === 'RESULT' || currentTab === 'TRACING') && '!bg-background-section-burn',
)}
>
{loading && (
<div className="flex h-full items-center justify-center bg-components-panel-bg">
<Loading />
</div>
)}
{!loading && currentTab === 'RESULT' && runDetail && (
{!loading && currentTab === 'RESULT' && runDetail && !useFirstRunResultView && (
<OutputPanel
outputs={runDetail.outputs}
error={runDetail.error}
height={height}
/>
)}
{!loading && currentTab === 'RESULT' && runDetail && useFirstRunResultView && (
<div className="p-2">
<ResultText
isRunning={runDetail.status === WorkflowRunningStatus.Running}
outputs={historyResultView.resultText}
llmGenerationItems={historyResultView.llmGenerationItems}
allFiles={historyResultView.allFiles}
error={runDetail.error}
onClick={() => { void switchTab('DETAIL') }}
/>
{runDetail.status !== WorkflowRunningStatus.Running && historyResultView.copyContent && (
<Button
className={cn('mb-4 ml-4 space-x-1')}
onClick={() => {
copy(historyResultView.copyContent)
notify({
type: 'success',
message: t('actionMsg.copySuccessfully', { ns: 'common' }),
})
}}
>
<span className="i-ri-clipboard-line h-3.5 w-3.5" />
<div>{t('operation.copy', { ns: 'common' })}</div>
</Button>
)}
</div>
)}
{!loading && currentTab === 'DETAIL' && runDetail && (
<ResultPanel
inputs={runDetail.inputs}
@ -192,7 +563,7 @@ const RunPanel: FC<RunProps> = ({
)}
{!loading && currentTab === 'TRACING' && (
<TracingPanel
className="bg-background-section"
className={useFirstRunResultView ? 'bg-background-section-burn' : 'bg-background-section'}
list={list}
/>
)}

View File

@ -29,7 +29,7 @@ const ResultText: FC<ResultTextProps> = ({
allFiles,
}) => {
const { t } = useTranslation()
const generationContentRenderIsUsed = llmGenerationItems?.length && llmGenerationItems.some((item) => {
const generationContentRenderIsUsed = !!llmGenerationItems?.length && llmGenerationItems.some((item) => {
return item.type === 'tool' || item.type === 'thought'
})

View File

@ -286,6 +286,39 @@ export type WorkflowLogsRequest = {
limit: number // The default value is 20 and the range is 1-100
}
// One output variable's files in a persisted replay.
export type WorkflowRunReplayFileGroup = {
  var_name: string
  files: FileResponse[]
}

// A single rendered generation step stored in result_replay. Fields are
// populated per `type`: text/text_completed for 'text', thought_* for
// 'thought', tool_* for 'tool'.
export type WorkflowRunReplayGenerationItem = {
  type: 'text' | 'thought' | 'tool'
  text?: string
  text_completed?: boolean
  thought_output?: string
  thought_completed?: boolean
  tool_name?: string
  tool_arguments?: string
  tool_output?: Record<string, unknown> | string
  tool_files?: unknown[]
  tool_error?: string
  tool_duration?: number
  tool_icon?: string | {
    background: string
    content: string
  }
  tool_icon_dark?: string | {
    background: string
    content: string
  }
}

// Server-persisted replay of a run's rendered result (first-run view).
export type WorkflowRunResultReplay = {
  text?: string
  llm_generation_items?: WorkflowRunReplayGenerationItem[]
  files?: WorkflowRunReplayFileGroup[]
}
export type WorkflowRunDetailResponse = {
id: string
version: string
@ -294,10 +327,12 @@ export type WorkflowRunDetailResponse = {
edges: Edge[]
viewport?: Viewport
}
inputs: string
inputs: Record<string, unknown> | string
inputs_truncated: boolean
status: 'running' | 'succeeded' | 'failed' | 'stopped'
outputs?: string
status: 'running' | 'succeeded' | 'failed' | 'stopped' | 'paused' | 'partial-succeeded'
outputs?: Record<string, unknown> | string
outputs_as_generation?: boolean
result_replay?: WorkflowRunResultReplay | null
outputs_truncated: boolean
outputs_full_content?: {
download_url: string