feat: show all variables in start node outputs

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2026-03-17 05:19:30 +08:00
parent 3be2315cd1
commit e44f024d66
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
5 changed files with 180 additions and 10 deletions

View File

@ -20,7 +20,8 @@ class StartNode(Node[StartNodeData]):
def _run(self) -> NodeRunResult:
node_inputs = dict(self.graph_runtime_state.variable_pool.get_by_prefix(self.id))
self._validate_and_normalize_json_object_inputs(node_inputs)
outputs = dict(node_inputs)
outputs = dict(self.graph_runtime_state.variable_pool.flatten(unprefixed_node_id=self.id))
outputs.update(node_inputs)
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=node_inputs, outputs=outputs)

View File

@ -206,6 +206,23 @@ class VariablePool(BaseModel):
return result
def flatten(self, *, unprefixed_node_id: str | None = None) -> Mapping[str, object]:
"""Return a selector-style snapshot of the entire variable pool.
Variables belonging to ``unprefixed_node_id`` keep their original names so callers
can expose the current node's values without duplicating its namespace. All other
entries are emitted as ``"<node_id>.<name>"`` to preserve their source prefix in a
single flat mapping.
"""
result: dict[str, object] = {}
for node_id, variables in self.variable_dictionary.items():
for name, variable in variables.items():
output_name = name if node_id == unprefixed_node_id else f"{node_id}.{name}"
result[output_name] = deepcopy(variable.value)
return result
@classmethod
def empty(cls) -> VariablePool:
"""Create an empty variable pool."""

View File

@ -19,6 +19,7 @@ from core.workflow.system_variables import SystemVariableKey
from core.workflow.variable_prefixes import (
CONVERSATION_VARIABLE_NODE_ID,
ENVIRONMENT_VARIABLE_NODE_ID,
RAG_PIPELINE_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
from dify_graph.enums import NodeType
@ -897,9 +898,8 @@ class DraftVariableSaver:
for name, value in output.items():
value_seg = _build_segment_for_serialized_values(value)
node_id, name = self._normalize_variable_for_start_node(name)
# If node_id is not `sys`, it means that the variable is a user-defined input field
# in `Start` node.
if node_id != SYSTEM_VARIABLE_NODE_ID:
if node_id == self._node_id:
# Variables without a reserved prefix belong to the Start node itself.
draft_vars.append(
WorkflowDraftVariable.new_node_variable(
app_id=self._app_id,
@ -913,7 +913,7 @@ class DraftVariableSaver:
)
)
has_non_sys_variables = True
else:
elif node_id == SYSTEM_VARIABLE_NODE_ID:
if name == SystemVariableKey.FILES:
# Here we know the type of variable must be `array[file]`, we
# just build files from the value.
@ -938,15 +938,45 @@ class DraftVariableSaver:
editable=self._should_variable_be_editable(node_id, name),
)
)
elif node_id == CONVERSATION_VARIABLE_NODE_ID:
draft_vars.append(
WorkflowDraftVariable.new_conversation_variable(
app_id=self._app_id,
user_id=self._user.id,
name=name,
value=value_seg,
)
)
else:
draft_vars.append(
WorkflowDraftVariable.new_node_variable(
app_id=self._app_id,
user_id=self._user.id,
node_id=node_id,
name=name,
node_execution_id=self._node_execution_id,
value=value_seg,
visible=self._should_variable_be_visible(node_id, self._node_type, name),
editable=self._should_variable_be_editable(node_id, name),
)
)
if not has_non_sys_variables:
draft_vars.append(self._create_dummy_output_variable())
return draft_vars
def _normalize_variable_for_start_node(self, name: str) -> tuple[str, str]:
if not name.startswith(f"{SYSTEM_VARIABLE_NODE_ID}."):
return self._node_id, name
_, name_ = name.split(".", maxsplit=1)
return SYSTEM_VARIABLE_NODE_ID, name_
for reserved_node_id in (
SYSTEM_VARIABLE_NODE_ID,
ENVIRONMENT_VARIABLE_NODE_ID,
CONVERSATION_VARIABLE_NODE_ID,
RAG_PIPELINE_VARIABLE_NODE_ID,
):
prefix = f"{reserved_node_id}."
if name.startswith(prefix):
_, name_ = name.split(".", maxsplit=1)
return reserved_node_id, name_
return self._node_id, name
def _build_variables_from_mapping(self, output: Mapping[str, Any]) -> list[WorkflowDraftVariable]:
draft_vars = []

View File

