From 1b6e695520f94731afb6d9217ea7d1939dc10764 Mon Sep 17 00:00:00 2001 From: 99 Date: Sat, 14 Mar 2026 22:33:13 +0800 Subject: [PATCH] refactor(workflow): move agent node back to core workflow (#33431) --- api/.importlinter | 12 - api/controllers/console/workspace/plugin.py | 29 +- api/core/agent/cot_agent_runner.py | 2 +- api/core/agent/errors.py | 9 + api/core/agent/fc_agent_runner.py | 2 +- api/core/app/apps/workflow_app_runner.py | 21 +- api/core/app/entities/__init__.py | 3 + api/core/app/entities/agent_strategy.py | 8 + api/core/app/entities/queue_entities.py | 4 +- api/core/app/entities/task_entities.py | 4 +- api/core/workflow/node_factory.py | 34 +- api/core/workflow/node_resolution.py | 42 + api/core/workflow/nodes/__init__.py | 0 api/core/workflow/nodes/agent/__init__.py | 4 + api/core/workflow/nodes/agent/agent_node.py | 188 +++++ .../workflow}/nodes/agent/entities.py | 4 +- .../workflow/nodes/agent/exceptions.py} | 11 - .../nodes/agent/message_transformer.py | 292 +++++++ .../nodes/agent/plugin_strategy_adapter.py | 40 + .../workflow/nodes/agent/runtime_support.py | 276 +++++++ .../nodes/agent/strategy_protocols.py | 39 + api/core/workflow/workflow_entry.py | 4 +- api/dify_graph/entities/__init__.py | 2 - api/dify_graph/entities/agent.py | 8 - api/dify_graph/graph_events/node.py | 3 +- api/dify_graph/nodes/agent/__init__.py | 3 - api/dify_graph/nodes/agent/agent_node.py | 761 ------------------ api/dify_graph/nodes/base/node.py | 53 +- .../nodes/datasource/datasource_node.py | 4 + .../nodes/iteration/iteration_node.py | 7 +- api/dify_graph/nodes/loop/loop_node.py | 7 +- api/dify_graph/nodes/node_mapping.py | 23 +- api/dify_graph/nodes/tool/tool_node.py | 4 + .../trigger_plugin/trigger_event_node.py | 3 + api/services/rag_pipeline/rag_pipeline.py | 9 +- api/services/workflow_service.py | 9 +- .../workflow/nodes/test_tool.py | 46 +- .../console/workspace/test_plugin.py | 6 + .../core/agent/test_cot_agent_runner.py | 2 +- .../core/agent/test_fc_agent_runner.py | 2 +- .../app/apps/test_workflow_app_runner_core.py | 8 +- .../workflow/graph_engine/test_mock_nodes.py | 10 +- .../core/workflow/test_node_factory.py | 31 +- .../workflow/test_workflow_entry_helpers.py | 12 +- .../services/test_workflow_service.py | 38 +- 45 files changed, 1115 insertions(+), 964 deletions(-) create mode 100644 api/core/agent/errors.py create mode 100644 api/core/app/entities/agent_strategy.py create mode 100644 api/core/workflow/node_resolution.py create mode 100644 api/core/workflow/nodes/__init__.py create mode 100644 api/core/workflow/nodes/agent/__init__.py create mode 100644 api/core/workflow/nodes/agent/agent_node.py rename api/{dify_graph => core/workflow}/nodes/agent/entities.py (93%) rename api/{dify_graph/nodes/agent/exc.py => core/workflow/nodes/agent/exceptions.py} (90%) create mode 100644 api/core/workflow/nodes/agent/message_transformer.py create mode 100644 api/core/workflow/nodes/agent/plugin_strategy_adapter.py create mode 100644 api/core/workflow/nodes/agent/runtime_support.py create mode 100644 api/core/workflow/nodes/agent/strategy_protocols.py delete mode 100644 api/dify_graph/entities/agent.py delete mode 100644 api/dify_graph/nodes/agent/__init__.py delete mode 100644 api/dify_graph/nodes/agent/agent_node.py diff --git a/api/.importlinter b/api/.importlinter index 5c0a6e1288..8dffc3506b 100644 --- a/api/.importlinter +++ b/api/.importlinter @@ -43,7 +43,6 @@ forbidden_modules = extensions.ext_redis allow_indirect_imports = True ignore_imports = - dify_graph.nodes.agent.agent_node -> extensions.ext_database dify_graph.nodes.llm.node -> extensions.ext_database dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis dify_graph.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis @@ -90,9 +89,6 @@ forbidden_modules = core.trigger core.variables ignore_imports = - dify_graph.nodes.agent.agent_node -> core.model_manager - dify_graph.nodes.agent.agent_node -> core.provider_manager - dify_graph.nodes.agent.agent_node -> core.tools.tool_manager dify_graph.nodes.llm.llm_utils -> core.model_manager dify_graph.nodes.llm.protocols -> core.model_manager dify_graph.nodes.llm.llm_utils -> dify_graph.model_runtime.model_providers.__base.large_language_model @@ -100,8 +96,6 @@ ignore_imports = dify_graph.nodes.tool.tool_node -> core.callback_handler.workflow_tool_callback_handler dify_graph.nodes.tool.tool_node -> core.tools.tool_engine dify_graph.nodes.tool.tool_node -> core.tools.tool_manager - dify_graph.nodes.agent.agent_node -> core.agent.entities - dify_graph.nodes.agent.agent_node -> core.agent.plugin_entities dify_graph.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.simple_prompt_transform @@ -110,12 +104,10 @@ ignore_imports = dify_graph.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager dify_graph.nodes.question_classifier.question_classifier_node -> core.model_manager dify_graph.nodes.tool.tool_node -> core.tools.utils.message_transformer - dify_graph.nodes.agent.agent_node -> models.model dify_graph.nodes.llm.node -> core.helper.code_executor dify_graph.nodes.llm.node -> core.llm_generator.output_parser.errors dify_graph.nodes.llm.node -> core.llm_generator.output_parser.structured_output dify_graph.nodes.llm.node -> core.model_manager - dify_graph.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities dify_graph.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities dify_graph.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities dify_graph.nodes.llm.node -> core.prompt.utils.prompt_message_util @@ -126,15 +118,11 @@ ignore_imports = dify_graph.nodes.question_classifier.question_classifier_node -> core.prompt.utils.prompt_message_util dify_graph.nodes.knowledge_index.entities -> core.rag.retrieval.retrieval_methods dify_graph.nodes.llm.node -> models.dataset - dify_graph.nodes.agent.agent_node -> core.tools.utils.message_transformer dify_graph.nodes.llm.file_saver -> core.tools.signature dify_graph.nodes.llm.file_saver -> core.tools.tool_file_manager dify_graph.nodes.tool.tool_node -> core.tools.errors - dify_graph.nodes.agent.agent_node -> extensions.ext_database dify_graph.nodes.llm.node -> extensions.ext_database - dify_graph.nodes.agent.agent_node -> models dify_graph.nodes.llm.node -> models.model - dify_graph.nodes.agent.agent_node -> services dify_graph.nodes.tool.tool_node -> services dify_graph.model_runtime.model_providers.__base.ai_model -> configs dify_graph.model_runtime.model_providers.__base.ai_model -> extensions.ext_redis diff --git a/api/controllers/console/workspace/plugin.py b/api/controllers/console/workspace/plugin.py index 2f06f72f29..ee537367c7 100644 --- a/api/controllers/console/workspace/plugin.py +++ b/api/controllers/console/workspace/plugin.py @@ -5,6 +5,7 @@ from typing import Any, Literal from flask import request, send_file from flask_restx import Resource from pydantic import BaseModel, Field +from werkzeug.datastructures import FileStorage from werkzeug.exceptions import Forbidden from configs import dify_config @@ -169,6 +170,20 @@ register_enum_models( ) +def _read_upload_content(file: FileStorage, max_size: int) -> bytes: + """ + Read the uploaded file and validate its actual size before delegating to the plugin service. + + FileStorage.content_length is not reliable for multipart test uploads and may be zero even when + content exists, so the controllers validate against the loaded bytes instead. + """ + content = file.read() + if len(content) > max_size: + raise ValueError("File size exceeds the maximum allowed size") + + return content + + @console_ns.route("/workspaces/current/plugin/debugging-key") class PluginDebuggingKeyApi(Resource): @setup_required @@ -284,12 +299,7 @@ class PluginUploadFromPkgApi(Resource): _, tenant_id = current_account_with_tenant() file = request.files["pkg"] - - # check file size - if file.content_length > dify_config.PLUGIN_MAX_PACKAGE_SIZE: - raise ValueError("File size exceeds the maximum allowed size") - - content = file.read() + content = _read_upload_content(file, dify_config.PLUGIN_MAX_PACKAGE_SIZE) try: response = PluginService.upload_pkg(tenant_id, content) except PluginDaemonClientSideError as e: @@ -328,12 +338,7 @@ class PluginUploadFromBundleApi(Resource): _, tenant_id = current_account_with_tenant() file = request.files["bundle"] - - # check file size - if file.content_length > dify_config.PLUGIN_MAX_BUNDLE_SIZE: - raise ValueError("File size exceeds the maximum allowed size") - - content = file.read() + content = _read_upload_content(file, dify_config.PLUGIN_MAX_BUNDLE_SIZE) try: response = PluginService.upload_bundle(tenant_id, content) except PluginDaemonClientSideError as e: diff --git a/api/core/agent/cot_agent_runner.py b/api/core/agent/cot_agent_runner.py index c6ecd5509b..9271ed10bd 100644 --- a/api/core/agent/cot_agent_runner.py +++ b/api/core/agent/cot_agent_runner.py @@ -6,6 +6,7 @@ from typing import Any from core.agent.base_agent_runner import BaseAgentRunner from core.agent.entities import AgentScratchpadUnit +from core.agent.errors import AgentMaxIterationError from core.agent.output_parser.cot_output_parser import CotAgentOutputParser from core.app.apps.base_app_queue_manager import PublishFrom from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent @@ -22,7 +23,6 @@ from dify_graph.model_runtime.entities.message_entities import ( ToolPromptMessage, UserPromptMessage, ) -from dify_graph.nodes.agent.exc import AgentMaxIterationError from models.model import Message logger = logging.getLogger(__name__) diff --git a/api/core/agent/errors.py b/api/core/agent/errors.py new file mode 100644 index 0000000000..ed504d500a --- /dev/null +++ b/api/core/agent/errors.py @@ -0,0 +1,9 @@ +class AgentMaxIterationError(Exception): + """Raised when an agent runner exceeds the configured max iteration count.""" + + def __init__(self, max_iteration: int): + self.max_iteration = max_iteration + super().__init__( + f"Agent exceeded the maximum iteration limit of {max_iteration}. " + f"The agent was unable to complete the task within the allowed number of iterations." + ) diff --git a/api/core/agent/fc_agent_runner.py b/api/core/agent/fc_agent_runner.py index 3271fe319b..5e13a13b21 100644 --- a/api/core/agent/fc_agent_runner.py +++ b/api/core/agent/fc_agent_runner.py @@ -5,6 +5,7 @@ from copy import deepcopy from typing import Any, Union from core.agent.base_agent_runner import BaseAgentRunner +from core.agent.errors import AgentMaxIterationError from core.app.apps.base_app_queue_manager import PublishFrom from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform @@ -25,7 +26,6 @@ from dify_graph.model_runtime.entities import ( UserPromptMessage, ) from dify_graph.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes -from dify_graph.nodes.agent.exc import AgentMaxIterationError from models.model import Message logger = logging.getLogger(__name__) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 3812fac28a..8986164fe7 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -3,7 +3,10 @@ import time from collections.abc import Mapping, Sequence from typing import Any, cast +from pydantic import ValidationError + from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom +from core.app.entities.agent_strategy import AgentStrategyInfo from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context from core.app.entities.queue_entities import ( AppQueueEvent, @@ -30,6 +33,7 @@ from core.app.entities.queue_entities import ( QueueWorkflowSucceededEvent, ) from core.workflow.node_factory import DifyNodeFactory +from core.workflow.node_resolution import resolve_workflow_node_class from core.workflow.workflow_entry import WorkflowEntry from dify_graph.entities import GraphInitParams from dify_graph.entities.graph_config import NodeConfigDictAdapter @@ -63,7 +67,6 @@ from dify_graph.graph_events import ( NodeRunSucceededEvent, ) from dify_graph.graph_events.graph import GraphRunAbortedEvent -from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from dify_graph.runtime import GraphRuntimeState, VariablePool from dify_graph.system_variable import SystemVariable from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool @@ -308,7 +311,7 @@ class WorkflowBasedAppRunner: # Get node class node_type = target_node_config["data"].type node_version = str(target_node_config["data"].version) - node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] + node_cls = resolve_workflow_node_class(node_type=node_type, node_version=node_version) # Use the variable pool from graph_runtime_state instead of creating a new one variable_pool = graph_runtime_state.variable_pool @@ -336,6 +339,18 @@ class WorkflowBasedAppRunner: return graph, variable_pool + @staticmethod + def _build_agent_strategy_info(event: NodeRunStartedEvent) -> AgentStrategyInfo | None: + raw_agent_strategy = event.extras.get("agent_strategy") + if raw_agent_strategy is None: + return None + + try: + return AgentStrategyInfo.model_validate(raw_agent_strategy) + except ValidationError: + logger.warning("Invalid agent strategy payload for node %s", event.node_id, exc_info=True) + return None + def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent): """ Handle event @@ -421,7 +436,7 @@ class WorkflowBasedAppRunner: start_at=event.start_at, in_iteration_id=event.in_iteration_id, in_loop_id=event.in_loop_id, - agent_strategy=event.agent_strategy, + agent_strategy=self._build_agent_strategy_info(event), provider_type=event.provider_type, provider_id=event.provider_id, ) diff --git a/api/core/app/entities/__init__.py b/api/core/app/entities/__init__.py index e69de29bb2..8e41acee32 100644 --- a/api/core/app/entities/__init__.py +++ b/api/core/app/entities/__init__.py @@ -0,0 +1,3 @@ +from .agent_strategy import AgentStrategyInfo + +__all__ = ["AgentStrategyInfo"] diff --git a/api/core/app/entities/agent_strategy.py b/api/core/app/entities/agent_strategy.py new file mode 100644 index 0000000000..b063a12f4f --- /dev/null +++ b/api/core/app/entities/agent_strategy.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel, ConfigDict + + +class AgentStrategyInfo(BaseModel): + name: str + icon: str | None = None + + model_config = ConfigDict(extra="forbid") diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index d42df0d1bf..2d1508f0cb 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -5,8 +5,8 @@ from typing import Any from pydantic import BaseModel, ConfigDict, Field +from core.app.entities.agent_strategy import AgentStrategyInfo from core.rag.entities.citation_metadata import RetrievalSourceMetadata -from dify_graph.entities import AgentNodeStrategyInit from dify_graph.entities.pause_reason import PauseReason from dify_graph.entities.workflow_start_reason import WorkflowStartReason from dify_graph.enums import WorkflowNodeExecutionMetadataKey @@ -314,7 +314,7 @@ class QueueNodeStartedEvent(AppQueueEvent): in_iteration_id: str | None = None in_loop_id: str | None = None start_at: datetime - agent_strategy: AgentNodeStrategyInit | None = None + agent_strategy: AgentStrategyInfo | None = None # FIXME(-LAN-): only for ToolNode, need to refactor provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index b58dae0ff2..46a8ab52f2 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -4,8 +4,8 @@ from typing import Any from pydantic import BaseModel, ConfigDict, Field +from core.app.entities.agent_strategy import AgentStrategyInfo from core.rag.entities.citation_metadata import RetrievalSourceMetadata -from dify_graph.entities import AgentNodeStrategyInit from dify_graph.entities.workflow_start_reason import WorkflowStartReason from dify_graph.enums import WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.model_runtime.entities.llm_entities import LLMResult, LLMUsage @@ -349,7 +349,7 @@ class NodeStartStreamResponse(StreamResponse): extras: dict[str, object] = Field(default_factory=dict) iteration_id: str | None = None loop_id: str | None = None - agent_strategy: AgentNodeStrategyInit | None = None + agent_strategy: AgentStrategyInfo | None = None event: StreamEvent = StreamEvent.NODE_STARTED workflow_run_id: str diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index d7f2a67c06..bc4e0eda71 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -22,6 +22,13 @@ from core.rag.retrieval.dataset_retrieval import DatasetRetrieval from core.rag.summary_index.summary_index import SummaryIndex from core.repositories.human_input_repository import HumanInputFormRepositoryImpl from core.tools.tool_file_manager import ToolFileManager +from core.workflow.node_resolution import resolve_workflow_node_class +from core.workflow.nodes.agent.message_transformer import AgentMessageTransformer +from core.workflow.nodes.agent.plugin_strategy_adapter import ( + PluginAgentStrategyPresentationProvider, + PluginAgentStrategyResolver, +) +from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport from dify_graph.entities.base_node_data import BaseNodeData from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY @@ -39,7 +46,6 @@ from dify_graph.nodes.document_extractor import UnstructuredApiConfig from dify_graph.nodes.http_request import build_http_request_config from dify_graph.nodes.llm.entities import LLMNodeData from dify_graph.nodes.llm.exc import LLMModeRequiredError, ModelNotExistError -from dify_graph.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from dify_graph.nodes.parameter_extractor.entities import ParameterExtractorNodeData from dify_graph.nodes.question_classifier.entities import QuestionClassifierNodeData from dify_graph.nodes.template_transform.template_renderer import ( @@ -97,10 +103,7 @@ class DefaultWorkflowCodeExecutor: @final class DifyNodeFactory(NodeFactory): """ - Default implementation of NodeFactory that uses the traditional node mapping. - - This factory creates nodes by looking up their types in NODE_TYPE_CLASSES_MAPPING - and instantiating the appropriate node class. + Default implementation of NodeFactory that resolves node classes from the live registry. """ def __init__( @@ -143,6 +146,10 @@ class DifyNodeFactory(NodeFactory): ) self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(self._dify_context.tenant_id) + self._agent_strategy_resolver = PluginAgentStrategyResolver() + self._agent_strategy_presentation_provider = PluginAgentStrategyPresentationProvider() + self._agent_runtime_support = AgentRuntimeSupport() + self._agent_message_transformer = AgentMessageTransformer() @staticmethod def _resolve_dify_context(run_context: Mapping[str, Any]) -> DifyRunContext: @@ -219,6 +226,12 @@ class DifyNodeFactory(NodeFactory): NodeType.TOOL: lambda: { "tool_file_manager_factory": self._http_request_tool_file_manager_factory(), }, + NodeType.AGENT: lambda: { + "strategy_resolver": self._agent_strategy_resolver, + "presentation_provider": self._agent_strategy_presentation_provider, + "runtime_support": self._agent_runtime_support, + "message_transformer": self._agent_message_transformer, + }, } node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})() return node_class( @@ -238,16 +251,7 @@ class DifyNodeFactory(NodeFactory): @staticmethod def _resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]: - node_mapping = NODE_TYPE_CLASSES_MAPPING.get(node_type) - if not node_mapping: - raise ValueError(f"No class mapping found for node type: {node_type}") - - latest_node_class = node_mapping.get(LATEST_VERSION) - matched_node_class = node_mapping.get(node_version) - node_class = matched_node_class or latest_node_class - if not node_class: - raise ValueError(f"No latest version class found for node type: {node_type}") - return node_class + return resolve_workflow_node_class(node_type=node_type, node_version=node_version) def _build_llm_compatible_node_init_kwargs( self, diff --git a/api/core/workflow/node_resolution.py b/api/core/workflow/node_resolution.py new file mode 100644 index 0000000000..b922c28165 --- /dev/null +++ b/api/core/workflow/node_resolution.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from collections.abc import Mapping +from importlib import import_module + +from dify_graph.enums import NodeType +from dify_graph.nodes.base.node import Node +from dify_graph.nodes.node_mapping import LATEST_VERSION, get_node_type_classes_mapping + +_WORKFLOW_NODE_MODULES = ("core.workflow.nodes.agent",) +_workflow_nodes_registered = False + + +def ensure_workflow_nodes_registered() -> None: + """Import workflow-local node modules so they can register with `Node.__init_subclass__`.""" + global _workflow_nodes_registered + + if _workflow_nodes_registered: + return + + for module_name in _WORKFLOW_NODE_MODULES: + import_module(module_name) + + _workflow_nodes_registered = True + + +def get_workflow_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]: + ensure_workflow_nodes_registered() + return get_node_type_classes_mapping() + + +def resolve_workflow_node_class(*, node_type: NodeType, node_version: str) -> type[Node]: + node_mapping = get_workflow_node_type_classes_mapping().get(node_type) + if not node_mapping: + raise ValueError(f"No class mapping found for node type: {node_type}") + + latest_node_class = node_mapping.get(LATEST_VERSION) + matched_node_class = node_mapping.get(node_version) + node_class = matched_node_class or latest_node_class + if not node_class: + raise ValueError(f"No latest version class found for node type: {node_type}") + return node_class diff --git a/api/core/workflow/nodes/__init__.py b/api/core/workflow/nodes/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/workflow/nodes/agent/__init__.py b/api/core/workflow/nodes/agent/__init__.py new file mode 100644 index 0000000000..ba6c667194 --- /dev/null +++ b/api/core/workflow/nodes/agent/__init__.py @@ -0,0 +1,4 @@ +from .agent_node import AgentNode +from .entities import AgentNodeData + +__all__ = ["AgentNode", "AgentNodeData"] diff --git a/api/core/workflow/nodes/agent/agent_node.py b/api/core/workflow/nodes/agent/agent_node.py new file mode 100644 index 0000000000..c1b423d69d --- /dev/null +++ b/api/core/workflow/nodes/agent/agent_node.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +from collections.abc import Generator, Mapping, Sequence +from typing import TYPE_CHECKING, Any + +from dify_graph.entities.graph_config import NodeConfigDict +from dify_graph.enums import NodeType, SystemVariableKey, WorkflowNodeExecutionStatus +from dify_graph.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent +from dify_graph.nodes.base.node import Node +from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser + +from .entities import AgentNodeData +from .exceptions import ( + AgentInvocationError, + AgentMessageTransformError, +) +from .message_transformer import AgentMessageTransformer +from .runtime_support import AgentRuntimeSupport +from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver + +if TYPE_CHECKING: + from dify_graph.entities import GraphInitParams + from dify_graph.runtime import GraphRuntimeState + + +class AgentNode(Node[AgentNodeData]): + node_type = NodeType.AGENT + + _strategy_resolver: AgentStrategyResolver + _presentation_provider: AgentStrategyPresentationProvider + _runtime_support: AgentRuntimeSupport + _message_transformer: AgentMessageTransformer + + def __init__( + self, + id: str, + config: NodeConfigDict, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + *, + strategy_resolver: AgentStrategyResolver, + presentation_provider: AgentStrategyPresentationProvider, + runtime_support: AgentRuntimeSupport, + message_transformer: AgentMessageTransformer, + ) -> None: + super().__init__( + id=id, + config=config, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + ) + self._strategy_resolver = strategy_resolver + self._presentation_provider = presentation_provider + self._runtime_support = runtime_support + self._message_transformer = message_transformer + + @classmethod + def version(cls) -> str: + return "1" + + def populate_start_event(self, event) -> None: + dify_ctx = self.require_dify_context() + event.extras["agent_strategy"] = { + "name": self.node_data.agent_strategy_name, + "icon": self._presentation_provider.get_icon( + tenant_id=dify_ctx.tenant_id, + agent_strategy_provider_name=self.node_data.agent_strategy_provider_name, + ), + } + + def _run(self) -> Generator[NodeEventBase, None, None]: + from core.plugin.impl.exc import PluginDaemonClientSideError + + dify_ctx = self.require_dify_context() + + try: + strategy = self._strategy_resolver.resolve( + tenant_id=dify_ctx.tenant_id, + agent_strategy_provider_name=self.node_data.agent_strategy_provider_name, + agent_strategy_name=self.node_data.agent_strategy_name, + ) + except Exception as e: + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs={}, + error=f"Failed to get agent strategy: {str(e)}", + ), + ) + return + + agent_parameters = strategy.get_parameters() + + parameters = self._runtime_support.build_parameters( + agent_parameters=agent_parameters, + variable_pool=self.graph_runtime_state.variable_pool, + node_data=self.node_data, + strategy=strategy, + tenant_id=dify_ctx.tenant_id, + app_id=dify_ctx.app_id, + invoke_from=dify_ctx.invoke_from, + ) + parameters_for_log = self._runtime_support.build_parameters( + agent_parameters=agent_parameters, + variable_pool=self.graph_runtime_state.variable_pool, + node_data=self.node_data, + strategy=strategy, + tenant_id=dify_ctx.tenant_id, + app_id=dify_ctx.app_id, + invoke_from=dify_ctx.invoke_from, + for_log=True, + ) + credentials = self._runtime_support.build_credentials(parameters=parameters) + + conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID]) + + try: + message_stream = strategy.invoke( + params=parameters, + user_id=dify_ctx.user_id, + app_id=dify_ctx.app_id, + conversation_id=conversation_id.text if conversation_id else None, + credentials=credentials, + ) + except Exception as e: + error = AgentInvocationError(f"Failed to invoke agent: {str(e)}", original_error=e) + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=parameters_for_log, + error=str(error), + ) + ) + return + + try: + yield from self._message_transformer.transform( + messages=message_stream, + tool_info={ + "icon": self._presentation_provider.get_icon( + tenant_id=dify_ctx.tenant_id, + agent_strategy_provider_name=self.node_data.agent_strategy_provider_name, + ), + "agent_strategy": self.node_data.agent_strategy_name, + }, + parameters_for_log=parameters_for_log, + user_id=dify_ctx.user_id, + tenant_id=dify_ctx.tenant_id, + node_type=self.node_type, + node_id=self._node_id, + node_execution_id=self.id, + ) + except PluginDaemonClientSideError as e: + transform_error = AgentMessageTransformError( + f"Failed to transform agent message: {str(e)}", original_error=e + ) + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=parameters_for_log, + error=str(transform_error), + ) + ) + + @classmethod + def _extract_variable_selector_to_variable_mapping( + cls, + *, + graph_config: Mapping[str, Any], + node_id: str, + node_data: AgentNodeData, + ) -> Mapping[str, Sequence[str]]: + _ = graph_config # Explicitly mark as unused + result: dict[str, Any] = {} + typed_node_data = node_data + for parameter_name in typed_node_data.agent_parameters: + input = typed_node_data.agent_parameters[parameter_name] + match input.type: + case "mixed" | "constant": + selectors = VariableTemplateParser(str(input.value)).extract_variable_selectors() + for selector in selectors: + result[selector.variable] = selector.value_selector + case "variable": + result[parameter_name] = input.value + + result = {node_id + "." + key: value for key, value in result.items()} + + return result diff --git a/api/dify_graph/nodes/agent/entities.py b/api/core/workflow/nodes/agent/entities.py similarity index 93% rename from api/dify_graph/nodes/agent/entities.py rename to api/core/workflow/nodes/agent/entities.py index f7b7af8fa4..59842862ef 100644 --- a/api/dify_graph/nodes/agent/entities.py +++ b/api/core/workflow/nodes/agent/entities.py @@ -11,9 +11,9 @@ from dify_graph.enums import NodeType class AgentNodeData(BaseNodeData): type: NodeType = NodeType.AGENT - agent_strategy_provider_name: str # redundancy + agent_strategy_provider_name: str agent_strategy_name: str - agent_strategy_label: str # redundancy + agent_strategy_label: str memory: MemoryConfig | None = None # The version of the tool parameter. # If this value is None, it indicates this is a previous version diff --git a/api/dify_graph/nodes/agent/exc.py b/api/core/workflow/nodes/agent/exceptions.py similarity index 90% rename from api/dify_graph/nodes/agent/exc.py rename to api/core/workflow/nodes/agent/exceptions.py index ba2c83d8a6..944f5f0b20 100644 --- a/api/dify_graph/nodes/agent/exc.py +++ b/api/core/workflow/nodes/agent/exceptions.py @@ -119,14 +119,3 @@ class AgentVariableTypeError(AgentNodeError): self.expected_type = expected_type self.actual_type = actual_type super().__init__(message) - - -class AgentMaxIterationError(AgentNodeError): - """Exception raised when the agent exceeds the maximum iteration limit.""" - - def __init__(self, max_iteration: int): - self.max_iteration = max_iteration - super().__init__( - f"Agent exceeded the maximum iteration limit of {max_iteration}. " - f"The agent was unable to complete the task within the allowed number of iterations." - ) diff --git a/api/core/workflow/nodes/agent/message_transformer.py b/api/core/workflow/nodes/agent/message_transformer.py new file mode 100644 index 0000000000..317db14d3f --- /dev/null +++ b/api/core/workflow/nodes/agent/message_transformer.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +from collections.abc import Generator, Mapping +from typing import Any, cast + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.tools.entities.tool_entities import ToolInvokeMessage +from core.tools.utils.message_transformer import ToolFileMessageTransformer +from dify_graph.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from dify_graph.file import File, FileTransferMethod +from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata +from dify_graph.model_runtime.utils.encoders import jsonable_encoder +from dify_graph.node_events import ( + AgentLogEvent, + NodeEventBase, + NodeRunResult, + StreamChunkEvent, + StreamCompletedEvent, +) +from dify_graph.variables.segments import ArrayFileSegment +from extensions.ext_database import db +from factories import file_factory +from models import ToolFile +from services.tools.builtin_tools_manage_service import BuiltinToolManageService + +from .exceptions import AgentNodeError, AgentVariableTypeError, ToolFileNotFoundError + + +class AgentMessageTransformer: + def transform( + self, + *, + messages: Generator[ToolInvokeMessage, None, None], + tool_info: Mapping[str, Any], + parameters_for_log: dict[str, Any], + user_id: str, + tenant_id: str, + node_type: NodeType, + node_id: str, + node_execution_id: str, + ) -> Generator[NodeEventBase, None, None]: + from core.plugin.impl.plugin import PluginInstaller + + message_stream = ToolFileMessageTransformer.transform_tool_invoke_messages( + messages=messages, + user_id=user_id, + tenant_id=tenant_id, + conversation_id=None, + ) + + text = "" + files: list[File] = [] + json_list: list[dict | list] = [] + + agent_logs: list[AgentLogEvent] = [] + agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {} + llm_usage = LLMUsage.empty_usage() + variables: dict[str, Any] = {} + + for message in message_stream: + if message.type in { + ToolInvokeMessage.MessageType.IMAGE_LINK, + ToolInvokeMessage.MessageType.BINARY_LINK, + ToolInvokeMessage.MessageType.IMAGE, + }: + assert isinstance(message.message, ToolInvokeMessage.TextMessage) + + url = message.message.text + if message.meta: + transfer_method = message.meta.get("transfer_method", FileTransferMethod.TOOL_FILE) + else: + transfer_method = FileTransferMethod.TOOL_FILE + + tool_file_id = str(url).split("/")[-1].split(".")[0] + + with Session(db.engine) as session: + stmt = select(ToolFile).where(ToolFile.id == tool_file_id) + tool_file = session.scalar(stmt) + if tool_file is None: + raise ToolFileNotFoundError(tool_file_id) + + mapping = { + "tool_file_id": tool_file_id, + "type": file_factory.get_file_type_by_mime_type(tool_file.mimetype), + "transfer_method": transfer_method, + "url": url, + } + file = file_factory.build_from_mapping( + mapping=mapping, + tenant_id=tenant_id, + ) + files.append(file) + elif message.type == ToolInvokeMessage.MessageType.BLOB: + assert isinstance(message.message, ToolInvokeMessage.TextMessage) + assert message.meta + + tool_file_id = message.message.text.split("/")[-1].split(".")[0] + with Session(db.engine) as session: + stmt = select(ToolFile).where(ToolFile.id == tool_file_id) + tool_file = session.scalar(stmt) + if tool_file is None: + raise ToolFileNotFoundError(tool_file_id) + + mapping = { + "tool_file_id": tool_file_id, + "transfer_method": FileTransferMethod.TOOL_FILE, + } + files.append( + file_factory.build_from_mapping( + mapping=mapping, + tenant_id=tenant_id, + ) + ) + elif message.type == ToolInvokeMessage.MessageType.TEXT: + assert isinstance(message.message, ToolInvokeMessage.TextMessage) + text += message.message.text + yield StreamChunkEvent( + selector=[node_id, "text"], + chunk=message.message.text, + is_final=False, + ) + elif message.type == ToolInvokeMessage.MessageType.JSON: + assert isinstance(message.message, ToolInvokeMessage.JsonMessage) + if node_type == NodeType.AGENT: + if isinstance(message.message.json_object, dict): + msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {}) + llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata)) + agent_execution_metadata = { + WorkflowNodeExecutionMetadataKey(key): value + for key, value in msg_metadata.items() + if key in WorkflowNodeExecutionMetadataKey.__members__.values() + } + else: + llm_usage = LLMUsage.empty_usage() + agent_execution_metadata = {} + if message.message.json_object: + json_list.append(message.message.json_object) + elif message.type == ToolInvokeMessage.MessageType.LINK: + assert isinstance(message.message, ToolInvokeMessage.TextMessage) + stream_text = f"Link: {message.message.text}\n" + text += stream_text + yield StreamChunkEvent( + selector=[node_id, "text"], + chunk=stream_text, + is_final=False, + ) + elif message.type == ToolInvokeMessage.MessageType.VARIABLE: + assert isinstance(message.message, ToolInvokeMessage.VariableMessage) + variable_name = message.message.variable_name + variable_value = message.message.variable_value + if message.message.stream: + if not isinstance(variable_value, str): + raise AgentVariableTypeError( + "When 'stream' is True, 'variable_value' must be a string.", + variable_name=variable_name, + expected_type="str", + actual_type=type(variable_value).__name__, + ) + if variable_name not in variables: + variables[variable_name] = "" + variables[variable_name] += variable_value + + yield StreamChunkEvent( + selector=[node_id, variable_name], + chunk=variable_value, + is_final=False, + ) + else: + variables[variable_name] = variable_value + elif message.type == ToolInvokeMessage.MessageType.FILE: + assert message.meta is not None + assert isinstance(message.meta, dict) + if "file" not in message.meta: + raise AgentNodeError("File message is missing 'file' key in meta") + + if not isinstance(message.meta["file"], File): + raise AgentNodeError(f"Expected File object but got {type(message.meta['file']).__name__}") + files.append(message.meta["file"]) + elif message.type == ToolInvokeMessage.MessageType.LOG: + assert isinstance(message.message, ToolInvokeMessage.LogMessage) + if message.message.metadata: + icon = tool_info.get("icon", "") + dict_metadata = dict(message.message.metadata) + if dict_metadata.get("provider"): + manager = PluginInstaller() + plugins = manager.list_plugins(tenant_id) + try: + current_plugin = next( + plugin + for plugin in plugins + if f"{plugin.plugin_id}/{plugin.name}" == dict_metadata["provider"] + ) + icon = current_plugin.declaration.icon + except StopIteration: + pass + icon_dark = None + try: + builtin_tool = next( + provider + for provider in BuiltinToolManageService.list_builtin_tools( + user_id, + tenant_id, + ) + if provider.name == dict_metadata["provider"] + ) + icon = builtin_tool.icon + icon_dark = builtin_tool.icon_dark + except StopIteration: + pass + + dict_metadata["icon"] = icon + dict_metadata["icon_dark"] = icon_dark + message.message.metadata = dict_metadata + agent_log = AgentLogEvent( + message_id=message.message.id, + node_execution_id=node_execution_id, + parent_id=message.message.parent_id, + error=message.message.error, + status=message.message.status.value, + data=message.message.data, + label=message.message.label, + metadata=message.message.metadata, + node_id=node_id, + ) + + for log in agent_logs: + if log.message_id == agent_log.message_id: + log.data = agent_log.data + log.status = agent_log.status + log.error = agent_log.error + log.label = agent_log.label + log.metadata = agent_log.metadata + break + else: + agent_logs.append(agent_log) + + yield agent_log + + json_output: list[dict[str, Any] | list[Any]] = [] + if agent_logs: + for log in agent_logs: + json_output.append( + { + "id": log.message_id, + "parent_id": log.parent_id, + "error": log.error, + "status": log.status, + "data": log.data, + "label": log.label, + "metadata": log.metadata, + "node_id": log.node_id, + } + ) + if json_list: + json_output.extend(json_list) + else: + json_output.append({"data": []}) + + yield StreamChunkEvent( + selector=[node_id, "text"], + chunk="", + is_final=True, + ) + + for var_name in variables: + yield StreamChunkEvent( + selector=[node_id, var_name], + chunk="", + is_final=True, + ) + + yield StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs={ + "text": text, + "usage": jsonable_encoder(llm_usage), + "files": ArrayFileSegment(value=files), + "json": json_output, + **variables, + }, + metadata={ + **agent_execution_metadata, + WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info, + WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs, + }, + inputs=parameters_for_log, + llm_usage=llm_usage, + ) + ) diff --git a/api/core/workflow/nodes/agent/plugin_strategy_adapter.py b/api/core/workflow/nodes/agent/plugin_strategy_adapter.py new file mode 100644 index 0000000000..1fc427ad6c --- /dev/null +++ b/api/core/workflow/nodes/agent/plugin_strategy_adapter.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from factories.agent_factory import get_plugin_agent_strategy + +from .strategy_protocols import AgentStrategyPresentationProvider, AgentStrategyResolver, ResolvedAgentStrategy + + +class PluginAgentStrategyResolver(AgentStrategyResolver): + def resolve( + self, + *, + tenant_id: str, + agent_strategy_provider_name: str, + agent_strategy_name: str, + ) -> ResolvedAgentStrategy: + return get_plugin_agent_strategy( + tenant_id=tenant_id, + agent_strategy_provider_name=agent_strategy_provider_name, + agent_strategy_name=agent_strategy_name, + ) + + +class PluginAgentStrategyPresentationProvider(AgentStrategyPresentationProvider): + def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None: + from core.plugin.impl.plugin import PluginInstaller + + manager = PluginInstaller() + try: + plugins = manager.list_plugins(tenant_id) + except Exception: + return None + + try: + current_plugin = next( + plugin for plugin in plugins if f"{plugin.plugin_id}/{plugin.name}" == agent_strategy_provider_name + ) + except StopIteration: + return None + + return current_plugin.declaration.icon diff --git a/api/core/workflow/nodes/agent/runtime_support.py b/api/core/workflow/nodes/agent/runtime_support.py new file mode 100644 index 0000000000..2ff7c964b9 --- /dev/null +++ b/api/core/workflow/nodes/agent/runtime_support.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +import json +from collections.abc import Sequence +from typing import Any, cast + +from packaging.version import Version +from pydantic import ValidationError +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.agent.entities import AgentToolEntity +from core.agent.plugin_entities import AgentStrategyParameter +from core.memory.token_buffer_memory import TokenBufferMemory +from core.model_manager import ModelInstance, ModelManager +from core.plugin.entities.request import InvokeCredentials +from core.provider_manager import ProviderManager +from core.tools.entities.tool_entities import ToolIdentity, ToolParameter, ToolProviderType +from core.tools.tool_manager import ToolManager +from dify_graph.enums import SystemVariableKey +from dify_graph.model_runtime.entities.model_entities import AIModelEntity, ModelType +from dify_graph.runtime import VariablePool +from dify_graph.variables.segments import StringSegment +from extensions.ext_database import db +from models.model import Conversation + +from .entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated +from .exceptions import AgentInputTypeError, AgentVariableNotFoundError +from .strategy_protocols import ResolvedAgentStrategy + + +class AgentRuntimeSupport: + def build_parameters( + self, + *, + agent_parameters: Sequence[AgentStrategyParameter], + variable_pool: VariablePool, + node_data: AgentNodeData, + strategy: ResolvedAgentStrategy, + tenant_id: str, + app_id: str, + invoke_from: Any, + for_log: bool = False, + ) -> dict[str, Any]: + agent_parameters_dictionary = {parameter.name: parameter for parameter in agent_parameters} + + result: dict[str, Any] = {} + for parameter_name in node_data.agent_parameters: + parameter = agent_parameters_dictionary.get(parameter_name) + if not parameter: + result[parameter_name] = None + continue + + agent_input = node_data.agent_parameters[parameter_name] + match agent_input.type: + case "variable": + variable = variable_pool.get(agent_input.value) # type: ignore[arg-type] + if variable is None: + raise AgentVariableNotFoundError(str(agent_input.value)) + parameter_value = variable.value + case "mixed" | "constant": + try: + if not isinstance(agent_input.value, str): + parameter_value = json.dumps(agent_input.value, ensure_ascii=False) + else: + parameter_value = str(agent_input.value) + except TypeError: + parameter_value = str(agent_input.value) + + segment_group = variable_pool.convert_template(parameter_value) + parameter_value = segment_group.log if for_log else segment_group.text + try: + if not isinstance(agent_input.value, str): + parameter_value = json.loads(parameter_value) + except json.JSONDecodeError: + parameter_value = parameter_value + case _: + raise AgentInputTypeError(agent_input.type) + + value = parameter_value + if parameter.type == "array[tools]": + value = cast(list[dict[str, Any]], value) + value = [tool for tool in value if tool.get("enabled", False)] + value = self._filter_mcp_type_tool(strategy, value) + for tool in value: + if "schemas" in tool: + tool.pop("schemas") + parameters = tool.get("parameters", {}) + if all(isinstance(v, dict) for _, v in parameters.items()): + params = {} + for key, param in parameters.items(): + if param.get("auto", ParamsAutoGenerated.OPEN) in ( + ParamsAutoGenerated.CLOSE, + 0, + ): + value_param = param.get("value", {}) + if value_param and value_param.get("type", "") == "variable": + variable_selector = value_param.get("value") + if not variable_selector: + raise ValueError("Variable selector is missing for a variable-type parameter.") + + variable = variable_pool.get(variable_selector) + if variable is None: + raise AgentVariableNotFoundError(str(variable_selector)) + + params[key] = variable.value + else: + params[key] = value_param.get("value", "") if value_param is not None else None + else: + params[key] = None + parameters = params + tool["settings"] = {k: v.get("value", None) for k, v in tool.get("settings", {}).items()} + tool["parameters"] = parameters + + if not for_log: + if parameter.type == "array[tools]": + value = cast(list[dict[str, Any]], value) + tool_value = [] + for tool in value: + provider_type = ToolProviderType(tool.get("type", ToolProviderType.BUILT_IN)) + setting_params = tool.get("settings", {}) + parameters = tool.get("parameters", {}) + manual_input_params = [key for key, value in parameters.items() if value is not None] + + parameters = {**parameters, **setting_params} + entity = AgentToolEntity( + provider_id=tool.get("provider_name", ""), + provider_type=provider_type, + tool_name=tool.get("tool_name", ""), + tool_parameters=parameters, + plugin_unique_identifier=tool.get("plugin_unique_identifier", None), + credential_id=tool.get("credential_id", None), + ) + + extra = tool.get("extra", {}) + + runtime_variable_pool: VariablePool | None = None + if node_data.version != "1" or node_data.tool_node_version is not None: + runtime_variable_pool = variable_pool + tool_runtime = ToolManager.get_agent_tool_runtime( + tenant_id, + app_id, + entity, + invoke_from, + runtime_variable_pool, + ) + if tool_runtime.entity.description: + tool_runtime.entity.description.llm = ( + extra.get("description", "") or tool_runtime.entity.description.llm + ) + for tool_runtime_params in tool_runtime.entity.parameters: + tool_runtime_params.form = ( + ToolParameter.ToolParameterForm.FORM + if tool_runtime_params.name in manual_input_params + else tool_runtime_params.form + ) + manual_input_value = {} + if tool_runtime.entity.parameters: + manual_input_value = { + key: value for key, value in parameters.items() if key in manual_input_params + } + runtime_parameters = { + **tool_runtime.runtime.runtime_parameters, + **manual_input_value, + } + tool_value.append( + { + **tool_runtime.entity.model_dump(mode="json"), + "runtime_parameters": runtime_parameters, + "credential_id": tool.get("credential_id", None), + "provider_type": provider_type.value, + } + ) + value = tool_value + if parameter.type == AgentStrategyParameter.AgentStrategyParameterType.MODEL_SELECTOR: + value = cast(dict[str, Any], value) + model_instance, model_schema = self.fetch_model(tenant_id=tenant_id, value=value) + history_prompt_messages = [] + if node_data.memory: + memory = self.fetch_memory( + variable_pool=variable_pool, + app_id=app_id, + model_instance=model_instance, + ) + if memory: + prompt_messages = memory.get_history_prompt_messages( + message_limit=node_data.memory.window.size or None + ) + history_prompt_messages = [ + prompt_message.model_dump(mode="json") for prompt_message in prompt_messages + ] + value["history_prompt_messages"] = history_prompt_messages + if model_schema: + model_schema = self._remove_unsupported_model_features_for_old_version(model_schema) + value["entity"] = model_schema.model_dump(mode="json") + else: + value["entity"] = None + result[parameter_name] = value + + return result + + def build_credentials(self, *, parameters: dict[str, Any]) -> InvokeCredentials: + credentials = InvokeCredentials() + credentials.tool_credentials = {} + for tool in parameters.get("tools", []): + if not tool.get("credential_id"): + continue + try: + identity = ToolIdentity.model_validate(tool.get("identity", {})) + except ValidationError: + continue + credentials.tool_credentials[identity.provider] = tool.get("credential_id", None) + return credentials + + def fetch_memory( + self, + *, + variable_pool: VariablePool, + app_id: str, + model_instance: ModelInstance, + ) -> TokenBufferMemory | None: + conversation_id_variable = variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID]) + if not isinstance(conversation_id_variable, StringSegment): + return None + conversation_id = conversation_id_variable.value + + with Session(db.engine, expire_on_commit=False) as session: + stmt = select(Conversation).where(Conversation.app_id == app_id, Conversation.id == conversation_id) + conversation = session.scalar(stmt) + if not conversation: + return None + + return TokenBufferMemory(conversation=conversation, model_instance=model_instance) + + def fetch_model(self, *, tenant_id: str, value: dict[str, Any]) -> tuple[ModelInstance, AIModelEntity | None]: + provider_manager = ProviderManager() + provider_model_bundle = provider_manager.get_provider_model_bundle( + tenant_id=tenant_id, + provider=value.get("provider", ""), + model_type=ModelType.LLM, + ) + model_name = value.get("model", "") + model_credentials = provider_model_bundle.configuration.get_current_credentials( + model_type=ModelType.LLM, + model=model_name, + ) + provider_name = provider_model_bundle.configuration.provider.provider + model_type_instance = provider_model_bundle.model_type_instance + model_instance = ModelManager().get_model_instance( + tenant_id=tenant_id, + provider=provider_name, + model_type=ModelType(value.get("model_type", "")), + model=model_name, + ) + model_schema = model_type_instance.get_model_schema(model_name, model_credentials) + return model_instance, model_schema + + @staticmethod + def _remove_unsupported_model_features_for_old_version(model_schema: AIModelEntity) -> AIModelEntity: + if model_schema.features: + for feature in model_schema.features[:]: + try: + AgentOldVersionModelFeatures(feature.value) + except ValueError: + model_schema.features.remove(feature) + return model_schema + + @staticmethod + def _filter_mcp_type_tool( + strategy: ResolvedAgentStrategy, + tools: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + meta_version = strategy.meta_version + if meta_version and Version(meta_version) > Version("0.0.1"): + return tools + return [tool for tool in tools if tool.get("type") != ToolProviderType.MCP] diff --git a/api/core/workflow/nodes/agent/strategy_protocols.py b/api/core/workflow/nodes/agent/strategy_protocols.py new file mode 100644 index 0000000000..643d916d15 --- /dev/null +++ b/api/core/workflow/nodes/agent/strategy_protocols.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from collections.abc import Generator, Sequence +from typing import Any, Protocol + +from core.agent.plugin_entities import AgentStrategyParameter +from core.plugin.entities.request import InvokeCredentials +from core.tools.entities.tool_entities import ToolInvokeMessage + + +class ResolvedAgentStrategy(Protocol): + meta_version: str | None + + def get_parameters(self) -> Sequence[AgentStrategyParameter]: ... + + def invoke( + self, + *, + params: dict[str, Any], + user_id: str, + conversation_id: str | None = None, + app_id: str | None = None, + message_id: str | None = None, + credentials: InvokeCredentials | None = None, + ) -> Generator[ToolInvokeMessage, None, None]: ... + + +class AgentStrategyResolver(Protocol): + def resolve( + self, + *, + tenant_id: str, + agent_strategy_provider_name: str, + agent_strategy_name: str, + ) -> ResolvedAgentStrategy: ... + + +class AgentStrategyPresentationProvider(Protocol): + def get_icon(self, *, tenant_id: str, agent_strategy_provider_name: str) -> str | None: ... diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index fef01049c5..01b309bf54 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -9,6 +9,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_di from core.app.workflow.layers.llm_quota import LLMQuotaLayer from core.app.workflow.layers.observability import ObservabilityLayer from core.workflow.node_factory import DifyNodeFactory +from core.workflow.node_resolution import resolve_workflow_node_class from dify_graph.constants import ENVIRONMENT_VARIABLE_NODE_ID from dify_graph.entities import GraphInitParams from dify_graph.entities.graph_config import NodeConfigDictAdapter @@ -23,7 +24,6 @@ from dify_graph.graph_engine.protocols.command_channel import CommandChannel from dify_graph.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent from dify_graph.nodes import NodeType from dify_graph.nodes.base.node import Node -from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING from dify_graph.runtime import ChildGraphNotFoundError, GraphRuntimeState, VariablePool from dify_graph.system_variable import SystemVariable from dify_graph.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool @@ -343,7 +343,7 @@ class WorkflowEntry: if node_type not in {NodeType.PARAMETER_EXTRACTOR, NodeType.QUESTION_CLASSIFIER}: raise ValueError(f"Node type {node_type} not supported") - node_cls = NODE_TYPE_CLASSES_MAPPING[node_type]["1"] + node_cls = resolve_workflow_node_class(node_type=node_type, node_version="1") if not node_cls: raise ValueError(f"Node class not found for node type {node_type}") diff --git a/api/dify_graph/entities/__init__.py b/api/dify_graph/entities/__init__.py index e73c38c1d3..ef7789c49c 100644 --- a/api/dify_graph/entities/__init__.py +++ b/api/dify_graph/entities/__init__.py @@ -1,11 +1,9 @@ -from .agent import AgentNodeStrategyInit from .graph_init_params import GraphInitParams from .workflow_execution import WorkflowExecution from .workflow_node_execution import WorkflowNodeExecution from .workflow_start_reason import WorkflowStartReason __all__ = [ - "AgentNodeStrategyInit", "GraphInitParams", "WorkflowExecution", "WorkflowNodeExecution", diff --git a/api/dify_graph/entities/agent.py b/api/dify_graph/entities/agent.py deleted file mode 100644 index 2b4d6db76f..0000000000 --- a/api/dify_graph/entities/agent.py +++ /dev/null @@ -1,8 +0,0 @@ -from pydantic import BaseModel - - -class AgentNodeStrategyInit(BaseModel): - """Agent node strategy initialization data.""" - - name: str - icon: str | None = None diff --git a/api/dify_graph/graph_events/node.py b/api/dify_graph/graph_events/node.py index 21ddf80b64..8552254627 100644 --- a/api/dify_graph/graph_events/node.py +++ b/api/dify_graph/graph_events/node.py @@ -4,7 +4,6 @@ from datetime import datetime from pydantic import Field from core.rag.entities.citation_metadata import RetrievalSourceMetadata -from dify_graph.entities import AgentNodeStrategyInit from dify_graph.entities.pause_reason import PauseReason from .base import GraphNodeEventBase @@ -13,8 +12,8 @@ from .base import GraphNodeEventBase class NodeRunStartedEvent(GraphNodeEventBase): node_title: str predecessor_node_id: str | None = None - agent_strategy: AgentNodeStrategyInit | None = None start_at: datetime = Field(..., description="node start time") + extras: dict[str, object] = Field(default_factory=dict) # FIXME(-LAN-): only for ToolNode provider_type: str = "" diff --git a/api/dify_graph/nodes/agent/__init__.py b/api/dify_graph/nodes/agent/__init__.py deleted file mode 100644 index 95e7cf895b..0000000000 --- a/api/dify_graph/nodes/agent/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .agent_node import AgentNode - -__all__ = ["AgentNode"] diff --git a/api/dify_graph/nodes/agent/agent_node.py b/api/dify_graph/nodes/agent/agent_node.py deleted file mode 100644 index d501217454..0000000000 --- a/api/dify_graph/nodes/agent/agent_node.py +++ /dev/null @@ -1,761 +0,0 @@ -from __future__ import annotations - -import json -from collections.abc import Generator, Mapping, Sequence -from typing import TYPE_CHECKING, Any, cast - -from packaging.version import Version -from pydantic import ValidationError -from sqlalchemy import select -from sqlalchemy.orm import Session - -from core.agent.entities import AgentToolEntity -from core.agent.plugin_entities import AgentStrategyParameter -from core.memory.token_buffer_memory import TokenBufferMemory -from core.model_manager import ModelInstance, ModelManager -from core.provider_manager import ProviderManager -from core.tools.entities.tool_entities import ( - ToolIdentity, - ToolInvokeMessage, - ToolParameter, - ToolProviderType, -) -from core.tools.tool_manager import ToolManager -from core.tools.utils.message_transformer import ToolFileMessageTransformer -from dify_graph.enums import ( - NodeType, - SystemVariableKey, - WorkflowNodeExecutionMetadataKey, - WorkflowNodeExecutionStatus, -) -from dify_graph.file import File, FileTransferMethod -from dify_graph.model_runtime.entities.llm_entities import LLMUsage, LLMUsageMetadata -from dify_graph.model_runtime.entities.model_entities import AIModelEntity, ModelType -from dify_graph.model_runtime.utils.encoders import jsonable_encoder -from dify_graph.node_events import ( - AgentLogEvent, - NodeEventBase, - NodeRunResult, - StreamChunkEvent, - StreamCompletedEvent, -) -from dify_graph.nodes.agent.entities import AgentNodeData, AgentOldVersionModelFeatures, ParamsAutoGenerated -from dify_graph.nodes.base.node import Node -from dify_graph.nodes.base.variable_template_parser import VariableTemplateParser -from dify_graph.runtime import VariablePool -from dify_graph.variables.segments import ArrayFileSegment, StringSegment -from extensions.ext_database import db -from factories import file_factory -from factories.agent_factory import get_plugin_agent_strategy -from models import ToolFile -from models.model import Conversation -from services.tools.builtin_tools_manage_service import BuiltinToolManageService - -from .exc import ( - AgentInputTypeError, - AgentInvocationError, - AgentMessageTransformError, - AgentNodeError, - AgentVariableNotFoundError, - AgentVariableTypeError, - ToolFileNotFoundError, -) - -if TYPE_CHECKING: - from core.agent.strategy.plugin import PluginAgentStrategy - from core.plugin.entities.request import InvokeCredentials - - -class AgentNode(Node[AgentNodeData]): - """ - Agent Node - """ - - node_type = NodeType.AGENT - - @classmethod - def version(cls) -> str: - return "1" - - def _run(self) -> Generator[NodeEventBase, None, None]: - from core.plugin.impl.exc import PluginDaemonClientSideError - - dify_ctx = self.require_dify_context() - - try: - strategy = get_plugin_agent_strategy( - tenant_id=dify_ctx.tenant_id, - agent_strategy_provider_name=self.node_data.agent_strategy_provider_name, - agent_strategy_name=self.node_data.agent_strategy_name, - ) - except Exception as e: - yield StreamCompletedEvent( - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.FAILED, - inputs={}, - error=f"Failed to get agent strategy: {str(e)}", - ), - ) - return - - agent_parameters = strategy.get_parameters() - - # get parameters - parameters = self._generate_agent_parameters( - agent_parameters=agent_parameters, - variable_pool=self.graph_runtime_state.variable_pool, - node_data=self.node_data, - strategy=strategy, - ) - parameters_for_log = self._generate_agent_parameters( - agent_parameters=agent_parameters, - variable_pool=self.graph_runtime_state.variable_pool, - node_data=self.node_data, - for_log=True, - strategy=strategy, - ) - credentials = self._generate_credentials(parameters=parameters) - - # get conversation id - conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID]) - - try: - message_stream = strategy.invoke( - params=parameters, - user_id=dify_ctx.user_id, - app_id=dify_ctx.app_id, - conversation_id=conversation_id.text if conversation_id else None, - credentials=credentials, - ) - except Exception as e: - error = AgentInvocationError(f"Failed to invoke agent: {str(e)}", original_error=e) - yield StreamCompletedEvent( - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.FAILED, - inputs=parameters_for_log, - error=str(error), - ) - ) - return - - try: - yield from self._transform_message( - messages=message_stream, - tool_info={ - "icon": self.agent_strategy_icon, - "agent_strategy": self.node_data.agent_strategy_name, - }, - parameters_for_log=parameters_for_log, - user_id=dify_ctx.user_id, - tenant_id=dify_ctx.tenant_id, - node_type=self.node_type, - node_id=self._node_id, - node_execution_id=self.id, - ) - except PluginDaemonClientSideError as e: - transform_error = AgentMessageTransformError( - f"Failed to transform agent message: {str(e)}", original_error=e - ) - yield StreamCompletedEvent( - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.FAILED, - inputs=parameters_for_log, - error=str(transform_error), - ) - ) - - def _generate_agent_parameters( - self, - *, - agent_parameters: Sequence[AgentStrategyParameter], - variable_pool: VariablePool, - node_data: AgentNodeData, - for_log: bool = False, - strategy: PluginAgentStrategy, - ) -> dict[str, Any]: - """ - Generate parameters based on the given tool parameters, variable pool, and node data. - - Args: - agent_parameters (Sequence[AgentParameter]): The list of agent parameters. - variable_pool (VariablePool): The variable pool containing the variables. - node_data (AgentNodeData): The data associated with the agent node. - - Returns: - Mapping[str, Any]: A dictionary containing the generated parameters. - - """ - agent_parameters_dictionary = {parameter.name: parameter for parameter in agent_parameters} - - result: dict[str, Any] = {} - for parameter_name in node_data.agent_parameters: - parameter = agent_parameters_dictionary.get(parameter_name) - if not parameter: - result[parameter_name] = None - continue - agent_input = node_data.agent_parameters[parameter_name] - match agent_input.type: - case "variable": - variable = variable_pool.get(agent_input.value) # type: ignore - if variable is None: - raise AgentVariableNotFoundError(str(agent_input.value)) - parameter_value = variable.value - case "mixed" | "constant": - # variable_pool.convert_template expects a string template, - # but if passing a dict, convert to JSON string first before rendering - try: - if not isinstance(agent_input.value, str): - parameter_value = json.dumps(agent_input.value, ensure_ascii=False) - else: - parameter_value = str(agent_input.value) - except TypeError: - parameter_value = str(agent_input.value) - segment_group = variable_pool.convert_template(parameter_value) - parameter_value = segment_group.log if for_log else segment_group.text - # variable_pool.convert_template returns a string, - # so we need to convert it back to a dictionary - try: - if not isinstance(agent_input.value, str): - parameter_value = json.loads(parameter_value) - except json.JSONDecodeError: - parameter_value = parameter_value - case _: - raise AgentInputTypeError(agent_input.type) - value = parameter_value - if parameter.type == "array[tools]": - value = cast(list[dict[str, Any]], value) - value = [tool for tool in value if tool.get("enabled", False)] - value = self._filter_mcp_type_tool(strategy, value) - for tool in value: - if "schemas" in tool: - tool.pop("schemas") - parameters = tool.get("parameters", {}) - if all(isinstance(v, dict) for _, v in parameters.items()): - params = {} - for key, param in parameters.items(): - if param.get("auto", ParamsAutoGenerated.OPEN) in ( - ParamsAutoGenerated.CLOSE, - 0, - ): - value_param = param.get("value", {}) - if value_param and value_param.get("type", "") == "variable": - variable_selector = value_param.get("value") - if not variable_selector: - raise ValueError("Variable selector is missing for a variable-type parameter.") - - variable = variable_pool.get(variable_selector) - if variable is None: - raise AgentVariableNotFoundError(str(variable_selector)) - - params[key] = variable.value - else: - params[key] = value_param.get("value", "") if value_param is not None else None - else: - params[key] = None - parameters = params - tool["settings"] = {k: v.get("value", None) for k, v in tool.get("settings", {}).items()} - tool["parameters"] = parameters - - if not for_log: - if parameter.type == "array[tools]": - value = cast(list[dict[str, Any]], value) - tool_value = [] - for tool in value: - provider_type = ToolProviderType(tool.get("type", ToolProviderType.BUILT_IN)) - setting_params = tool.get("settings", {}) - parameters = tool.get("parameters", {}) - manual_input_params = [key for key, value in parameters.items() if value is not None] - - parameters = {**parameters, **setting_params} - entity = AgentToolEntity( - provider_id=tool.get("provider_name", ""), - provider_type=provider_type, - tool_name=tool.get("tool_name", ""), - tool_parameters=parameters, - plugin_unique_identifier=tool.get("plugin_unique_identifier", None), - credential_id=tool.get("credential_id", None), - ) - - extra = tool.get("extra", {}) - - # This is an issue that caused problems before. - # Logically, we shouldn't use the node_data.version field for judgment - # But for backward compatibility with historical data - # this version field judgment is still preserved here. - runtime_variable_pool: VariablePool | None = None - if node_data.version != "1" or node_data.tool_node_version is not None: - runtime_variable_pool = variable_pool - dify_ctx = self.require_dify_context() - tool_runtime = ToolManager.get_agent_tool_runtime( - dify_ctx.tenant_id, - dify_ctx.app_id, - entity, - dify_ctx.invoke_from, - runtime_variable_pool, - ) - if tool_runtime.entity.description: - tool_runtime.entity.description.llm = ( - extra.get("description", "") or tool_runtime.entity.description.llm - ) - for tool_runtime_params in tool_runtime.entity.parameters: - tool_runtime_params.form = ( - ToolParameter.ToolParameterForm.FORM - if tool_runtime_params.name in manual_input_params - else tool_runtime_params.form - ) - manual_input_value = {} - if tool_runtime.entity.parameters: - manual_input_value = { - key: value for key, value in parameters.items() if key in manual_input_params - } - runtime_parameters = { - **tool_runtime.runtime.runtime_parameters, - **manual_input_value, - } - tool_value.append( - { - **tool_runtime.entity.model_dump(mode="json"), - "runtime_parameters": runtime_parameters, - "credential_id": tool.get("credential_id", None), - "provider_type": provider_type.value, - } - ) - value = tool_value - if parameter.type == AgentStrategyParameter.AgentStrategyParameterType.MODEL_SELECTOR: - value = cast(dict[str, Any], value) - model_instance, model_schema = self._fetch_model(value) - # memory config - history_prompt_messages = [] - if node_data.memory: - memory = self._fetch_memory(model_instance) - if memory: - prompt_messages = memory.get_history_prompt_messages( - message_limit=node_data.memory.window.size or None - ) - history_prompt_messages = [ - prompt_message.model_dump(mode="json") for prompt_message in prompt_messages - ] - value["history_prompt_messages"] = history_prompt_messages - if model_schema: - # remove structured output feature to support old version agent plugin - model_schema = self._remove_unsupported_model_features_for_old_version(model_schema) - value["entity"] = model_schema.model_dump(mode="json") - else: - value["entity"] = None - result[parameter_name] = value - - return result - - def _generate_credentials( - self, - parameters: dict[str, Any], - ) -> InvokeCredentials: - """ - Generate credentials based on the given agent parameters. - """ - from core.plugin.entities.request import InvokeCredentials - - credentials = InvokeCredentials() - - # generate credentials for tools selector - credentials.tool_credentials = {} - for tool in parameters.get("tools", []): - if tool.get("credential_id"): - try: - identity = ToolIdentity.model_validate(tool.get("identity", {})) - credentials.tool_credentials[identity.provider] = tool.get("credential_id", None) - except ValidationError: - continue - return credentials - - @classmethod - def _extract_variable_selector_to_variable_mapping( - cls, - *, - graph_config: Mapping[str, Any], - node_id: str, - node_data: AgentNodeData, - ) -> Mapping[str, Sequence[str]]: - _ = graph_config # Explicitly mark as unused - result: dict[str, Any] = {} - typed_node_data = node_data - for parameter_name in typed_node_data.agent_parameters: - input = typed_node_data.agent_parameters[parameter_name] - match input.type: - case "mixed" | "constant": - selectors = VariableTemplateParser(str(input.value)).extract_variable_selectors() - for selector in selectors: - result[selector.variable] = selector.value_selector - case "variable": - result[parameter_name] = input.value - - result = {node_id + "." + key: value for key, value in result.items()} - - return result - - @property - def agent_strategy_icon(self) -> str | None: - """ - Get agent strategy icon - :return: - """ - from core.plugin.impl.plugin import PluginInstaller - - manager = PluginInstaller() - dify_ctx = self.require_dify_context() - plugins = manager.list_plugins(dify_ctx.tenant_id) - try: - current_plugin = next( - plugin - for plugin in plugins - if f"{plugin.plugin_id}/{plugin.name}" == self.node_data.agent_strategy_provider_name - ) - icon = current_plugin.declaration.icon - except StopIteration: - icon = None - return icon - - def _fetch_memory(self, model_instance: ModelInstance) -> TokenBufferMemory | None: - # get conversation id - conversation_id_variable = self.graph_runtime_state.variable_pool.get( - ["sys", SystemVariableKey.CONVERSATION_ID] - ) - if not isinstance(conversation_id_variable, StringSegment): - return None - conversation_id = conversation_id_variable.value - - dify_ctx = self.require_dify_context() - with Session(db.engine, expire_on_commit=False) as session: - stmt = select(Conversation).where( - Conversation.app_id == dify_ctx.app_id, Conversation.id == conversation_id - ) - conversation = session.scalar(stmt) - - if not conversation: - return None - - memory = TokenBufferMemory(conversation=conversation, model_instance=model_instance) - - return memory - - def _fetch_model(self, value: dict[str, Any]) -> tuple[ModelInstance, AIModelEntity | None]: - dify_ctx = self.require_dify_context() - provider_manager = ProviderManager() - provider_model_bundle = provider_manager.get_provider_model_bundle( - tenant_id=dify_ctx.tenant_id, provider=value.get("provider", ""), model_type=ModelType.LLM - ) - model_name = value.get("model", "") - model_credentials = provider_model_bundle.configuration.get_current_credentials( - model_type=ModelType.LLM, model=model_name - ) - provider_name = provider_model_bundle.configuration.provider.provider - model_type_instance = provider_model_bundle.model_type_instance - model_instance = ModelManager().get_model_instance( - tenant_id=dify_ctx.tenant_id, - provider=provider_name, - model_type=ModelType(value.get("model_type", "")), - model=model_name, - ) - model_schema = model_type_instance.get_model_schema(model_name, model_credentials) - return model_instance, model_schema - - def _remove_unsupported_model_features_for_old_version(self, model_schema: AIModelEntity) -> AIModelEntity: - if model_schema.features: - for feature in model_schema.features[:]: # Create a copy to safely modify during iteration - try: - AgentOldVersionModelFeatures(feature.value) # Try to create enum member from value - except ValueError: - model_schema.features.remove(feature) - return model_schema - - def _filter_mcp_type_tool(self, strategy: PluginAgentStrategy, tools: list[dict[str, Any]]) -> list[dict[str, Any]]: - """ - Filter MCP type tool - :param strategy: plugin agent strategy - :param tool: tool - :return: filtered tool dict - """ - meta_version = strategy.meta_version - if meta_version and Version(meta_version) > Version("0.0.1"): - return tools - else: - return [tool for tool in tools if tool.get("type") != ToolProviderType.MCP] - - def _transform_message( - self, - messages: Generator[ToolInvokeMessage, None, None], - tool_info: Mapping[str, Any], - parameters_for_log: dict[str, Any], - user_id: str, - tenant_id: str, - node_type: NodeType, - node_id: str, - node_execution_id: str, - ) -> Generator[NodeEventBase, None, None]: - """ - Convert ToolInvokeMessages into tuple[plain_text, files] - """ - # transform message and handle file storage - from core.plugin.impl.plugin import PluginInstaller - - message_stream = ToolFileMessageTransformer.transform_tool_invoke_messages( - messages=messages, - user_id=user_id, - tenant_id=tenant_id, - conversation_id=None, - ) - - text = "" - files: list[File] = [] - json_list: list[dict | list] = [] - - agent_logs: list[AgentLogEvent] = [] - agent_execution_metadata: Mapping[WorkflowNodeExecutionMetadataKey, Any] = {} - llm_usage = LLMUsage.empty_usage() - variables: dict[str, Any] = {} - - for message in message_stream: - if message.type in { - ToolInvokeMessage.MessageType.IMAGE_LINK, - ToolInvokeMessage.MessageType.BINARY_LINK, - ToolInvokeMessage.MessageType.IMAGE, - }: - assert isinstance(message.message, ToolInvokeMessage.TextMessage) - - url = message.message.text - if message.meta: - transfer_method = message.meta.get("transfer_method", FileTransferMethod.TOOL_FILE) - else: - transfer_method = FileTransferMethod.TOOL_FILE - - tool_file_id = str(url).split("/")[-1].split(".")[0] - - with Session(db.engine) as session: - stmt = select(ToolFile).where(ToolFile.id == tool_file_id) - tool_file = session.scalar(stmt) - if tool_file is None: - raise ToolFileNotFoundError(tool_file_id) - - mapping = { - "tool_file_id": tool_file_id, - "type": file_factory.get_file_type_by_mime_type(tool_file.mimetype), - "transfer_method": transfer_method, - "url": url, - } - file = file_factory.build_from_mapping( - mapping=mapping, - tenant_id=tenant_id, - ) - files.append(file) - elif message.type == ToolInvokeMessage.MessageType.BLOB: - # get tool file id - assert isinstance(message.message, ToolInvokeMessage.TextMessage) - assert message.meta - - tool_file_id = message.message.text.split("/")[-1].split(".")[0] - with Session(db.engine) as session: - stmt = select(ToolFile).where(ToolFile.id == tool_file_id) - tool_file = session.scalar(stmt) - if tool_file is None: - raise ToolFileNotFoundError(tool_file_id) - - mapping = { - "tool_file_id": tool_file_id, - "transfer_method": FileTransferMethod.TOOL_FILE, - } - - files.append( - file_factory.build_from_mapping( - mapping=mapping, - tenant_id=tenant_id, - ) - ) - elif message.type == ToolInvokeMessage.MessageType.TEXT: - assert isinstance(message.message, ToolInvokeMessage.TextMessage) - text += message.message.text - yield StreamChunkEvent( - selector=[node_id, "text"], - chunk=message.message.text, - is_final=False, - ) - elif message.type == ToolInvokeMessage.MessageType.JSON: - assert isinstance(message.message, ToolInvokeMessage.JsonMessage) - if node_type == NodeType.AGENT: - if isinstance(message.message.json_object, dict): - msg_metadata: dict[str, Any] = message.message.json_object.pop("execution_metadata", {}) - llm_usage = LLMUsage.from_metadata(cast(LLMUsageMetadata, msg_metadata)) - agent_execution_metadata = { - WorkflowNodeExecutionMetadataKey(key): value - for key, value in msg_metadata.items() - if key in WorkflowNodeExecutionMetadataKey.__members__.values() - } - else: - msg_metadata = {} - llm_usage = LLMUsage.empty_usage() - agent_execution_metadata = {} - if message.message.json_object: - json_list.append(message.message.json_object) - elif message.type == ToolInvokeMessage.MessageType.LINK: - assert isinstance(message.message, ToolInvokeMessage.TextMessage) - stream_text = f"Link: {message.message.text}\n" - text += stream_text - yield StreamChunkEvent( - selector=[node_id, "text"], - chunk=stream_text, - is_final=False, - ) - elif message.type == ToolInvokeMessage.MessageType.VARIABLE: - assert isinstance(message.message, ToolInvokeMessage.VariableMessage) - variable_name = message.message.variable_name - variable_value = message.message.variable_value - if message.message.stream: - if not isinstance(variable_value, str): - raise AgentVariableTypeError( - "When 'stream' is True, 'variable_value' must be a string.", - variable_name=variable_name, - expected_type="str", - actual_type=type(variable_value).__name__, - ) - if variable_name not in variables: - variables[variable_name] = "" - variables[variable_name] += variable_value - - yield StreamChunkEvent( - selector=[node_id, variable_name], - chunk=variable_value, - is_final=False, - ) - else: - variables[variable_name] = variable_value - elif message.type == ToolInvokeMessage.MessageType.FILE: - assert message.meta is not None - assert isinstance(message.meta, dict) - # Validate that meta contains a 'file' key - if "file" not in message.meta: - raise AgentNodeError("File message is missing 'file' key in meta") - - # Validate that the file is an instance of File - if not isinstance(message.meta["file"], File): - raise AgentNodeError(f"Expected File object but got {type(message.meta['file']).__name__}") - files.append(message.meta["file"]) - elif message.type == ToolInvokeMessage.MessageType.LOG: - assert isinstance(message.message, ToolInvokeMessage.LogMessage) - if message.message.metadata: - icon = tool_info.get("icon", "") - dict_metadata = dict(message.message.metadata) - if dict_metadata.get("provider"): - manager = PluginInstaller() - plugins = manager.list_plugins(tenant_id) - try: - current_plugin = next( - plugin - for plugin in plugins - if f"{plugin.plugin_id}/{plugin.name}" == dict_metadata["provider"] - ) - icon = current_plugin.declaration.icon - except StopIteration: - pass - icon_dark = None - try: - builtin_tool = next( - provider - for provider in BuiltinToolManageService.list_builtin_tools( - user_id, - tenant_id, - ) - if provider.name == dict_metadata["provider"] - ) - icon = builtin_tool.icon - icon_dark = builtin_tool.icon_dark - except StopIteration: - pass - - dict_metadata["icon"] = icon - dict_metadata["icon_dark"] = icon_dark - message.message.metadata = dict_metadata - agent_log = AgentLogEvent( - message_id=message.message.id, - node_execution_id=node_execution_id, - parent_id=message.message.parent_id, - error=message.message.error, - status=message.message.status.value, - data=message.message.data, - label=message.message.label, - metadata=message.message.metadata, - node_id=node_id, - ) - - # check if the agent log is already in the list - for log in agent_logs: - if log.message_id == agent_log.message_id: - # update the log - log.data = agent_log.data - log.status = agent_log.status - log.error = agent_log.error - log.label = agent_log.label - log.metadata = agent_log.metadata - break - else: - agent_logs.append(agent_log) - - yield agent_log - - # Add agent_logs to outputs['json'] to ensure frontend can access thinking process - json_output: list[dict[str, Any] | list[Any]] = [] - - # Step 1: append each agent log as its own dict. - if agent_logs: - for log in agent_logs: - json_output.append( - { - "id": log.message_id, - "parent_id": log.parent_id, - "error": log.error, - "status": log.status, - "data": log.data, - "label": log.label, - "metadata": log.metadata, - "node_id": log.node_id, - } - ) - # Step 2: normalize JSON into {"data": [...]}.change json to list[dict] - if json_list: - json_output.extend(json_list) - else: - json_output.append({"data": []}) - - # Send final chunk events for all streamed outputs - # Final chunk for text stream - yield StreamChunkEvent( - selector=[node_id, "text"], - chunk="", - is_final=True, - ) - - # Final chunks for any streamed variables - for var_name in variables: - yield StreamChunkEvent( - selector=[node_id, var_name], - chunk="", - is_final=True, - ) - - yield StreamCompletedEvent( - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs={ - "text": text, - "usage": jsonable_encoder(llm_usage), - "files": ArrayFileSegment(value=files), - "json": json_output, - **variables, - }, - metadata={ - **agent_execution_metadata, - WorkflowNodeExecutionMetadataKey.TOOL_INFO: tool_info, - WorkflowNodeExecutionMetadataKey.AGENT_LOG: agent_logs, - }, - inputs=parameters_for_log, - llm_usage=llm_usage, - ) - ) diff --git a/api/dify_graph/nodes/base/node.py b/api/dify_graph/nodes/base/node.py index d7702c8cb4..2044b09333 100644 --- a/api/dify_graph/nodes/base/node.py +++ b/api/dify_graph/nodes/base/node.py @@ -11,7 +11,7 @@ from types import MappingProxyType from typing import Any, ClassVar, Generic, Protocol, TypeVar, cast, get_args, get_origin from uuid import uuid4 -from dify_graph.entities import AgentNodeStrategyInit, GraphInitParams +from dify_graph.entities import GraphInitParams from dify_graph.entities.base_node_data import BaseNodeData, RetryConfig from dify_graph.entities.graph_config import NodeConfigDict from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY @@ -349,6 +349,10 @@ class Node(Generic[NodeDataT]): """ raise NotImplementedError + def populate_start_event(self, event: NodeRunStartedEvent) -> None: + """Allow subclasses to enrich the started event without cross-node imports in the base class.""" + _ = event + def run(self) -> Generator[GraphNodeEventBase, None, None]: execution_id = self.ensure_execution_id() self._start_at = naive_utc_now() @@ -362,39 +366,10 @@ class Node(Generic[NodeDataT]): in_iteration_id=None, start_at=self._start_at, ) - - # === FIXME(-LAN-): Needs to refactor. - from dify_graph.nodes.tool.tool_node import ToolNode - - if isinstance(self, ToolNode): - start_event.provider_id = getattr(self.node_data, "provider_id", "") - start_event.provider_type = getattr(self.node_data, "provider_type", "") - - from dify_graph.nodes.datasource.datasource_node import DatasourceNode - - if isinstance(self, DatasourceNode): - plugin_id = getattr(self.node_data, "plugin_id", "") - provider_name = getattr(self.node_data, "provider_name", "") - - start_event.provider_id = f"{plugin_id}/{provider_name}" - start_event.provider_type = getattr(self.node_data, "provider_type", "") - - from dify_graph.nodes.trigger_plugin.trigger_event_node import TriggerEventNode - - if isinstance(self, TriggerEventNode): - start_event.provider_id = getattr(self.node_data, "provider_id", "") - start_event.provider_type = getattr(self.node_data, "provider_type", "") - - from dify_graph.nodes.agent.agent_node import AgentNode - from dify_graph.nodes.agent.entities import AgentNodeData - - if isinstance(self, AgentNode): - start_event.agent_strategy = AgentNodeStrategyInit( - name=cast(AgentNodeData, self.node_data).agent_strategy_name, - icon=self.agent_strategy_icon, - ) - - # === + try: + self.populate_start_event(start_event) + except Exception: + logger.warning("Failed to populate start event for node %s", self._node_id, exc_info=True) yield start_event try: @@ -513,10 +488,8 @@ class Node(Generic[NodeDataT]): @abstractmethod def version(cls) -> str: """`node_version` returns the version of current node type.""" - # NOTE(QuantumGhost): This should be in sync with `NODE_TYPE_CLASSES_MAPPING`. - # - # If you have introduced a new node type, please add it to `NODE_TYPE_CLASSES_MAPPING` - # in `api/dify_graph/nodes/__init__.py`. + # NOTE(QuantumGhost): Node versions must remain unique per `NodeType` so + # `Node.get_node_type_classes_mapping()` can resolve numeric versions and `latest`. raise NotImplementedError("subclasses of BaseNode must implement `version` method.") @classmethod @@ -524,7 +497,9 @@ class Node(Generic[NodeDataT]): """Return mapping of NodeType -> {version -> Node subclass} using __init_subclass__ registry. Import all modules under dify_graph.nodes so subclasses register themselves on import. - Then we return a readonly view of the registry to avoid accidental mutation. + Callers that rely on workflow-local nodes defined outside `dify_graph.nodes` must import + those modules before invoking this method so they can register through `__init_subclass__`. + We then return a readonly view of the registry to avoid accidental mutation. """ # Import all node modules to ensure they are loaded (thus registered) import dify_graph.nodes as _nodes_pkg diff --git a/api/dify_graph/nodes/datasource/datasource_node.py b/api/dify_graph/nodes/datasource/datasource_node.py index b0d3e1a24e..62dcb2924f 100644 --- a/api/dify_graph/nodes/datasource/datasource_node.py +++ b/api/dify_graph/nodes/datasource/datasource_node.py @@ -48,6 +48,10 @@ class DatasourceNode(Node[DatasourceNodeData]): ) self.datasource_manager = datasource_manager + def populate_start_event(self, event) -> None: + event.provider_id = f"{self.node_data.plugin_id}/{self.node_data.provider_name}" + event.provider_type = self.node_data.provider_type + def _run(self) -> Generator: """ Run the datasource node diff --git a/api/dify_graph/nodes/iteration/iteration_node.py b/api/dify_graph/nodes/iteration/iteration_node.py index bc36f635e9..1d626f4bd6 100644 --- a/api/dify_graph/nodes/iteration/iteration_node.py +++ b/api/dify_graph/nodes/iteration/iteration_node.py @@ -486,14 +486,15 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): # variable selector to variable mapping try: # Get node class - from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING + from dify_graph.nodes.node_mapping import get_node_type_classes_mapping typed_sub_node_config = NodeConfigDictAdapter.validate_python(sub_node_config) node_type = typed_sub_node_config["data"].type - if node_type not in NODE_TYPE_CLASSES_MAPPING: + node_mapping = get_node_type_classes_mapping() + if node_type not in node_mapping: continue node_version = str(typed_sub_node_config["data"].version) - node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] + node_cls = node_mapping[node_type][node_version] sub_node_variable_mapping = node_cls.extract_variable_selector_to_variable_mapping( graph_config=graph_config, config=typed_sub_node_config diff --git a/api/dify_graph/nodes/loop/loop_node.py b/api/dify_graph/nodes/loop/loop_node.py index 27b78eaa3f..1a8774f445 100644 --- a/api/dify_graph/nodes/loop/loop_node.py +++ b/api/dify_graph/nodes/loop/loop_node.py @@ -316,14 +316,15 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): # variable selector to variable mapping try: # Get node class - from dify_graph.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING + from dify_graph.nodes.node_mapping import get_node_type_classes_mapping typed_sub_node_config = NodeConfigDictAdapter.validate_python(sub_node_config) node_type = typed_sub_node_config["data"].type - if node_type not in NODE_TYPE_CLASSES_MAPPING: + node_mapping = get_node_type_classes_mapping() + if node_type not in node_mapping: continue node_version = str(typed_sub_node_config["data"].version) - node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version] + node_cls = node_mapping[node_type][node_version] sub_node_variable_mapping = node_cls.extract_variable_selector_to_variable_mapping( graph_config=graph_config, config=typed_sub_node_config diff --git a/api/dify_graph/nodes/node_mapping.py b/api/dify_graph/nodes/node_mapping.py index 8e5405f1aa..e0f5524a04 100644 --- a/api/dify_graph/nodes/node_mapping.py +++ b/api/dify_graph/nodes/node_mapping.py @@ -5,5 +5,24 @@ from dify_graph.nodes.base.node import Node LATEST_VERSION = "latest" -# Mapping is built by Node.get_node_type_classes_mapping(), which imports and walks dify_graph.nodes -NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[Node]]] = Node.get_node_type_classes_mapping() + +def get_node_type_classes_mapping() -> Mapping[NodeType, Mapping[str, type[Node]]]: + """Return the live node registry after importing all `dify_graph.nodes` modules.""" + return Node.get_node_type_classes_mapping() + + +def resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]: + node_mapping = get_node_type_classes_mapping().get(node_type) + if not node_mapping: + raise ValueError(f"No class mapping found for node type: {node_type}") + + latest_node_class = node_mapping.get(LATEST_VERSION) + matched_node_class = node_mapping.get(node_version) + node_class = matched_node_class or latest_node_class + if not node_class: + raise ValueError(f"No latest version class found for node type: {node_type}") + return node_class + + +# Snapshot kept for compatibility with older tests; production paths should use the live helpers. +NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[Node]]] = get_node_type_classes_mapping() diff --git a/api/dify_graph/nodes/tool/tool_node.py b/api/dify_graph/nodes/tool/tool_node.py index 44d0ca885d..ec7386981e 100644 --- a/api/dify_graph/nodes/tool/tool_node.py +++ b/api/dify_graph/nodes/tool/tool_node.py @@ -65,6 +65,10 @@ class ToolNode(Node[ToolNodeData]): def version(cls) -> str: return "1" + def populate_start_event(self, event) -> None: + event.provider_id = self.node_data.provider_id + event.provider_type = self.node_data.provider_type + def _run(self) -> Generator[NodeEventBase, None, None]: """ Run the tool node diff --git a/api/dify_graph/nodes/trigger_plugin/trigger_event_node.py b/api/dify_graph/nodes/trigger_plugin/trigger_event_node.py index b4f1116f7e..536ba96dec 100644 --- a/api/dify_graph/nodes/trigger_plugin/trigger_event_node.py +++ b/api/dify_graph/nodes/trigger_plugin/trigger_event_node.py @@ -32,6 +32,9 @@ class TriggerEventNode(Node[TriggerEventNodeData]): def version(cls) -> str: return "1" + def populate_start_event(self, event) -> None: + event.provider_id = self.node_data.provider_id + def _run(self) -> NodeRunResult: """ Run the plugin trigger node. diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index ce745a4679..b9a565ec17 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -36,6 +36,7 @@ from core.rag.entities.event import ( ) from core.repositories.factory import DifyCoreRepositoryFactory from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository +from core.workflow.node_resolution import LATEST_VERSION, get_workflow_node_type_classes_mapping from core.workflow.workflow_entry import WorkflowEntry from dify_graph.entities.workflow_node_execution import ( WorkflowNodeExecution, @@ -48,7 +49,6 @@ from dify_graph.graph_events.base import GraphNodeEventBase from dify_graph.node_events.base import NodeRunResult from dify_graph.nodes.base.node import Node from dify_graph.nodes.http_request import HTTP_REQUEST_CONFIG_FILTER_KEY, build_http_request_config -from dify_graph.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from dify_graph.repositories.workflow_node_execution_repository import OrderConfig from dify_graph.runtime import VariablePool from dify_graph.system_variable import SystemVariable @@ -381,7 +381,7 @@ class RagPipelineService: """ # return default block config default_block_configs: list[dict[str, Any]] = [] - for node_type, node_class_mapping in NODE_TYPE_CLASSES_MAPPING.items(): + for node_type, node_class_mapping in get_workflow_node_type_classes_mapping().items(): node_class = node_class_mapping[LATEST_VERSION] filters = None if node_type is NodeType.HTTP_REQUEST: @@ -410,12 +410,13 @@ class RagPipelineService: :return: """ node_type_enum = NodeType(node_type) + node_mapping = get_workflow_node_type_classes_mapping() # return default block config - if node_type_enum not in NODE_TYPE_CLASSES_MAPPING: + if node_type_enum not in node_mapping: return None - node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION] + node_class = node_mapping[node_type_enum][LATEST_VERSION] final_filters = dict(filters) if filters else {} if node_type_enum is NodeType.HTTP_REQUEST and HTTP_REQUEST_CONFIG_FILTER_KEY not in final_filters: final_filters[HTTP_REQUEST_CONFIG_FILTER_KEY] = build_http_request_config( diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 2549cdf180..5b24c356c2 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -14,6 +14,7 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context from core.repositories import DifyCoreRepositoryFactory from core.repositories.human_input_repository import HumanInputFormRepositoryImpl +from core.workflow.node_resolution import LATEST_VERSION, get_workflow_node_type_classes_mapping from core.workflow.workflow_entry import WorkflowEntry from dify_graph.entities import GraphInitParams, WorkflowNodeExecution from dify_graph.entities.graph_config import NodeConfigDict @@ -34,7 +35,6 @@ from dify_graph.nodes.human_input.entities import ( ) from dify_graph.nodes.human_input.enums import HumanInputFormKind from dify_graph.nodes.human_input.human_input_node import HumanInputNode -from dify_graph.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from dify_graph.nodes.start.entities import StartNodeData from dify_graph.repositories.human_input_form_repository import FormCreateParams from dify_graph.runtime import GraphRuntimeState, VariablePool @@ -619,7 +619,7 @@ class WorkflowService: """ # return default block config default_block_configs: list[Mapping[str, object]] = [] - for node_type, node_class_mapping in NODE_TYPE_CLASSES_MAPPING.items(): + for node_type, node_class_mapping in get_workflow_node_type_classes_mapping().items(): node_class = node_class_mapping[LATEST_VERSION] filters = None if node_type is NodeType.HTTP_REQUEST: @@ -650,12 +650,13 @@ class WorkflowService: :return: """ node_type_enum = NodeType(node_type) + node_mapping = get_workflow_node_type_classes_mapping() # return default block config - if node_type_enum not in NODE_TYPE_CLASSES_MAPPING: + if node_type_enum not in node_mapping: return {} - node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION] + node_class = node_mapping[node_type_enum][LATEST_VERSION] resolved_filters = dict(filters) if filters else {} if node_type_enum is NodeType.HTTP_REQUEST and HTTP_REQUEST_CONFIG_FILTER_KEY not in resolved_filters: resolved_filters[HTTP_REQUEST_CONFIG_FILTER_KEY] = build_http_request_config( diff --git a/api/tests/integration_tests/workflow/nodes/test_tool.py b/api/tests/integration_tests/workflow/nodes/test_tool.py index 23cb56d2a5..8a4fb8eda4 100644 --- a/api/tests/integration_tests/workflow/nodes/test_tool.py +++ b/api/tests/integration_tests/workflow/nodes/test_tool.py @@ -1,6 +1,6 @@ import time import uuid -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom from core.tools.utils.configuration import ToolParameterConfigurationManager @@ -87,17 +87,20 @@ def test_tool_variable_invoke(): } ) - ToolParameterConfigurationManager.decrypt_tool_parameters = MagicMock(return_value={"format": "%Y-%m-%d %H:%M:%S"}) + with patch.object( + ToolParameterConfigurationManager, + "decrypt_tool_parameters", + return_value={"format": "%Y-%m-%d %H:%M:%S"}, + ): + node.graph_runtime_state.variable_pool.add(["1", "args1"], "1+1") - node.graph_runtime_state.variable_pool.add(["1", "args1"], "1+1") - - # execute node - result = node._run() - for item in result: - if isinstance(item, StreamCompletedEvent): - assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert item.node_run_result.outputs is not None - assert item.node_run_result.outputs.get("text") is not None + # execute node + result = node._run() + for item in result: + if isinstance(item, StreamCompletedEvent): + assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.node_run_result.outputs is not None + assert item.node_run_result.outputs.get("text") is not None def test_tool_mixed_invoke(): @@ -121,12 +124,15 @@ def test_tool_mixed_invoke(): } ) - ToolParameterConfigurationManager.decrypt_tool_parameters = MagicMock(return_value={"format": "%Y-%m-%d %H:%M:%S"}) - - # execute node - result = node._run() - for item in result: - if isinstance(item, StreamCompletedEvent): - assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED - assert item.node_run_result.outputs is not None - assert item.node_run_result.outputs.get("text") is not None + with patch.object( + ToolParameterConfigurationManager, + "decrypt_tool_parameters", + return_value={"format": "%Y-%m-%d %H:%M:%S"}, + ): + # execute node + result = node._run() + for item in result: + if isinstance(item, StreamCompletedEvent): + assert item.node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert item.node_run_result.outputs is not None + assert item.node_run_result.outputs.get("text") is not None diff --git a/api/tests/unit_tests/controllers/console/workspace/test_plugin.py b/api/tests/unit_tests/controllers/console/workspace/test_plugin.py index f6db55db5b..eb19243225 100644 --- a/api/tests/unit_tests/controllers/console/workspace/test_plugin.py +++ b/api/tests/unit_tests/controllers/console/workspace/test_plugin.py @@ -200,10 +200,13 @@ class TestPluginUploadFromPkgApi: app.test_request_context("/", data=data, content_type="multipart/form-data"), patch("controllers.console.workspace.plugin.current_account_with_tenant", return_value=(None, "t1")), patch("controllers.console.workspace.plugin.dify_config.PLUGIN_MAX_PACKAGE_SIZE", 0), + patch("controllers.console.workspace.plugin.PluginService.upload_pkg") as upload_pkg_mock, ): with pytest.raises(ValueError): method(api) + upload_pkg_mock.assert_not_called() + class TestPluginInstallFromPkgApi: def test_install_from_pkg(self, app): @@ -444,10 +447,13 @@ class TestPluginUploadFromBundleApi: ), patch("controllers.console.workspace.plugin.current_account_with_tenant", return_value=(None, "t1")), patch("controllers.console.workspace.plugin.dify_config.PLUGIN_MAX_BUNDLE_SIZE", 0), + patch("controllers.console.workspace.plugin.PluginService.upload_bundle") as upload_bundle_mock, ): with pytest.raises(ValueError): method(api) + upload_bundle_mock.assert_not_called() + class TestPluginInstallFromGithubApi: def test_success(self, app): diff --git a/api/tests/unit_tests/core/agent/test_cot_agent_runner.py b/api/tests/unit_tests/core/agent/test_cot_agent_runner.py index 9518c61202..f6d1edbaf0 100644 --- a/api/tests/unit_tests/core/agent/test_cot_agent_runner.py +++ b/api/tests/unit_tests/core/agent/test_cot_agent_runner.py @@ -5,8 +5,8 @@ import pytest from core.agent.cot_agent_runner import CotAgentRunner from core.agent.entities import AgentScratchpadUnit +from core.agent.errors import AgentMaxIterationError from dify_graph.model_runtime.entities.llm_entities import LLMUsage -from dify_graph.nodes.agent.exc import AgentMaxIterationError class DummyRunner(CotAgentRunner): diff --git a/api/tests/unit_tests/core/agent/test_fc_agent_runner.py b/api/tests/unit_tests/core/agent/test_fc_agent_runner.py index 8843a8d505..299c9b31d2 100644 --- a/api/tests/unit_tests/core/agent/test_fc_agent_runner.py +++ b/api/tests/unit_tests/core/agent/test_fc_agent_runner.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock import pytest +from core.agent.errors import AgentMaxIterationError from core.agent.fc_agent_runner import FunctionCallAgentRunner from core.app.apps.base_app_queue_manager import PublishFrom from core.app.entities.queue_entities import QueueMessageFileEvent @@ -14,7 +15,6 @@ from dify_graph.model_runtime.entities.message_entities import ( TextPromptMessageContent, UserPromptMessage, ) -from dify_graph.nodes.agent.exc import AgentMaxIterationError # ============================== # Dummy Helper Classes diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py index d8afd3b10a..108b740344 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py @@ -105,10 +105,10 @@ class TestWorkflowBasedAppRunner: from core.app.apps import workflow_app_runner - monkeypatch.setitem( - workflow_app_runner.NODE_TYPE_CLASSES_MAPPING, - NodeType.START, - {"1": _NodeCls}, + monkeypatch.setattr( + workflow_app_runner, + "resolve_workflow_node_class", + lambda **_kwargs: _NodeCls, ) monkeypatch.setattr( "core.app.apps.workflow_app_runner.load_into_variable_pool", diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py index 34e714a227..9e3574266c 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py @@ -11,10 +11,10 @@ from typing import TYPE_CHECKING, Any, Optional from unittest.mock import MagicMock from core.model_manager import ModelInstance +from core.workflow.nodes.agent import AgentNode from dify_graph.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.model_runtime.entities.llm_entities import LLMUsage from dify_graph.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent -from dify_graph.nodes.agent import AgentNode from dify_graph.nodes.code import CodeNode from dify_graph.nodes.document_extractor import DocumentExtractorNode from dify_graph.nodes.http_request import HttpRequestNode @@ -79,6 +79,14 @@ class MockNodeMixin: if isinstance(self, _ToolNode): kwargs.setdefault("tool_file_manager_factory", MagicMock(spec=ToolFileManagerProtocol)) + if isinstance(self, AgentNode): + presentation_provider = MagicMock() + presentation_provider.get_icon.return_value = None + kwargs.setdefault("strategy_resolver", MagicMock()) + kwargs.setdefault("presentation_provider", presentation_provider) + kwargs.setdefault("runtime_support", MagicMock()) + kwargs.setdefault("message_transformer", MagicMock()) + super().__init__( id=id, config=config, diff --git a/api/tests/unit_tests/core/workflow/test_node_factory.py b/api/tests/unit_tests/core/workflow/test_node_factory.py index 4a5f561c22..934e29546c 100644 --- a/api/tests/unit_tests/core/workflow/test_node_factory.py +++ b/api/tests/unit_tests/core/workflow/test_node_factory.py @@ -260,7 +260,11 @@ class TestDifyNodeFactoryCreateNode: factory.create_node({"id": "node-id", "data": {"type": "missing"}}) def test_rejects_missing_class_mapping(self, monkeypatch, factory): - monkeypatch.setattr(node_factory, "NODE_TYPE_CLASSES_MAPPING", {}) + monkeypatch.setattr( + node_factory, + "resolve_workflow_node_class", + MagicMock(side_effect=ValueError("No class mapping found for node type: start")), + ) with pytest.raises(ValueError, match="No class mapping found for node type: start"): factory.create_node({"id": "node-id", "data": {"type": NodeType.START.value}}) @@ -268,8 +272,8 @@ class TestDifyNodeFactoryCreateNode: def test_rejects_missing_latest_class(self, monkeypatch, factory): monkeypatch.setattr( node_factory, - "NODE_TYPE_CLASSES_MAPPING", - {NodeType.START: {node_factory.LATEST_VERSION: None}}, + "resolve_workflow_node_class", + MagicMock(side_effect=ValueError("No latest version class found for node type: start")), ) with pytest.raises(ValueError, match="No latest version class found for node type: start"): @@ -281,13 +285,8 @@ class TestDifyNodeFactoryCreateNode: matched_node_class = MagicMock(return_value=matched_node) monkeypatch.setattr( node_factory, - "NODE_TYPE_CLASSES_MAPPING", - { - NodeType.START: { - node_factory.LATEST_VERSION: latest_node_class, - "9": matched_node_class, - } - }, + "resolve_workflow_node_class", + MagicMock(return_value=matched_node_class), ) result = factory.create_node({"id": "node-id", "data": {"type": NodeType.START.value, "version": "9"}}) @@ -306,8 +305,8 @@ class TestDifyNodeFactoryCreateNode: latest_node_class = MagicMock(return_value=latest_node) monkeypatch.setattr( node_factory, - "NODE_TYPE_CLASSES_MAPPING", - {NodeType.START: {node_factory.LATEST_VERSION: latest_node_class}}, + "resolve_workflow_node_class", + MagicMock(return_value=latest_node_class), ) result = factory.create_node({"id": "node-id", "data": {"type": NodeType.START.value, "version": "9"}}) @@ -338,8 +337,8 @@ class TestDifyNodeFactoryCreateNode: constructor = MagicMock(name=constructor_name, return_value=created_node) monkeypatch.setattr( node_factory, - "NODE_TYPE_CLASSES_MAPPING", - {node_type: {node_factory.LATEST_VERSION: constructor}}, + "resolve_workflow_node_class", + MagicMock(return_value=constructor), ) if constructor_name == "HumanInputNode": @@ -411,8 +410,8 @@ class TestDifyNodeFactoryCreateNode: constructor = MagicMock(name=constructor_name, return_value=created_node) monkeypatch.setattr( node_factory, - "NODE_TYPE_CLASSES_MAPPING", - {node_type: {node_factory.LATEST_VERSION: constructor}}, + "resolve_workflow_node_class", + MagicMock(return_value=constructor), ) llm_init_kwargs = { "credentials_provider": sentinel.credentials_provider, diff --git a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py index fe211fb76a..68e42894fc 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py @@ -400,8 +400,8 @@ class TestWorkflowEntryHelpers: def test_run_free_node_rejects_missing_node_class(self, monkeypatch): monkeypatch.setattr( workflow_entry, - "NODE_TYPE_CLASSES_MAPPING", - {NodeType.PARAMETER_EXTRACTOR: {"1": None}}, + "resolve_workflow_node_class", + MagicMock(return_value=None), ) with pytest.raises(ValueError, match="Node class not found for node type parameter-extractor"): @@ -432,8 +432,8 @@ class TestWorkflowEntryHelpers: dify_node_factory.create_node.return_value = FakeNode() monkeypatch.setattr( workflow_entry, - "NODE_TYPE_CLASSES_MAPPING", - {NodeType.PARAMETER_EXTRACTOR: {"1": FakeNodeClass}}, + "resolve_workflow_node_class", + MagicMock(return_value=FakeNodeClass), ) with ( @@ -518,8 +518,8 @@ class TestWorkflowEntryHelpers: dify_node_factory.create_node.return_value = FakeNode() monkeypatch.setattr( workflow_entry, - "NODE_TYPE_CLASSES_MAPPING", - {NodeType.PARAMETER_EXTRACTOR: {"1": FakeNodeClass}}, + "resolve_workflow_node_class", + MagicMock(return_value=FakeNodeClass), ) with ( diff --git a/api/tests/unit_tests/services/test_workflow_service.py b/api/tests/unit_tests/services/test_workflow_service.py index 8820a1acc0..5ce0e6f140 100644 --- a/api/tests/unit_tests/services/test_workflow_service.py +++ b/api/tests/unit_tests/services/test_workflow_service.py @@ -1001,12 +1001,12 @@ class TestWorkflowService: Used by the UI to populate the node palette and provide sensible defaults when users add new nodes to their workflow. """ - with patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping: + with patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping: # Mock node class with default config mock_node_class = MagicMock() mock_node_class.get_default_config.return_value = {"type": "llm", "config": {}} - mock_mapping.items.return_value = [(NodeType.LLM, {"latest": mock_node_class})] + mock_mapping.return_value = {NodeType.LLM: {"latest": mock_node_class}} with patch("services.workflow_service.LATEST_VERSION", "latest"): result = workflow_service.get_default_block_configs() @@ -1025,7 +1025,7 @@ class TestWorkflowService: ) with ( - patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping, + patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping, patch("services.workflow_service.LATEST_VERSION", "latest"), patch( "services.workflow_service.build_http_request_config", @@ -1036,10 +1036,10 @@ class TestWorkflowService: mock_http_node_class.get_default_config.return_value = {"type": "http-request", "config": {}} mock_llm_node_class = MagicMock() mock_llm_node_class.get_default_config.return_value = {"type": "llm", "config": {}} - mock_mapping.items.return_value = [ - (NodeType.HTTP_REQUEST, {"latest": mock_http_node_class}), - (NodeType.LLM, {"latest": mock_llm_node_class}), - ] + mock_mapping.return_value = { + NodeType.HTTP_REQUEST: {"latest": mock_http_node_class}, + NodeType.LLM: {"latest": mock_llm_node_class}, + } result = workflow_service.get_default_block_configs() @@ -1060,7 +1060,7 @@ class TestWorkflowService: This includes default values for all required and optional parameters. """ with ( - patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping, + patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping, patch("services.workflow_service.LATEST_VERSION", "latest"), ): # Mock node class with default config @@ -1069,8 +1069,7 @@ class TestWorkflowService: mock_node_class.get_default_config.return_value = mock_config # Create a mock mapping that includes NodeType.LLM - mock_mapping.__contains__.return_value = True - mock_mapping.__getitem__.return_value = {"latest": mock_node_class} + mock_mapping.return_value = {NodeType.LLM: {"latest": mock_node_class}} result = workflow_service.get_default_block_config(NodeType.LLM.value) @@ -1079,9 +1078,8 @@ class TestWorkflowService: def test_get_default_block_config_invalid_node_type(self, workflow_service): """Test get_default_block_config returns empty dict for invalid node type.""" - with patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping: - # Mock mapping to not contain the node type - mock_mapping.__contains__.return_value = False + with patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping: + mock_mapping.return_value = {} # Use a valid NodeType but one that's not in the mapping result = workflow_service.get_default_block_config(NodeType.LLM.value) @@ -1100,7 +1098,7 @@ class TestWorkflowService: ) with ( - patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping, + patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping, patch("services.workflow_service.LATEST_VERSION", "latest"), patch( "services.workflow_service.build_http_request_config", @@ -1110,8 +1108,7 @@ class TestWorkflowService: mock_node_class = MagicMock() expected = {"type": "http-request", "config": {}} mock_node_class.get_default_config.return_value = expected - mock_mapping.__contains__.return_value = True - mock_mapping.__getitem__.return_value = {"latest": mock_node_class} + mock_mapping.return_value = {NodeType.HTTP_REQUEST: {"latest": mock_node_class}} result = workflow_service.get_default_block_config(NodeType.HTTP_REQUEST.value) @@ -1132,15 +1129,14 @@ class TestWorkflowService: ) with ( - patch("services.workflow_service.NODE_TYPE_CLASSES_MAPPING") as mock_mapping, + patch("services.workflow_service.get_workflow_node_type_classes_mapping") as mock_mapping, patch("services.workflow_service.LATEST_VERSION", "latest"), patch("services.workflow_service.build_http_request_config") as mock_build_config, ): mock_node_class = MagicMock() expected = {"type": "http-request", "config": {}} mock_node_class.get_default_config.return_value = expected - mock_mapping.__contains__.return_value = True - mock_mapping.__getitem__.return_value = {"latest": mock_node_class} + mock_mapping.return_value = {NodeType.HTTP_REQUEST: {"latest": mock_node_class}} result = workflow_service.get_default_block_config( NodeType.HTTP_REQUEST.value, @@ -1155,8 +1151,8 @@ class TestWorkflowService: def test_get_default_block_config_http_request_malformed_config_raises_value_error(self, workflow_service): with ( patch( - "services.workflow_service.NODE_TYPE_CLASSES_MAPPING", - {NodeType.HTTP_REQUEST: {"latest": HttpRequestNode}}, + "services.workflow_service.get_workflow_node_type_classes_mapping", + return_value={NodeType.HTTP_REQUEST: {"latest": HttpRequestNode}}, ), patch("services.workflow_service.LATEST_VERSION", "latest"), ):