From e44f024d663ddd62330870b7dbfe4a6fe24e3375 Mon Sep 17 00:00:00 2001 From: -LAN- Date: Tue, 17 Mar 2026 05:19:30 +0800 Subject: [PATCH] feat: show all variables in start node outputs Signed-off-by: -LAN- --- api/dify_graph/nodes/start/start_node.py | 3 +- api/dify_graph/runtime/variable_pool.py | 17 +++++ .../workflow_draft_variable_service.py | 46 ++++++++++--- .../nodes/test_start_node_json_object.py | 64 +++++++++++++++++++ .../test_workflow_draft_variable_service.py | 60 ++++++++++++++++- 5 files changed, 180 insertions(+), 10 deletions(-) diff --git a/api/dify_graph/nodes/start/start_node.py b/api/dify_graph/nodes/start/start_node.py index 3f18555d99..8d0589f150 100644 --- a/api/dify_graph/nodes/start/start_node.py +++ b/api/dify_graph/nodes/start/start_node.py @@ -20,7 +20,8 @@ class StartNode(Node[StartNodeData]): def _run(self) -> NodeRunResult: node_inputs = dict(self.graph_runtime_state.variable_pool.get_by_prefix(self.id)) self._validate_and_normalize_json_object_inputs(node_inputs) - outputs = dict(node_inputs) + outputs = dict(self.graph_runtime_state.variable_pool.flatten(unprefixed_node_id=self.id)) + outputs.update(node_inputs) return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=node_inputs, outputs=outputs) diff --git a/api/dify_graph/runtime/variable_pool.py b/api/dify_graph/runtime/variable_pool.py index 579d122bfe..29ba0496be 100644 --- a/api/dify_graph/runtime/variable_pool.py +++ b/api/dify_graph/runtime/variable_pool.py @@ -206,6 +206,23 @@ class VariablePool(BaseModel): return result + def flatten(self, *, unprefixed_node_id: str | None = None) -> Mapping[str, object]: + """Return a selector-style snapshot of the entire variable pool. + + Variables belonging to ``unprefixed_node_id`` keep their original names so callers + can expose the current node's values without duplicating its namespace. All other + entries are emitted as ``"."`` to preserve their source prefix in a + single flat mapping. + """ + + result: dict[str, object] = {} + for node_id, variables in self.variable_dictionary.items(): + for name, variable in variables.items(): + output_name = name if node_id == unprefixed_node_id else f"{node_id}.{name}" + result[output_name] = deepcopy(variable.value) + + return result + @classmethod def empty(cls) -> VariablePool: """Create an empty variable pool.""" diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index d98c36153e..15bb14b59b 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -19,6 +19,7 @@ from core.workflow.system_variables import SystemVariableKey from core.workflow.variable_prefixes import ( CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID, + RAG_PIPELINE_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID, ) from dify_graph.enums import NodeType @@ -897,9 +898,8 @@ class DraftVariableSaver: for name, value in output.items(): value_seg = _build_segment_for_serialized_values(value) node_id, name = self._normalize_variable_for_start_node(name) - # If node_id is not `sys`, it means that the variable is a user-defined input field - # in `Start` node. - if node_id != SYSTEM_VARIABLE_NODE_ID: + if node_id == self._node_id: + # Variables without a reserved prefix belong to the Start node itself. draft_vars.append( WorkflowDraftVariable.new_node_variable( app_id=self._app_id, @@ -913,7 +913,7 @@ class DraftVariableSaver: ) ) has_non_sys_variables = True - else: + elif node_id == SYSTEM_VARIABLE_NODE_ID: if name == SystemVariableKey.FILES: # Here we know the type of variable must be `array[file]`, we # just build files from the value. @@ -938,15 +938,45 @@ class DraftVariableSaver: editable=self._should_variable_be_editable(node_id, name), ) ) + elif node_id == CONVERSATION_VARIABLE_NODE_ID: + draft_vars.append( + WorkflowDraftVariable.new_conversation_variable( + app_id=self._app_id, + user_id=self._user.id, + name=name, + value=value_seg, + ) + ) + else: + draft_vars.append( + WorkflowDraftVariable.new_node_variable( + app_id=self._app_id, + user_id=self._user.id, + node_id=node_id, + name=name, + node_execution_id=self._node_execution_id, + value=value_seg, + visible=self._should_variable_be_visible(node_id, self._node_type, name), + editable=self._should_variable_be_editable(node_id, name), + ) + ) if not has_non_sys_variables: draft_vars.append(self._create_dummy_output_variable()) return draft_vars def _normalize_variable_for_start_node(self, name: str) -> tuple[str, str]: - if not name.startswith(f"{SYSTEM_VARIABLE_NODE_ID}."): - return self._node_id, name - _, name_ = name.split(".", maxsplit=1) - return SYSTEM_VARIABLE_NODE_ID, name_ + for reserved_node_id in ( + SYSTEM_VARIABLE_NODE_ID, + ENVIRONMENT_VARIABLE_NODE_ID, + CONVERSATION_VARIABLE_NODE_ID, + RAG_PIPELINE_VARIABLE_NODE_ID, + ): + prefix = f"{reserved_node_id}." + if name.startswith(prefix): + _, name_ = name.split(".", maxsplit=1) + return reserved_node_id, name_ + + return self._node_id, name def _build_variables_from_mapping(self, output: Mapping[str, Any]) -> list[WorkflowDraftVariable]: draft_vars = [] diff --git a/api/tests/unit_tests/core/workflow/nodes/test_start_node_json_object.py b/api/tests/unit_tests/core/workflow/nodes/test_start_node_json_object.py index 4b1545b9e8..80ee3858ae 100644 --- a/api/tests/unit_tests/core/workflow/nodes/test_start_node_json_object.py +++ b/api/tests/unit_tests/core/workflow/nodes/test_start_node_json_object.py @@ -5,10 +5,13 @@ import pytest from pydantic import ValidationError as PydanticValidationError from core.workflow.system_variables import build_system_variables +from core.workflow.variable_prefixes import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID from dify_graph.nodes.start.entities import StartNodeData from dify_graph.nodes.start.start_node import StartNode from dify_graph.runtime import GraphRuntimeState +from dify_graph.variables import build_segment, segment_to_variable from dify_graph.variables.input_entities import VariableEntity, VariableEntityType +from dify_graph.variables.variables import Variable from tests.workflow_test_utils import build_test_graph_init_params, build_test_variable_pool @@ -232,3 +235,64 @@ def test_json_object_optional_variable_not_provided(): # Current implementation raises a validation error even when the variable is optional with pytest.raises(ValueError, match="profile is required in input form"): node._run() + + +def test_start_node_outputs_full_variable_pool_snapshot(): + variable_pool = build_test_variable_pool( + variables=[ + *build_system_variables(query="hello", workflow_run_id="run-123"), + _build_prefixed_variable(ENVIRONMENT_VARIABLE_NODE_ID, "API_KEY", "secret"), + _build_prefixed_variable(CONVERSATION_VARIABLE_NODE_ID, "session_id", "conversation-1"), + ], + node_id="start", + inputs={"profile": {"age": 20, "name": "Tom"}}, + ) + + config = { + "id": "start", + "data": StartNodeData( + title="Start", + variables=[ + VariableEntity( + variable="profile", + label="profile", + type=VariableEntityType.JSON_OBJECT, + required=True, + ) + ], + ).model_dump(), + } + + graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + node = StartNode( + id="start", + config=config, + graph_init_params=build_test_graph_init_params( + workflow_id="wf", + graph_config={}, + tenant_id="tenant", + app_id="app", + user_id="u", + user_from="account", + invoke_from="debugger", + call_depth=0, + ), + graph_runtime_state=graph_runtime_state, + ) + + result = node._run() + + assert result.inputs == {"profile": {"age": 20, "name": "Tom"}} + assert result.outputs["profile"] == {"age": 20, "name": "Tom"} + assert result.outputs["sys.query"] == "hello" + assert result.outputs["sys.workflow_run_id"] == "run-123" + assert result.outputs["env.API_KEY"] == "secret" + assert result.outputs["conversation.session_id"] == "conversation-1" + + +def _build_prefixed_variable(node_id: str, name: str, value: object) -> Variable: + return segment_to_variable( + segment=build_segment(value), + selector=(node_id, name), + name=name, + ) diff --git a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py index 0bd9478557..eb53edcffa 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_draft_variable_service.py @@ -8,7 +8,11 @@ from sqlalchemy import Engine from sqlalchemy.orm import Session from core.workflow.system_variables import SystemVariableKey -from core.workflow.variable_prefixes import SYSTEM_VARIABLE_NODE_ID +from core.workflow.variable_prefixes import ( + CONVERSATION_VARIABLE_NODE_ID, + ENVIRONMENT_VARIABLE_NODE_ID, + SYSTEM_VARIABLE_NODE_ID, +) from dify_graph.enums import BuiltinNodeTypes from dify_graph.variables.segments import StringSegment from dify_graph.variables.types import SegmentType @@ -87,6 +91,20 @@ class TestDraftVariableSaver: expected_node_id=_NODE_ID, expected_name="start_input", ), + TestCase( + name="name with `env.` prefix should return the environment node_id", + input_node_id=_NODE_ID, + input_name="env.API_KEY", + expected_node_id=ENVIRONMENT_VARIABLE_NODE_ID, + expected_name="API_KEY", + ), + TestCase( + name="name with `conversation.` prefix should return the conversation node_id", + input_node_id=_NODE_ID, + input_name="conversation.session_id", + expected_node_id=CONVERSATION_VARIABLE_NODE_ID, + expected_name="session_id", + ), TestCase( name="dummy_variable should return the original input node_id", input_node_id=_NODE_ID, @@ -219,6 +237,46 @@ class TestDraftVariableSaver: str(SystemVariableKey.WORKFLOW_EXECUTION_ID), } + @patch("services.workflow_draft_variable_service._batch_upsert_draft_variable", autospec=True) + def test_start_node_save_normalizes_reserved_prefix_outputs(self, mock_batch_upsert): + mock_session = MagicMock(spec=Session) + mock_user = MagicMock(spec=Account) + mock_user.id = "test-user-id" + mock_user.tenant_id = "test-tenant-id" + + saver = DraftVariableSaver( + session=mock_session, + app_id="test-app-id", + node_id="start-node-id", + node_type=BuiltinNodeTypes.START, + node_execution_id="exec-id", + user=mock_user, + ) + + saver.save( + outputs={ + "env.API_KEY": "secret", + "conversation.session_id": "conversation-1", + "sys.workflow_run_id": "run-id-123", + } + ) + + mock_batch_upsert.assert_called_once() + draft_vars = mock_batch_upsert.call_args[0][1] + + assert len(draft_vars) == 4 + + env_var = next(v for v in draft_vars if v.node_id == ENVIRONMENT_VARIABLE_NODE_ID) + assert env_var.name == "API_KEY" + assert env_var.editable is False + + conversation_var = next(v for v in draft_vars if v.node_id == CONVERSATION_VARIABLE_NODE_ID) + assert conversation_var.name == "session_id" + assert conversation_var.node_execution_id is None + + sys_var = next(v for v in draft_vars if v.node_id == SYSTEM_VARIABLE_NODE_ID) + assert sys_var.name == str(SystemVariableKey.WORKFLOW_EXECUTION_ID) + class TestWorkflowDraftVariableService: def _get_test_app_id(self):