@ -5,10 +5,13 @@ import pytest
from pydantic import ValidationError as PydanticValidationError
from core.workflow.system_variables import build_system_variables
from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.nodes.start.entities import StartNodeData
from dify_graph.nodes.start.start_node import StartNode
from dify_graph.runtime import GraphRuntimeState
from dify_graph.variables import build_segment, segment_to_variable
from dify_graph.variables.input_entities import VariableEntity, VariableEntityType
from dify_graph.variables.variables import Variable
from tests.workflow_test_utils import build_test_graph_init_params, build_test_variable_pool
@ -232,3 +235,64 @@ def test_json_object_optional_variable_not_provided():
# Current implementation raises a validation error even when the variable is optional
with pytest.raises(ValueError, match="profile is required in input form"):
node._run()
def test_start_node_outputs_full_variable_pool_snapshot():
variable_pool = build_test_variable_pool(
variables=[
*build_system_variables(query="hello", workflow_run_id="run-123"),
_build_prefixed_variable(ENVIRONMENT_VARIABLE_NODE_ID, "API_KEY", "secret"),
_build_prefixed_variable(CONVERSATION_VARIABLE_NODE_ID, "session_id", "conversation-1"),
],
node_id="start",
inputs={"profile": {"age": 20, "name": "Tom"}},
)
config = {
"id": "start",
"data": StartNodeData(
title="Start",
variables=[
VariableEntity(
variable="profile",
label="profile",
type=VariableEntityType.JSON_OBJECT,
required=True,
)
],
).model_dump(),
}
graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter())
node = StartNode(
id="start",
config=config,
graph_init_params=build_test_graph_init_params(
workflow_id="wf",
graph_config={},
tenant_id="tenant",
app_id="app",
user_id="u",
user_from="account",
invoke_from="debugger",
call_depth=0,
),
graph_runtime_state=graph_runtime_state,
)
result = node._run()
assert result.inputs == {"profile": {"age": 20, "name": "Tom"}}
assert result.outputs["profile"] == {"age": 20, "name": "Tom"}
assert result.outputs["sys.query"] == "hello"
assert result.outputs["sys.workflow_run_id"] == "run-123"
assert result.outputs["env.API_KEY"] == "secret"
assert result.outputs["conversation.session_id"] == "conversation-1"
def _build_prefixed_variable(node_id: str, name: str, value: object) -> Variable:
return segment_to_variable(
segment=build_segment(value),
selector=(node_id, name),
name=name,
)

View File

@ -8,7 +8,11 @@ from sqlalchemy import Engine
from sqlalchemy.orm import Session
from core.workflow.system_variables import SystemVariableKey
from core.workflow.variable_prefixes import SYSTEM_VARIABLE_NODE_ID
from core.workflow.variable_prefixes import (
CONVERSATION_VARIABLE_NODE_ID,
ENVIRONMENT_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
from dify_graph.enums import BuiltinNodeTypes
from dify_graph.variables.segments import StringSegment
from dify_graph.variables.types import SegmentType
@ -87,6 +91,20 @@ class TestDraftVariableSaver:
expected_node_id=_NODE_ID,
expected_name="start_input",
),
TestCase(
name="name with `env.` prefix should return the environment node_id",
input_node_id=_NODE_ID,
input_name="env.API_KEY",
expected_node_id=ENVIRONMENT_VARIABLE_NODE_ID,
expected_name="API_KEY",
),
TestCase(
name="name with `conversation.` prefix should return the conversation node_id",
input_node_id=_NODE_ID,
input_name="conversation.session_id",
expected_node_id=CONVERSATION_VARIABLE_NODE_ID,
expected_name="session_id",
),
TestCase(
name="dummy_variable should return the original input node_id",
input_node_id=_NODE_ID,
@ -219,6 +237,46 @@ class TestDraftVariableSaver:
str(SystemVariableKey.WORKFLOW_EXECUTION_ID),
}
@patch("services.workflow_draft_variable_service._batch_upsert_draft_variable", autospec=True)
def test_start_node_save_normalizes_reserved_prefix_outputs(self, mock_batch_upsert):
mock_session = MagicMock(spec=Session)
mock_user = MagicMock(spec=Account)
mock_user.id = "test-user-id"
mock_user.tenant_id = "test-tenant-id"
saver = DraftVariableSaver(
session=mock_session,
app_id="test-app-id",
node_id="start-node-id",
node_type=BuiltinNodeTypes.START,
node_execution_id="exec-id",
user=mock_user,
)
saver.save(
outputs={
"env.API_KEY": "secret",
"conversation.session_id": "conversation-1",
"sys.workflow_run_id": "run-id-123",
}
)
mock_batch_upsert.assert_called_once()
draft_vars = mock_batch_upsert.call_args[0][1]
assert len(draft_vars) == 4
env_var = next(v for v in draft_vars if v.node_id == ENVIRONMENT_VARIABLE_NODE_ID)
assert env_var.name == "API_KEY"
assert env_var.editable is False
conversation_var = next(v for v in draft_vars if v.node_id == CONVERSATION_VARIABLE_NODE_ID)
assert conversation_var.name == "session_id"
assert conversation_var.node_execution_id is None
sys_var = next(v for v in draft_vars if v.node_id == SYSTEM_VARIABLE_NODE_ID)
assert sys_var.name == str(SystemVariableKey.WORKFLOW_EXECUTION_ID)
class TestWorkflowDraftVariableService:
def _get_test_app_id(self):