From 5cef61137cddb64b12f2fe8572894c2fd254746b Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 14:36:45 +0800 Subject: [PATCH 01/68] feat(workflow): add document metadata configuration for Knowledge Base node Implement comprehensive document metadata support in Knowledge Base workflow node, allowing users to configure metadata values through both constants and variables. Backend changes: - Add DocMetadata model with support for constant values and variable selectors - Implement metadata processing in KnowledgeIndexNode with variable resolution - Add batch query optimization to prevent N+1 queries - Implement metadata validation and binding creation - Add comprehensive unit tests for node and service layers Frontend changes: - Add MetadataSection component with type-aware input controls - String type: text input - Number type: number input with validation - Time type: date picker (Unix timestamp) - Implement variable filtering based on metadata data type - String metadata: only string variables - Number metadata: only number/integer variables - Time metadata: only time-related number variables (timestamp, *time*, *date*, *at*) - Add VarReferencePicker with 360px min-width for better UX - Standardize font size to text-[13px] across all inputs - Add i18n support for all user-facing strings Technical improvements: - Use SQLAlchemy attributes.flag_modified() for JSON field updates - Optimize logging to follow project standards (warnings and exceptions only) - Add type safety with proper TypeScript definitions - Implement proper error handling with user-friendly messages Co-Authored-By: Claude --- .../nodes/knowledge_index/entities.py | 10 + .../knowledge_index/knowledge_index_node.py | 116 +++++- api/services/dataset_service.py | 53 +++ .../knowledge_entities/knowledge_entities.py | 7 + api/services/metadata_service.py | 59 +++ .../test_knowledge_index_node.py | 117 ++++++ .../services/test_dataset_service_metadata.py | 137 +++++++ .../datasets/metadata/base/date-picker.tsx | 2 +- .../components/metadata-section.tsx | 346 ++++++++++++++++++ .../nodes/knowledge-base/hooks/use-config.ts | 15 + .../workflow/nodes/knowledge-base/panel.tsx | 22 ++ .../workflow/nodes/knowledge-base/types.ts | 8 + web/i18n/en-US/dataset-creation.json | 2 + web/i18n/en-US/workflow.json | 2 + web/i18n/zh-Hans/dataset-creation.json | 2 + web/i18n/zh-Hans/workflow.json | 2 + 16 files changed, 897 insertions(+), 3 deletions(-) create mode 100644 api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py create mode 100644 api/tests/unit_tests/services/test_dataset_service_metadata.py create mode 100644 web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx diff --git a/api/core/workflow/nodes/knowledge_index/entities.py b/api/core/workflow/nodes/knowledge_index/entities.py index 3daca90b9b..339ffd3e40 100644 --- a/api/core/workflow/nodes/knowledge_index/entities.py +++ b/api/core/workflow/nodes/knowledge_index/entities.py @@ -150,6 +150,15 @@ class ParentChildStructureChunk(BaseModel): data_source_info: Union[FileInfo, OnlineDocumentInfo, WebsiteInfo] +class DocMetadata(BaseModel): + """ + Doc Metadata. + """ + + metadata_id: str + value: str | int | float | list[str] + + class KnowledgeIndexNodeData(BaseNodeData): """ Knowledge index Node Data. @@ -158,3 +167,4 @@ class KnowledgeIndexNodeData(BaseNodeData): type: str = "knowledge-index" chunk_structure: str index_chunk_variable_selector: list[str] + doc_metadata: list[DocMetadata] | None = None diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index 17ca4bef7b..ad227fa59a 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -1,10 +1,11 @@ import datetime import logging import time -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from typing import Any from sqlalchemy import func, select +from sqlalchemy.orm import attributes from core.app.entities.app_invoke_entities import InvokeFrom from core.rag.index_processor.index_processor_factory import IndexProcessorFactory @@ -16,7 +17,7 @@ from core.workflow.nodes.base.node import Node from core.workflow.nodes.base.template import Template from core.workflow.runtime import VariablePool from extensions.ext_database import db -from models.dataset import Dataset, Document, DocumentSegment +from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Document, DocumentSegment from .entities import KnowledgeIndexNodeData from .exc import ( @@ -25,6 +26,9 @@ from .exc import ( logger = logging.getLogger(__name__) +# Constant for built-in metadata identifier +BUILT_IN_METADATA_ID = "built-in" + default_retrieval_model = { "search_method": RetrievalMethod.SEMANTIC_SEARCH, "reranking_enable": False, @@ -161,6 +165,88 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): } ) + # Process doc_metadata before commit to ensure it's saved with the same document object + if node_data.doc_metadata: + try: + # Fetch metadata definitions for name mapping + metadata_name_map: dict[str, str] = {} + dataset_metadatas = db.session.scalars( + select(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset.id) + ).all() + for md in dataset_metadatas: + metadata_name_map[md.id] = md.name + + # Collect valid metadata IDs (excluding built-in) + valid_metadata_ids = [ + item.metadata_id + for item in node_data.doc_metadata + if item.metadata_id != BUILT_IN_METADATA_ID and item.metadata_id in metadata_name_map + ] + + # Batch fetch existing bindings to avoid N+1 query + existing_binding_ids: set[str] = set() + if valid_metadata_ids: + existing_bindings = db.session.scalars( + select(DatasetMetadataBinding.metadata_id).where( + DatasetMetadataBinding.dataset_id == dataset.id, + DatasetMetadataBinding.document_id == doc_id_value, + DatasetMetadataBinding.metadata_id.in_(valid_metadata_ids), + ) + ).all() + existing_binding_ids = set(existing_bindings) + + doc_metadata_dict = document.doc_metadata or {} + + for item in node_data.doc_metadata: + # Skip built-in fields + if item.metadata_id == BUILT_IN_METADATA_ID: + continue + + # Resolve Name + md_name = metadata_name_map.get(item.metadata_id) + if not md_name: + logger.warning( + "[KnowledgeIndexNode] metadata_id %s not found, skipping", item.metadata_id + ) + continue + + # Resolve Value + value = item.value + if isinstance(value, list): + var_obj = variable_pool.get(value) + if var_obj: + value = var_obj.to_object() + else: + # Variable not found - raise error to notify user of configuration issue + variable_path = ".".join(value) + raise KnowledgeIndexNodeError( + f"Variable '{variable_path}' not found for metadata '{md_name}'. " + f"Please check your variable configuration." + ) + + if value is not None: + doc_metadata_dict[md_name] = value + + # Create DatasetMetadataBinding if not exists + if item.metadata_id not in existing_binding_ids: + binding = DatasetMetadataBinding( + tenant_id=dataset.tenant_id, + dataset_id=dataset.id, + metadata_id=item.metadata_id, + document_id=doc_id_value, + created_by=self.user_id, + ) + db.session.add(binding) + existing_binding_ids.add(item.metadata_id) # Prevent duplicate in same batch + + document.doc_metadata = doc_metadata_dict + # Force SQLAlchemy to recognize the change to the JSON field + attributes.flag_modified(document, "doc_metadata") + + except Exception as e: + logger.exception("[KnowledgeIndexNode] Failed to process doc_metadata") + raise KnowledgeIndexNodeError(f"Failed to process document metadata: {e}") from e + db.session.commit() return { @@ -189,3 +275,29 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): Template instance for this knowledge index node """ return Template(segments=[]) + + @classmethod + def _extract_variable_selector_to_variable_mapping( + cls, *, graph_config: Mapping[str, Any], node_id: str, node_data: Mapping[str, Any] + ) -> Mapping[str, Sequence[str]]: + """ + Extract variable selector to variable mapping + :param graph_config: graph config + :param node_id: node id + :param node_data: node data + :return: + """ + variable_mapping = {} + node_data_obj = KnowledgeIndexNodeData(**node_data) + + # index chunk variable + variable_mapping[node_id + ".index_chunk_variable_selector"] = node_data_obj.index_chunk_variable_selector + + # doc_metadata variables + if node_data_obj.doc_metadata: + for item in node_data_obj.doc_metadata: + if isinstance(item.value, list): + variable_mapping[node_id + "." + item.metadata_id] = item.value + + return variable_mapping + diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index 18e5613438..893baa53cc 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -39,6 +39,8 @@ from models.dataset import ( Dataset, DatasetAutoDisableLog, DatasetCollectionBinding, + DatasetMetadata, + DatasetMetadataBinding, DatasetPermission, DatasetPermissionEnum, DatasetProcessRule, @@ -1595,6 +1597,36 @@ class DocumentService: else default_retrieval_model ) + # Handle metadata configuration + # 1. Enable built-in metadata if requested + if knowledge_config.enable_built_in_metadata and not dataset.built_in_field_enabled: + dataset.built_in_field_enabled = True + db.session.add(dataset) + + # 2. Process custom metadata - validate and build dict + custom_metadata: dict = {} + metadata_bindings_to_create: list[tuple[str, str]] = [] # (metadata_id, metadata_name) + if knowledge_config.doc_metadata: + # Batch fetch all metadata definitions to avoid N+1 query + metadata_ids = [item.metadata_id for item in knowledge_config.doc_metadata] + metadata_defs = ( + db.session.query(DatasetMetadata) + .filter( + DatasetMetadata.id.in_(metadata_ids), + DatasetMetadata.dataset_id == dataset.id, + ) + .all() + ) + metadata_map = {md.id: md for md in metadata_defs} + + for item in knowledge_config.doc_metadata: + # Validate metadata_id belongs to this dataset + metadata_def = metadata_map.get(item.metadata_id) + if not metadata_def: + raise ValueError(f"Metadata with id '{item.metadata_id}' not found in this dataset") + custom_metadata[metadata_def.name] = item.value + metadata_bindings_to_create.append((item.metadata_id, metadata_def.name)) + documents = [] if knowledge_config.original_document_id: document = DocumentService.update_document_with_dataset_id(dataset, knowledge_config, account) @@ -1717,6 +1749,7 @@ class DocumentService: account, file.name, batch, + custom_metadata=custom_metadata or None, ) db.session.add(document) db.session.flush() @@ -1769,6 +1802,7 @@ class DocumentService: account, truncated_page_name, batch, + custom_metadata=custom_metadata or None, ) db.session.add(document) db.session.flush() @@ -1809,6 +1843,7 @@ class DocumentService: account, document_name, batch, + custom_metadata=custom_metadata or None, ) db.session.add(document) db.session.flush() @@ -1817,6 +1852,20 @@ class DocumentService: position += 1 db.session.commit() + # Create DatasetMetadataBinding records for custom metadata + if metadata_bindings_to_create and document_ids: + for doc_id in document_ids: + for metadata_id, _ in metadata_bindings_to_create: + binding = DatasetMetadataBinding( + tenant_id=dataset.tenant_id, + dataset_id=dataset.id, + document_id=doc_id, + metadata_id=metadata_id, + created_by=account.id, + ) + db.session.add(binding) + db.session.commit() + # trigger async task if document_ids: DocumentIndexingTaskProxy(dataset.tenant_id, dataset.id, document_ids).delay() @@ -2127,6 +2176,7 @@ class DocumentService: account: Account, name: str, batch: str, + custom_metadata: dict | None = None, ): document = Document( tenant_id=dataset.tenant_id, @@ -2151,6 +2201,9 @@ class DocumentService: BuiltInField.last_update_date: datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d %H:%M:%S"), BuiltInField.source: data_source_type, } + # Merge custom metadata if provided + if custom_metadata: + doc_metadata.update(custom_metadata) if doc_metadata: document.doc_metadata = doc_metadata return document diff --git a/api/services/entities/knowledge_entities/knowledge_entities.py b/api/services/entities/knowledge_entities/knowledge_entities.py index 7959734e89..7b90374f95 100644 --- a/api/services/entities/knowledge_entities/knowledge_entities.py +++ b/api/services/entities/knowledge_entities/knowledge_entities.py @@ -112,6 +112,11 @@ class MetaDataConfig(BaseModel): doc_metadata: dict +class DocumentMetadataInput(BaseModel): + metadata_id: str + value: str | int | float | None = None + + class KnowledgeConfig(BaseModel): original_document_id: str | None = None duplicate: bool = True @@ -125,6 +130,8 @@ class KnowledgeConfig(BaseModel): embedding_model_provider: str | None = None name: str | None = None is_multimodal: bool = False + enable_built_in_metadata: bool = False + doc_metadata: list[DocumentMetadataInput] | None = None class SegmentCreateArgs(BaseModel): diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 3329ac349c..8538dac792 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -7,6 +7,8 @@ from extensions.ext_redis import redis_client from libs.datetime_utils import naive_utc_now from libs.login import current_account_with_tenant from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding +from models.model import App +from models.workflow import Workflow from services.dataset_service import DocumentService from services.entities.knowledge_entities.knowledge_entities import ( MetadataArgs, @@ -95,6 +97,52 @@ class MetadataService: finally: redis_client.delete(lock_key) + @staticmethod + def check_metadata_used_in_pipeline(dataset_id: str, metadata_id: str) -> tuple[bool, str | None]: + """ + Check if a metadata is used in the associated Pipeline's Knowledge Base node. + + Returns: + tuple[bool, str | None]: (is_used, pipeline_name) - True if used, with pipeline name + """ + # Get the dataset + dataset = db.session.query(Dataset).filter_by(id=dataset_id).first() + if not dataset or not dataset.pipeline_id: + return False, None + + # Get the draft workflow directly using pipeline_id as app_id + workflow = db.session.query(Workflow).filter_by( + app_id=dataset.pipeline_id, + version=Workflow.VERSION_DRAFT + ).first() + if not workflow: + return False, None + + # Get pipeline name from App if exists + app = db.session.query(App).filter_by(id=dataset.pipeline_id).first() + pipeline_name = app.name if app else "Pipeline" + + # Walk through nodes to find Knowledge Index node (type is "knowledge-index") + try: + graph_dict = workflow.graph_dict + if "nodes" not in graph_dict: + return False, None + + for node in graph_dict["nodes"]: + node_data = node.get("data", {}) + # Check if this is a knowledge-index node + if node_data.get("type") == "knowledge-index": + doc_metadata = node_data.get("doc_metadata", []) + if doc_metadata: + for item in doc_metadata: + if item.get("metadata_id") == metadata_id: + return True, pipeline_name + except Exception: + logger.exception("Error checking metadata usage in pipeline") + return False, None + + return False, None + @staticmethod def delete_metadata(dataset_id: str, metadata_id: str): lock_key = f"dataset_metadata_lock_{dataset_id}" @@ -103,6 +151,15 @@ class MetadataService: metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first() if metadata is None: raise ValueError("Metadata not found.") + + # Check if metadata is used in Pipeline before deletion + is_used, pipeline_name = MetadataService.check_metadata_used_in_pipeline(dataset_id, metadata_id) + if is_used: + raise ValueError( + f"Cannot delete metadata '{metadata.name}' because it is currently used in " + f"Pipeline '{pipeline_name}'." + ) + db.session.delete(metadata) # deal related documents @@ -122,6 +179,8 @@ class MetadataService: db.session.add(document) db.session.commit() return metadata + except ValueError: + raise except Exception: logger.exception("Delete metadata failed") finally: diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py new file mode 100644 index 0000000000..c7d514c243 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -0,0 +1,117 @@ + +import unittest +import uuid +from unittest.mock import MagicMock, patch + +from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.enums import SystemVariableKey +from core.workflow.nodes.knowledge_index.entities import DocMetadata, KnowledgeIndexNodeData +from core.workflow.nodes.knowledge_index.knowledge_index_node import KnowledgeIndexNode +from core.workflow.runtime import VariablePool +from models.dataset import Dataset, DatasetMetadata, Document +from models.enums import UserFrom + + +class TestKnowledgeIndexNode(unittest.TestCase): + def setUp(self): + self.dataset_id = str(uuid.uuid4()) + self.document_id = str(uuid.uuid4()) + self.mock_dataset = MagicMock(spec=Dataset) + self.mock_dataset.id = self.dataset_id + self.mock_dataset.built_in_field_enabled = False + + self.mock_document = MagicMock(spec=Document) + self.mock_document.id = self.document_id + self.mock_document.doc_metadata = {} + + @patch('core.workflow.nodes.knowledge_index.knowledge_index_node.db.session') + @patch('core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory') + def test_run_with_custom_metadata(self, mock_index_processor_factory, mock_db_session): + # Mock DB queries + mock_db_session.query.return_value.filter_by.return_value.first.side_effect = [ + self.mock_dataset, # For dataset query + self.mock_document # For document query + ] + + # Mock Dataset Metadata + mock_metadata = MagicMock(spec=DatasetMetadata) + mock_metadata.id = "meta_uuid_1" + mock_metadata.name = "Category" + mock_db_session.scalars.return_value.all.return_value = [mock_metadata] + # Simpler mock for the scalar query - switched to bulk fetch + mock_db_session.scalar.return_value = "Category" + + # Mock Variable Pool + pool = MagicMock(spec=VariablePool) + + # System variables + pool.get.side_effect = lambda selector: { + ("sys", SystemVariableKey.DATASET_ID): MagicMock(value=self.dataset_id), + ("sys", SystemVariableKey.DOCUMENT_ID): MagicMock(value=self.document_id), + ("sys", SystemVariableKey.INVOKE_FROM): None, + ("Start", "category"): MagicMock(to_object=lambda: "Financial"), + # handle list as key? get takes list + frozenset(["Start", "category"]): MagicMock(to_object=lambda: "Financial"), + }.get(tuple(selector) if isinstance(selector, list) else selector) + + # Handle the chunk variable specifically first + chunk_var_mock = MagicMock() + chunk_var_mock.value = {"chunk": "data"} + + # Override side_effect to handle list lookups correctly + def variable_pool_get(selector): + if selector == ["sys", SystemVariableKey.DATASET_ID]: + return MagicMock(value=self.dataset_id) + if selector == ["sys", SystemVariableKey.DOCUMENT_ID]: + return MagicMock(value=self.document_id) + if selector == ["Start", "category"]: + var = MagicMock() + var.to_object.return_value = "Financial" + return var + if selector == ["sys", SystemVariableKey.INVOKE_FROM]: + return None + if selector == ["sys", "chunks"]: # whatever index_chunk_variable_selector is + return chunk_var_mock + return None + + pool.get.side_effect = variable_pool_get + + # Node Configuration + node_data = KnowledgeIndexNodeData( + id="node1", + title="Knowledge", + chunk_structure="chunk", + index_chunk_variable_selector=["sys", "chunks"], + doc_metadata=[ + DocMetadata(metadata_id="meta_uuid_1", value=["Start", "category"]) + ] + ) + + # Initialize Node + graph_init_params = MagicMock() + graph_init_params.user_from = UserFrom.ACCOUNT + graph_init_params.invoke_from = InvokeFrom.WEB_APP + + config = { + "id": "node1", + "data": node_data.model_dump() + } + + node = KnowledgeIndexNode( + id="node1", + graph_init_params=graph_init_params, + graph_runtime_state=MagicMock(variable_pool=pool), + config=config + ) + + # Mock _invoke_knowledge_index to avoid calling specific index logic + node._invoke_knowledge_index = MagicMock() + + # Execute + result = node._run() + + # Verify + assert self.mock_document.doc_metadata["Category"] == "Financial" + mock_db_session.add.assert_called_with(self.mock_document) + mock_db_session.commit.assert_called() + diff --git a/api/tests/unit_tests/services/test_dataset_service_metadata.py b/api/tests/unit_tests/services/test_dataset_service_metadata.py new file mode 100644 index 0000000000..98d12317a5 --- /dev/null +++ b/api/tests/unit_tests/services/test_dataset_service_metadata.py @@ -0,0 +1,137 @@ +from unittest.mock import Mock, patch +from uuid import uuid4 + +import pytest + +from models.account import Account +from models.dataset import Dataset, DatasetMetadata, Document +from models.model import UploadFile +from services.dataset_service import DocumentService +from services.entities.knowledge_entities.knowledge_entities import ( + DataSource, + DocumentMetadataInput, + FileInfo, + InfoList, + KnowledgeConfig, +) + + +class TestDocumentServiceMetadata: + @pytest.fixture + def mock_dependencies(self): + with ( + patch("services.dataset_service.db.session") as mock_db, + patch("services.dataset_service.DatasetService.get_dataset") as mock_get_dataset, + patch("services.dataset_service.redis_client") as mock_redis, + patch("services.dataset_service.DocumentService.build_document") as mock_build_document, + patch("services.dataset_service.current_user") as mock_current_user, + patch("services.dataset_service.DocumentIndexingTaskProxy") as mock_indexing_task, + # We don't patch DocumentService.save_document_with_dataset_id as that's what we are testing + ): + # Hack to pass isinstance check + mock_current_user.__class__ = Account + mock_current_user.current_tenant_id = "tenant-123" + + yield { + "db": mock_db, + "get_dataset": mock_get_dataset, + "redis": mock_redis, + "build_document": mock_build_document, + "current_user": mock_current_user, + } + + def test_save_document_with_metadata(self, mock_dependencies): + # Arrange + dataset_id = str(uuid4()) + tenant_id = str(uuid4()) + account = Mock(spec=Account) + account.id = "account-1" + account.current_tenant_id = tenant_id + + dataset = Mock(spec=Dataset) + dataset.id = dataset_id + dataset.tenant_id = tenant_id + dataset.built_in_field_enabled = False + dataset.doc_form = "text_model" + mock_dependencies["get_dataset"].return_value = dataset + + # Define metadata inputs + metadata_id = str(uuid4()) + doc_metadata_inputs = [ + DocumentMetadataInput(metadata_id=metadata_id, value="custom_value") + ] + + # Knowledge config + knowledge_config = KnowledgeConfig( + data_source_type="upload_file", + data_source=DataSource( + info_list=InfoList( + data_source_type="upload_file", + file_info_list=FileInfo(file_ids=["file-1"]) + ) + ), + doc_form="text_model", + doc_language="en", + indexing_technique="high_quality", + enable_built_in_metadata=True, + doc_metadata=doc_metadata_inputs + ) + + # Mock local file for upload_file type + with patch("services.dataset_service.db.session.query") as mock_query: + # Mock DatasetMetadata lookup + mock_metadata_def = Mock(spec=DatasetMetadata) + mock_metadata_def.id = metadata_id + mock_metadata_def.name = "custom_field" + mock_metadata_def.field_type = "text" + + # Create a side effect for query(Model) + def query_side_effect(model): + m = Mock() + if model == DatasetMetadata: + m.filter.return_value.filter.return_value.first.return_value = mock_metadata_def + # handle the specific chain in code + m.filter_by.return_value.first.return_value = mock_metadata_def + return m + if model == Document: + doc_mock = Mock() + doc_mock.position = 1 + # For get_documents_position + m.filter_by.return_value.order_by.return_value.first.return_value = doc_mock + # For duplicate check + m.where.return_value.all.return_value = [] + return m + if model == UploadFile: + m.where.return_value.all.return_value = [Mock(id="file-1", tenant_id=tenant_id)] + return m + + return m + + mock_query.side_effect = query_side_effect + + # Mock build_document to return a document + mock_document = Mock(spec=Document) + mock_document.id = "doc-123" + mock_document.doc_metadata = {} + mock_dependencies["build_document"].return_value = mock_document + + # Act + DocumentService.save_document_with_dataset_id( + dataset=dataset, + knowledge_config=knowledge_config, + account=account + ) + + # Assert + # 1. Check built-in metadata enabled + assert dataset.built_in_field_enabled is True + + # 2. Check custom metadata passed to build_document + call_args = mock_dependencies["build_document"].call_args + assert call_args is not None + _, kwargs = call_args + assert "custom_metadata" in kwargs + assert kwargs["custom_metadata"] == {"custom_field": "custom_value"} + + # 3. Check DatasetMetadataBinding creation + assert mock_dependencies["db"].add.call_count >= 1 diff --git a/web/app/components/datasets/metadata/base/date-picker.tsx b/web/app/components/datasets/metadata/base/date-picker.tsx index 2f61549859..6839af6232 100644 --- a/web/app/components/datasets/metadata/base/date-picker.tsx +++ b/web/app/components/datasets/metadata/base/date-picker.tsx @@ -38,7 +38,7 @@ const WrappedDatePicker = ({
diff --git a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx new file mode 100644 index 0000000000..ef5fbb99a6 --- /dev/null +++ b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx @@ -0,0 +1,346 @@ +'use client' +import type { FC } from 'react' +import type { DocMetadataItem } from '../types' +import type { BuiltInMetadataItem, MetadataItemWithValueLength } from '@/app/components/datasets/metadata/types' +import type { ValueSelector, Var } from '@/app/components/workflow/types' +import { RiAddLine, RiCloseLine, RiDraftLine, RiEditLine } from '@remixicon/react' +import { useCallback, useState } from 'react' +import { useTranslation } from 'react-i18next' +import Button from '@/app/components/base/button' +import { Variable02 } from '@/app/components/base/icons/src/vender/solid/development' +import { InputNumber } from '@/app/components/base/input-number' +import Toast from '@/app/components/base/toast' +import Tooltip from '@/app/components/base/tooltip' +import Datepicker from '@/app/components/datasets/metadata/base/date-picker' +import DatasetMetadataDrawer from '@/app/components/datasets/metadata/metadata-dataset/dataset-metadata-drawer' +import { DataType } from '@/app/components/datasets/metadata/types' +import VarReferencePicker from '@/app/components/workflow/nodes/_base/components/variable/var-reference-picker' +import { VarType } from '@/app/components/workflow/types' +import { + useBuiltInMetaDataFields, + useCreateMetaData, + useDeleteMetaData, + useRenameMeta, + useUpdateBuiltInStatus, +} from '@/service/knowledge/use-metadata' +import { cn } from '@/utils/classnames' + +type MetadataSectionProps = { + nodeId: string + datasetId?: string + enableBuiltInMetadata: boolean + onEnableBuiltInMetadataChange: (enabled: boolean) => void + userMetadata?: MetadataItemWithValueLength[] + docMetadata?: DocMetadataItem[] + onDocMetadataChange?: (metadata: DocMetadataItem[]) => void + onMetadataListChange?: () => void + readonly?: boolean + className?: string +} + +const MetadataSection: FC = ({ + nodeId, + datasetId, + enableBuiltInMetadata, + onEnableBuiltInMetadataChange, + userMetadata = [], + docMetadata = [], + onDocMetadataChange, + onMetadataListChange, + readonly, + className, +}) => { + const { t } = useTranslation() + const [isDrawerOpen, setIsDrawerOpen] = useState(false) + + // Get built-in metadata fields from API + const { data: builtInFieldsData } = useBuiltInMetaDataFields() + const builtInFields = builtInFieldsData?.fields || [] + + // Mutations for drawer + const createMetadataMutation = useCreateMetaData(datasetId || '') + const renameMetadataMutation = useRenameMeta(datasetId || '') + const deleteMetadataMutation = useDeleteMetaData(datasetId || '') + const updateBuiltInStatus = useUpdateBuiltInStatus(datasetId || '') + + // Drawer handlers + const handleAddMetadata = useCallback(async (data: BuiltInMetadataItem) => { + await createMetadataMutation.mutateAsync(data) + Toast.notify({ type: 'success', message: t('api.actionSuccess', { ns: 'common' }) }) + onMetadataListChange?.() + }, [createMetadataMutation, t, onMetadataListChange]) + + const handleRenameMetadata = useCallback(async (data: MetadataItemWithValueLength) => { + await renameMetadataMutation.mutateAsync(data) + Toast.notify({ type: 'success', message: t('api.actionSuccess', { ns: 'common' }) }) + onMetadataListChange?.() + }, [renameMetadataMutation, t, onMetadataListChange]) + + const handleDeleteMetadata = useCallback(async (id: string) => { + await deleteMetadataMutation.mutateAsync(id) + Toast.notify({ type: 'success', message: t('api.actionSuccess', { ns: 'common' }) }) + onMetadataListChange?.() + }, [deleteMetadataMutation, t, onMetadataListChange]) + + const handleBuiltInEnabledChange = useCallback(async (enabled: boolean) => { + onEnableBuiltInMetadataChange(enabled) + if (datasetId) { + await updateBuiltInStatus.mutateAsync(enabled) + } + }, [datasetId, updateBuiltInStatus, onEnableBuiltInMetadataChange]) + + // Document metadata value handlers + const handleAddDocMetadata = useCallback(() => { + if (onDocMetadataChange) { + onDocMetadataChange([...docMetadata, { metadata_id: '', value: '' }]) + } + }, [docMetadata, onDocMetadataChange]) + + const handleRemoveDocMetadata = useCallback((index: number) => { + if (onDocMetadataChange) { + const newMetadata = [...docMetadata] + newMetadata.splice(index, 1) + onDocMetadataChange(newMetadata) + } + }, [docMetadata, onDocMetadataChange]) + + const handleDocMetadataIdChange = useCallback((index: number, metadataId: string) => { + if (onDocMetadataChange) { + const newMetadata = [...docMetadata] + newMetadata[index] = { ...newMetadata[index], metadata_id: metadataId } + onDocMetadataChange(newMetadata) + } + }, [docMetadata, onDocMetadataChange]) + + const handleDocMetadataValueChange = useCallback((index: number, value: string | number | ValueSelector) => { + if (onDocMetadataChange) { + const newMetadata = [...docMetadata] + newMetadata[index] = { ...newMetadata[index], value } + onDocMetadataChange(newMetadata) + } + }, [docMetadata, onDocMetadataChange]) + + const getAvailableMetadataOptions = useCallback((currentId: string) => { + const usedIds = docMetadata.map(m => m.metadata_id).filter(id => id !== currentId) + return userMetadata.filter(m => !usedIds.includes(m.id)) + }, [userMetadata, docMetadata]) + + const getMetadataType = useCallback((metadataId: string): DataType | undefined => { + return userMetadata.find(m => m.id === metadataId)?.type + }, [userMetadata]) + + // Filter variables based on metadata type + const createVarFilter = useCallback((metadataId: string) => { + return (variable: Var): boolean => { + const metadataType = getMetadataType(metadataId) + + if (!metadataType) + return false + + // Type mapping: Metadata DataType -> Workflow VarType + switch (metadataType) { + case DataType.string: + return variable.type === VarType.string + case DataType.number: + return variable.type === VarType.number || variable.type === VarType.integer + case DataType.time: { + // Only allow number variables with time-related names + const varName = variable.variable.toLowerCase() + const isTimeRelated + = varName === 'timestamp' // sys.timestamp + || varName.includes('time') // current_time, expiry_time + || varName.includes('date') // created_date, updated_date + || varName.includes('at') // created_at, updated_at + + return (variable.type === VarType.number || variable.type === VarType.integer) + && isTimeRelated + } + default: + return false + } + } + }, [getMetadataType]) + + return ( +
+
+
+ {t('metadata.metadata', { ns: 'dataset' })} +
+ {datasetId && !readonly && ( + + )} +
+ + {/* Document Metadata Values Section */} + {userMetadata.length > 0 && ( +
+
+ {!readonly && ( + + )} +
+ + {docMetadata.length > 0 + ? ( +
+ {docMetadata.map((item, index) => { + const isVariable = Array.isArray(item.value) + const itemKey = item.metadata_id ? `metadata-${item.metadata_id}` : `new-${index}` + return ( +
+
+
+ +
+
+
+ +
!readonly && handleDocMetadataValueChange(index, [])} + > + +
+
+ +
!readonly && handleDocMetadataValueChange(index, '')} + > + +
+
+
+
+
+ {isVariable + ? ( + handleDocMetadataValueChange(index, value)} + isSupportConstantValue={false} + placeholder={t('placeholder.input', { ns: 'common' }) || ''} + className="h-full border-none !bg-transparent p-0" + zIndex={1000} + isShowNodeName + minWidth={360} + filterVar={createVarFilter(item.metadata_id)} + /> + ) + : ( +
+ {(() => { + const metadataType = getMetadataType(item.metadata_id) + + // Time type - use Datepicker + if (metadataType === DataType.time) { + return ( + handleDocMetadataValueChange(index, value || 0)} + /> + ) + } + + // Number type - use InputNumber + if (metadataType === DataType.number) { + return ( + handleDocMetadataValueChange(index, value)} + readOnly={readonly} + size="regular" + /> + ) + } + + // String type (default) - use text input + return ( + handleDocMetadataValueChange(index, e.target.value)} + placeholder={t('placeholder.input', { ns: 'common' }) || ''} + disabled={readonly} + className="h-full w-full bg-transparent text-[13px] text-text-primary outline-none placeholder:text-text-placeholder disabled:opacity-50" + /> + ) + })()} +
+ )} +
+
+
+ {!readonly && ( + + )} +
+ ) + })} +
+ ) + : ( +
+ {t('stepTwo.metadata.noValues', { ns: 'datasetCreation' })} +
+ )} +
+ )} + + {/* Metadata Drawer */} + {isDrawerOpen && datasetId && ( + setIsDrawerOpen(false)} + onAdd={handleAddMetadata} + onRename={handleRenameMetadata} + onRemove={handleDeleteMetadata} + /> + )} +
+ ) +} + +export default MetadataSection diff --git a/web/app/components/workflow/nodes/knowledge-base/hooks/use-config.ts b/web/app/components/workflow/nodes/knowledge-base/hooks/use-config.ts index f2a27d338e..9c18925117 100644 --- a/web/app/components/workflow/nodes/knowledge-base/hooks/use-config.ts +++ b/web/app/components/workflow/nodes/knowledge-base/hooks/use-config.ts @@ -1,4 +1,5 @@ import type { + DocMetadataItem, KnowledgeBaseNodeType, RerankingModel, } from '../types' @@ -246,6 +247,18 @@ export const useConfig = (id: string) => { }) }, [handleNodeDataUpdate]) + const handleEnableBuiltInMetadataChange = useCallback((enabled: boolean) => { + handleNodeDataUpdate({ + enable_built_in_metadata: enabled, + }) + }, [handleNodeDataUpdate]) + + const handleDocMetadataChange = useCallback((docMetadata: DocMetadataItem[]) => { + handleNodeDataUpdate({ + doc_metadata: docMetadata, + }) + }, [handleNodeDataUpdate]) + return { handleChunkStructureChange, handleIndexMethodChange, @@ -260,5 +273,7 @@ export const useConfig = (id: string) => { handleScoreThresholdChange, handleScoreThresholdEnabledChange, handleInputVariableChange, + handleEnableBuiltInMetadataChange, + handleDocMetadataChange, } } diff --git a/web/app/components/workflow/nodes/knowledge-base/panel.tsx b/web/app/components/workflow/nodes/knowledge-base/panel.tsx index f32278fc22..cd7c9ecec1 100644 --- a/web/app/components/workflow/nodes/knowledge-base/panel.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/panel.tsx @@ -17,10 +17,13 @@ import { Group, } from '@/app/components/workflow/nodes/_base/components/layout' import VarReferencePicker from '@/app/components/workflow/nodes/_base/components/variable/var-reference-picker' +import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail' +import { useDatasetMetaData } from '@/service/knowledge/use-metadata' import Split from '../_base/components/split' import ChunkStructure from './components/chunk-structure' import EmbeddingModel from './components/embedding-model' import IndexMethod from './components/index-method' +import MetadataSection from './components/metadata-section' import RetrievalSetting from './components/retrieval-setting' import { useConfig } from './hooks/use-config' import { @@ -37,6 +40,10 @@ const Panel: FC> = ({ const { data: embeddingModelList } = useModelList(ModelTypeEnum.textEmbedding) const { data: rerankModelList } = useModelList(ModelTypeEnum.rerank) + // Get datasetId from context and fetch metadata + const datasetId = useDatasetDetailContextWithSelector(s => s.dataset?.id) + const { data: metadataList, refetch: refetchMetadataList } = useDatasetMetaData(datasetId || '') + const { handleChunkStructureChange, handleIndexMethodChange, @@ -51,6 +58,8 @@ const Panel: FC> = ({ handleScoreThresholdChange, handleScoreThresholdEnabledChange, handleInputVariableChange, + handleEnableBuiltInMetadataChange, + handleDocMetadataChange, } = useConfig(id) const filterVar = useCallback((variable: Var) => { @@ -190,6 +199,19 @@ const Panel: FC> = ({ />
+ + + ) } diff --git a/web/app/components/workflow/nodes/knowledge-base/types.ts b/web/app/components/workflow/nodes/knowledge-base/types.ts index b54e8e2b2f..7f0bb3f18e 100644 --- a/web/app/components/workflow/nodes/knowledge-base/types.ts +++ b/web/app/components/workflow/nodes/knowledge-base/types.ts @@ -42,6 +42,12 @@ export type RetrievalSetting = { score_threshold: number reranking_mode?: RerankingModeEnum } + +export type DocMetadataItem = { + metadata_id: string + value: string | number | string[] // string[] for ValueSelector +} + export type KnowledgeBaseNodeType = CommonNodeType & { index_chunk_variable_selector: string[] chunk_structure?: ChunkStructureEnum @@ -50,6 +56,8 @@ export type KnowledgeBaseNodeType = CommonNodeType & { embedding_model_provider?: string keyword_number: number retrieval_model: RetrievalSetting + enable_built_in_metadata?: boolean + doc_metadata?: DocMetadataItem[] _embeddingModelList?: Model[] _rerankModelList?: Model[] } diff --git a/web/i18n/en-US/dataset-creation.json b/web/i18n/en-US/dataset-creation.json index e544aaa097..e1ac975b97 100644 --- a/web/i18n/en-US/dataset-creation.json +++ b/web/i18n/en-US/dataset-creation.json @@ -121,6 +121,8 @@ "stepTwo.indexSettingTip": "To change the index method & embedding model, please go to the ", "stepTwo.maxLength": "Maximum chunk length", "stepTwo.maxLengthCheck": "Maximum chunk length should be less than {{limit}}", + "stepTwo.metadata.customValues": "Custom Values", + "stepTwo.metadata.noValues": "No values configured", "stepTwo.nextStep": "Save & Process", "stepTwo.notAvailableForParentChild": "Not available for Parent-child Index", "stepTwo.notAvailableForQA": "Not available for Q&A Index", diff --git a/web/i18n/en-US/workflow.json b/web/i18n/en-US/workflow.json index 107dad5b28..ebf6eea08e 100644 --- a/web/i18n/en-US/workflow.json +++ b/web/i18n/en-US/workflow.json @@ -447,6 +447,8 @@ "nodes.common.retry.times": "times", "nodes.common.typeSwitch.input": "Input value", "nodes.common.typeSwitch.variable": "Use variable", + "nodes.common.valueType.constant": "Constant", + "nodes.common.valueType.variable": "Variable", "nodes.dataSource.add": "Add data source", "nodes.dataSource.supportedFileFormats": "Supported file formats", "nodes.dataSource.supportedFileFormatsPlaceholder": "File extension, e.g. doc", diff --git a/web/i18n/zh-Hans/dataset-creation.json b/web/i18n/zh-Hans/dataset-creation.json index 102f64e5e7..9a2ecbf5f1 100644 --- a/web/i18n/zh-Hans/dataset-creation.json +++ b/web/i18n/zh-Hans/dataset-creation.json @@ -121,6 +121,8 @@ "stepTwo.indexSettingTip": "要更改索引方法和 embedding 模型,请转到", "stepTwo.maxLength": "分段最大长度", "stepTwo.maxLengthCheck": "分段最大长度不能大于 {{limit}}", + "stepTwo.metadata.customValues": "自定义值", + "stepTwo.metadata.noValues": "未配置任何值", "stepTwo.nextStep": "保存并处理", "stepTwo.notAvailableForParentChild": "不支持父子索引", "stepTwo.notAvailableForQA": "不支持 Q&A 索引", diff --git a/web/i18n/zh-Hans/workflow.json b/web/i18n/zh-Hans/workflow.json index 7787c9db4b..3b076014ff 100644 --- a/web/i18n/zh-Hans/workflow.json +++ b/web/i18n/zh-Hans/workflow.json @@ -447,6 +447,8 @@ "nodes.common.retry.times": "次", "nodes.common.typeSwitch.input": "输入值", "nodes.common.typeSwitch.variable": "使用变量", + "nodes.common.valueType.constant": "常量", + "nodes.common.valueType.variable": "变量", "nodes.dataSource.add": "添加数据源", "nodes.dataSource.supportedFileFormats": "支持的文件格式", "nodes.dataSource.supportedFileFormatsPlaceholder": "文件格式,例如:doc", From 8a31d522253517e4a6369cf0061d66bd08989f94 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 07:23:21 +0000 Subject: [PATCH 02/68] [autofix.ci] apply automated fixes --- .../knowledge_index/knowledge_index_node.py | 7 ++--- api/services/metadata_service.py | 23 +++++++------- .../test_knowledge_index_node.py | 31 +++++++------------ .../services/test_dataset_service_metadata.py | 25 ++++++--------- 4 files changed, 34 insertions(+), 52 deletions(-) diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index ad227fa59a..6ad1bd1d3e 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -205,9 +205,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): # Resolve Name md_name = metadata_name_map.get(item.metadata_id) if not md_name: - logger.warning( - "[KnowledgeIndexNode] metadata_id %s not found, skipping", item.metadata_id - ) + logger.warning("[KnowledgeIndexNode] metadata_id %s not found, skipping", item.metadata_id) continue # Resolve Value @@ -298,6 +296,5 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): for item in node_data_obj.doc_metadata: if isinstance(item.value, list): variable_mapping[node_id + "." + item.metadata_id] = item.value - - return variable_mapping + return variable_mapping diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 8538dac792..487628c87b 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -101,7 +101,7 @@ class MetadataService: def check_metadata_used_in_pipeline(dataset_id: str, metadata_id: str) -> tuple[bool, str | None]: """ Check if a metadata is used in the associated Pipeline's Knowledge Base node. - + Returns: tuple[bool, str | None]: (is_used, pipeline_name) - True if used, with pipeline name """ @@ -109,25 +109,24 @@ class MetadataService: dataset = db.session.query(Dataset).filter_by(id=dataset_id).first() if not dataset or not dataset.pipeline_id: return False, None - + # Get the draft workflow directly using pipeline_id as app_id - workflow = db.session.query(Workflow).filter_by( - app_id=dataset.pipeline_id, - version=Workflow.VERSION_DRAFT - ).first() + workflow = ( + db.session.query(Workflow).filter_by(app_id=dataset.pipeline_id, version=Workflow.VERSION_DRAFT).first() + ) if not workflow: return False, None - + # Get pipeline name from App if exists app = db.session.query(App).filter_by(id=dataset.pipeline_id).first() pipeline_name = app.name if app else "Pipeline" - + # Walk through nodes to find Knowledge Index node (type is "knowledge-index") try: graph_dict = workflow.graph_dict if "nodes" not in graph_dict: return False, None - + for node in graph_dict["nodes"]: node_data = node.get("data", {}) # Check if this is a knowledge-index node @@ -140,7 +139,7 @@ class MetadataService: except Exception: logger.exception("Error checking metadata usage in pipeline") return False, None - + return False, None @staticmethod @@ -151,7 +150,7 @@ class MetadataService: metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first() if metadata is None: raise ValueError("Metadata not found.") - + # Check if metadata is used in Pipeline before deletion is_used, pipeline_name = MetadataService.check_metadata_used_in_pipeline(dataset_id, metadata_id) if is_used: @@ -159,7 +158,7 @@ class MetadataService: f"Cannot delete metadata '{metadata.name}' because it is currently used in " f"Pipeline '{pipeline_name}'." ) - + db.session.delete(metadata) # deal related documents diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index c7d514c243..486b015fbb 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -1,4 +1,3 @@ - import unittest import uuid from unittest.mock import MagicMock, patch @@ -19,18 +18,18 @@ class TestKnowledgeIndexNode(unittest.TestCase): self.mock_dataset = MagicMock(spec=Dataset) self.mock_dataset.id = self.dataset_id self.mock_dataset.built_in_field_enabled = False - + self.mock_document = MagicMock(spec=Document) self.mock_document.id = self.document_id self.mock_document.doc_metadata = {} - @patch('core.workflow.nodes.knowledge_index.knowledge_index_node.db.session') - @patch('core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory') + @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.db.session") + @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory") def test_run_with_custom_metadata(self, mock_index_processor_factory, mock_db_session): # Mock DB queries mock_db_session.query.return_value.filter_by.return_value.first.side_effect = [ self.mock_dataset, # For dataset query - self.mock_document # For document query + self.mock_document, # For document query ] # Mock Dataset Metadata @@ -40,10 +39,10 @@ class TestKnowledgeIndexNode(unittest.TestCase): mock_db_session.scalars.return_value.all.return_value = [mock_metadata] # Simpler mock for the scalar query - switched to bulk fetch mock_db_session.scalar.return_value = "Category" - + # Mock Variable Pool pool = MagicMock(spec=VariablePool) - + # System variables pool.get.side_effect = lambda selector: { ("sys", SystemVariableKey.DATASET_ID): MagicMock(value=self.dataset_id), @@ -57,7 +56,7 @@ class TestKnowledgeIndexNode(unittest.TestCase): # Handle the chunk variable specifically first chunk_var_mock = MagicMock() chunk_var_mock.value = {"chunk": "data"} - + # Override side_effect to handle list lookups correctly def variable_pool_get(selector): if selector == ["sys", SystemVariableKey.DATASET_ID]: @@ -82,26 +81,21 @@ class TestKnowledgeIndexNode(unittest.TestCase): title="Knowledge", chunk_structure="chunk", index_chunk_variable_selector=["sys", "chunks"], - doc_metadata=[ - DocMetadata(metadata_id="meta_uuid_1", value=["Start", "category"]) - ] + doc_metadata=[DocMetadata(metadata_id="meta_uuid_1", value=["Start", "category"])], ) # Initialize Node graph_init_params = MagicMock() graph_init_params.user_from = UserFrom.ACCOUNT graph_init_params.invoke_from = InvokeFrom.WEB_APP - - config = { - "id": "node1", - "data": node_data.model_dump() - } - + + config = {"id": "node1", "data": node_data.model_dump()} + node = KnowledgeIndexNode( id="node1", graph_init_params=graph_init_params, graph_runtime_state=MagicMock(variable_pool=pool), - config=config + config=config, ) # Mock _invoke_knowledge_index to avoid calling specific index logic @@ -114,4 +108,3 @@ class TestKnowledgeIndexNode(unittest.TestCase): assert self.mock_document.doc_metadata["Category"] == "Financial" mock_db_session.add.assert_called_with(self.mock_document) mock_db_session.commit.assert_called() - diff --git a/api/tests/unit_tests/services/test_dataset_service_metadata.py b/api/tests/unit_tests/services/test_dataset_service_metadata.py index 98d12317a5..b1da9a55bd 100644 --- a/api/tests/unit_tests/services/test_dataset_service_metadata.py +++ b/api/tests/unit_tests/services/test_dataset_service_metadata.py @@ -31,7 +31,7 @@ class TestDocumentServiceMetadata: # Hack to pass isinstance check mock_current_user.__class__ = Account mock_current_user.current_tenant_id = "tenant-123" - + yield { "db": mock_db, "get_dataset": mock_get_dataset, @@ -57,24 +57,19 @@ class TestDocumentServiceMetadata: # Define metadata inputs metadata_id = str(uuid4()) - doc_metadata_inputs = [ - DocumentMetadataInput(metadata_id=metadata_id, value="custom_value") - ] + doc_metadata_inputs = [DocumentMetadataInput(metadata_id=metadata_id, value="custom_value")] # Knowledge config knowledge_config = KnowledgeConfig( data_source_type="upload_file", data_source=DataSource( - info_list=InfoList( - data_source_type="upload_file", - file_info_list=FileInfo(file_ids=["file-1"]) - ) + info_list=InfoList(data_source_type="upload_file", file_info_list=FileInfo(file_ids=["file-1"])) ), doc_form="text_model", doc_language="en", indexing_technique="high_quality", enable_built_in_metadata=True, - doc_metadata=doc_metadata_inputs + doc_metadata=doc_metadata_inputs, ) # Mock local file for upload_file type @@ -84,7 +79,7 @@ class TestDocumentServiceMetadata: mock_metadata_def.id = metadata_id mock_metadata_def.name = "custom_field" mock_metadata_def.field_type = "text" - + # Create a side effect for query(Model) def query_side_effect(model): m = Mock() @@ -106,9 +101,9 @@ class TestDocumentServiceMetadata: return m return m - + mock_query.side_effect = query_side_effect - + # Mock build_document to return a document mock_document = Mock(spec=Document) mock_document.id = "doc-123" @@ -117,15 +112,13 @@ class TestDocumentServiceMetadata: # Act DocumentService.save_document_with_dataset_id( - dataset=dataset, - knowledge_config=knowledge_config, - account=account + dataset=dataset, knowledge_config=knowledge_config, account=account ) # Assert # 1. Check built-in metadata enabled assert dataset.built_in_field_enabled is True - + # 2. Check custom metadata passed to build_document call_args = mock_dependencies["build_document"].call_args assert call_args is not None From eb1b8c50f71915e731c9817835d2cafb3da709c2 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 16:29:43 +0800 Subject: [PATCH 03/68] =?UTF-8?q?=F0=9F=90=9B=20fix(api):=20fix=20failing?= =?UTF-8?q?=20metadata=20unit=20tests=20and=20enhance=20pipeline=20check?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix test_delete_metadata_not_found: use pytest.raises() since service raises ValueError instead of returning None - Fix test_run_with_custom_metadata: remove mock of _invoke_knowledge_index that bypassed metadata processing logic, add missing BATCH/ORIGINAL_DOCUMENT_ID variable mocks, remove redundant side_effect code - Fix test_save_document_with_dataset_id_ignores_lock_not_owned: add missing enable_built_in_metadata and doc_metadata attributes to knowledge_config - Fix test_save_document_with_metadata: add .filter().all() mock chain for DatasetMetadata query - Enhance check_metadata_used_in_pipeline to check both draft and current published workflows (via pipeline.workflow_id) to prevent deletion of metadata actively used in production Co-Authored-By: Claude --- api/services/metadata_service.py | 63 +++++++++++-------- .../services/test_metadata_service.py | 8 +-- .../test_knowledge_index_node.py | 22 ++----- .../test_dataset_service_lock_not_owned.py | 2 + .../services/test_dataset_service_metadata.py | 1 + 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 487628c87b..033960ffd5 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -1,13 +1,14 @@ import copy import logging +from sqlalchemy import or_ + from core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSource from extensions.ext_database import db from extensions.ext_redis import redis_client from libs.datetime_utils import naive_utc_now from libs.login import current_account_with_tenant -from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding -from models.model import App +from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Pipeline from models.workflow import Workflow from services.dataset_service import DocumentService from services.entities.knowledge_entities.knowledge_entities import ( @@ -102,6 +103,9 @@ class MetadataService: """ Check if a metadata is used in the associated Pipeline's Knowledge Base node. + Checks both draft and current published workflows to prevent deletion of metadata + that is actively used in production. + Returns: tuple[bool, str | None]: (is_used, pipeline_name) - True if used, with pipeline name """ @@ -110,36 +114,43 @@ class MetadataService: if not dataset or not dataset.pipeline_id: return False, None - # Get the draft workflow directly using pipeline_id as app_id - workflow = ( - db.session.query(Workflow).filter_by(app_id=dataset.pipeline_id, version=Workflow.VERSION_DRAFT).first() - ) - if not workflow: + # Get the pipeline to access workflow_id (current published version) + pipeline = db.session.query(Pipeline).filter_by(id=dataset.pipeline_id).first() + if not pipeline: return False, None - # Get pipeline name from App if exists - app = db.session.query(App).filter_by(id=dataset.pipeline_id).first() - pipeline_name = app.name if app else "Pipeline" + # Build conditions for draft and current published workflows only + workflow_conditions = [ + (Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT) + ] + if pipeline.workflow_id: + workflow_conditions.append(Workflow.id == pipeline.workflow_id) - # Walk through nodes to find Knowledge Index node (type is "knowledge-index") - try: - graph_dict = workflow.graph_dict - if "nodes" not in graph_dict: - return False, None + workflows = db.session.query(Workflow).filter(or_(*workflow_conditions)).all() - for node in graph_dict["nodes"]: - node_data = node.get("data", {}) - # Check if this is a knowledge-index node - if node_data.get("type") == "knowledge-index": - doc_metadata = node_data.get("doc_metadata", []) - if doc_metadata: - for item in doc_metadata: - if item.get("metadata_id") == metadata_id: - return True, pipeline_name - except Exception: - logger.exception("Error checking metadata usage in pipeline") + if not workflows: return False, None + # Check each workflow for metadata usage + for workflow in workflows: + try: + graph_dict = workflow.graph_dict + if "nodes" not in graph_dict: + continue + + for node in graph_dict["nodes"]: + node_data = node.get("data", {}) + # Check if this is a knowledge-index node + if node_data.get("type") == "knowledge-index": + doc_metadata = node_data.get("doc_metadata", []) + if doc_metadata: + for item in doc_metadata: + if item.get("metadata_id") == metadata_id: + return True, pipeline.name + except Exception: + logger.exception("Error checking metadata usage in pipeline workflow %s", workflow.id) + continue + return False, None @staticmethod diff --git a/api/tests/test_containers_integration_tests/services/test_metadata_service.py b/api/tests/test_containers_integration_tests/services/test_metadata_service.py index c8ced3f3a5..130cd759e7 100644 --- a/api/tests/test_containers_integration_tests/services/test_metadata_service.py +++ b/api/tests/test_containers_integration_tests/services/test_metadata_service.py @@ -460,11 +460,9 @@ class TestMetadataService: fake_metadata_id = str(uuid.uuid4()) # Use valid UUID format - # Act: Execute the method under test - result = MetadataService.delete_metadata(dataset.id, fake_metadata_id) - - # Assert: Verify the method returns None when metadata is not found - assert result is None + # Act & Assert: Verify the method raises ValueError when metadata is not found + with pytest.raises(ValueError, match="Metadata not found."): + MetadataService.delete_metadata(dataset.id, fake_metadata_id) def test_delete_metadata_with_document_bindings( self, db_session_with_containers, mock_external_service_dependencies diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index 486b015fbb..174116317d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -43,33 +43,26 @@ class TestKnowledgeIndexNode(unittest.TestCase): # Mock Variable Pool pool = MagicMock(spec=VariablePool) - # System variables - pool.get.side_effect = lambda selector: { - ("sys", SystemVariableKey.DATASET_ID): MagicMock(value=self.dataset_id), - ("sys", SystemVariableKey.DOCUMENT_ID): MagicMock(value=self.document_id), - ("sys", SystemVariableKey.INVOKE_FROM): None, - ("Start", "category"): MagicMock(to_object=lambda: "Financial"), - # handle list as key? get takes list - frozenset(["Start", "category"]): MagicMock(to_object=lambda: "Financial"), - }.get(tuple(selector) if isinstance(selector, list) else selector) - - # Handle the chunk variable specifically first + # Handle the chunk variable chunk_var_mock = MagicMock() chunk_var_mock.value = {"chunk": "data"} - # Override side_effect to handle list lookups correctly def variable_pool_get(selector): if selector == ["sys", SystemVariableKey.DATASET_ID]: return MagicMock(value=self.dataset_id) if selector == ["sys", SystemVariableKey.DOCUMENT_ID]: return MagicMock(value=self.document_id) + if selector == ["sys", SystemVariableKey.BATCH]: + return MagicMock(value="test-batch") + if selector == ["sys", SystemVariableKey.ORIGINAL_DOCUMENT_ID]: + return None if selector == ["Start", "category"]: var = MagicMock() var.to_object.return_value = "Financial" return var if selector == ["sys", SystemVariableKey.INVOKE_FROM]: return None - if selector == ["sys", "chunks"]: # whatever index_chunk_variable_selector is + if selector == ["sys", "chunks"]: return chunk_var_mock return None @@ -98,9 +91,6 @@ class TestKnowledgeIndexNode(unittest.TestCase): config=config, ) - # Mock _invoke_knowledge_index to avoid calling specific index logic - node._invoke_knowledge_index = MagicMock() - # Execute result = node._run() diff --git a/api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py b/api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py index bd226f7536..136a0af8ff 100644 --- a/api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py +++ b/api/tests/unit_tests/services/test_dataset_service_lock_not_owned.py @@ -86,6 +86,8 @@ def test_save_document_with_dataset_id_ignores_lock_not_owned( process_rule=None, duplicate=False, doc_language="en", + enable_built_in_metadata=False, + doc_metadata=None, ) account = fake_current_user diff --git a/api/tests/unit_tests/services/test_dataset_service_metadata.py b/api/tests/unit_tests/services/test_dataset_service_metadata.py index b1da9a55bd..a3de25ed30 100644 --- a/api/tests/unit_tests/services/test_dataset_service_metadata.py +++ b/api/tests/unit_tests/services/test_dataset_service_metadata.py @@ -87,6 +87,7 @@ class TestDocumentServiceMetadata: m.filter.return_value.filter.return_value.first.return_value = mock_metadata_def # handle the specific chain in code m.filter_by.return_value.first.return_value = mock_metadata_def + m.filter.return_value.all.return_value = [mock_metadata_def] return m if model == Document: doc_mock = Mock() From 7997ef22af5ae0848c338061a026a4e63f3a3878 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 16:39:16 +0800 Subject: [PATCH 04/68] =?UTF-8?q?=F0=9F=8E=A8=20style(web):=20add=20border?= =?UTF-8?q?=20and=20background=20to=20metadata=20value=20input?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add proper input styling (border, rounded corners, background) to the metadata value container in Knowledge Base node for visual consistency with other form inputs. Co-Authored-By: Claude --- .../nodes/knowledge-base/components/metadata-section.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx index ef5fbb99a6..3ed4076fa8 100644 --- a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx @@ -217,7 +217,7 @@ const MetadataSection: FC = ({ )}
-
+
Date: Fri, 23 Jan 2026 08:44:39 +0000 Subject: [PATCH 05/68] [autofix.ci] apply automated fixes --- api/services/metadata_service.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 033960ffd5..90436f6d46 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -120,13 +120,11 @@ class MetadataService: return False, None # Build conditions for draft and current published workflows only - workflow_conditions = [ - (Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT) - ] + workflow_conditions = [(Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT)] if pipeline.workflow_id: workflow_conditions.append(Workflow.id == pipeline.workflow_id) - workflows = db.session.query(Workflow).filter(or_(*workflow_conditions)).all() + workflows = db.session.query(Workflow).where(or_(*workflow_conditions)).all() if not workflows: return False, None From 15777a6172ecb9029557bc5985b825c170a458e8 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 16:57:27 +0800 Subject: [PATCH 06/68] =?UTF-8?q?=F0=9F=90=9B=20fix(api):=20fix=20type=20e?= =?UTF-8?q?rror=20in=20check=5Fmetadata=5Fused=5Fin=5Fpipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rewrite workflow query conditions to avoid type checking error with list.append() having inconsistent types. Co-Authored-By: Claude --- api/services/metadata_service.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 90436f6d46..b04d98d281 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -120,11 +120,21 @@ class MetadataService: return False, None # Build conditions for draft and current published workflows only - workflow_conditions = [(Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT)] - if pipeline.workflow_id: - workflow_conditions.append(Workflow.id == pipeline.workflow_id) + draft_condition = (Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT) - workflows = db.session.query(Workflow).where(or_(*workflow_conditions)).all() + if pipeline.workflow_id: + workflows = ( + db.session.query(Workflow) + .filter( + or_( + draft_condition, + Workflow.id == pipeline.workflow_id, + ) + ) + .all() + ) + else: + workflows = db.session.query(Workflow).filter(draft_condition).all() if not workflows: return False, None From cf198efa3469be650328ad3fde81c5aa63816d9d Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 09:03:18 +0000 Subject: [PATCH 07/68] [autofix.ci] apply automated fixes --- api/services/metadata_service.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index b04d98d281..586ec27243 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -124,17 +124,14 @@ class MetadataService: if pipeline.workflow_id: workflows = ( - db.session.query(Workflow) - .filter( - or_( - draft_condition, - Workflow.id == pipeline.workflow_id, - ) - ) + db.session.query(Workflow).where(or_( + draft_condition, + Workflow.id == pipeline.workflow_id, + )) .all() ) else: - workflows = db.session.query(Workflow).filter(draft_condition).all() + workflows = db.session.query(Workflow).where(draft_condition).all() if not workflows: return False, None From f566da58f7cf23135d50636570379558ba46acda Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 09:07:12 +0000 Subject: [PATCH 08/68] [autofix.ci] apply automated fixes (attempt 2/3) --- api/services/metadata_service.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 586ec27243..beb47f0104 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -124,10 +124,13 @@ class MetadataService: if pipeline.workflow_id: workflows = ( - db.session.query(Workflow).where(or_( - draft_condition, - Workflow.id == pipeline.workflow_id, - )) + db.session.query(Workflow) + .where( + or_( + draft_condition, + Workflow.id == pipeline.workflow_id, + ) + ) .all() ) else: From 1df04fb16f1ed43d47cb8655907175fb12114a0b Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 20:10:31 +0800 Subject: [PATCH 09/68] =?UTF-8?q?=E2=9C=85=20test(api):=20fix=20mock=20set?= =?UTF-8?q?up=20in=20knowledge=5Findex=5Fnode=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mock attributes.flag_modified to avoid SQLAlchemy internal state requirements. Update assertion to verify flag_modified call instead of db.session.add call. --- .../knowledge_index/test_knowledge_index_node.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index 174116317d..9743a7694d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -7,7 +7,7 @@ from core.workflow.enums import SystemVariableKey from core.workflow.nodes.knowledge_index.entities import DocMetadata, KnowledgeIndexNodeData from core.workflow.nodes.knowledge_index.knowledge_index_node import KnowledgeIndexNode from core.workflow.runtime import VariablePool -from models.dataset import Dataset, DatasetMetadata, Document +from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Document from models.enums import UserFrom @@ -23,9 +23,12 @@ class TestKnowledgeIndexNode(unittest.TestCase): self.mock_document.id = self.document_id self.mock_document.doc_metadata = {} + @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.attributes.flag_modified") @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.db.session") @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory") - def test_run_with_custom_metadata(self, mock_index_processor_factory, mock_db_session): + def test_run_with_custom_metadata( + self, mock_index_processor_factory, mock_db_session, mock_flag_modified + ): # Mock DB queries mock_db_session.query.return_value.filter_by.return_value.first.side_effect = [ self.mock_dataset, # For dataset query @@ -94,7 +97,9 @@ class TestKnowledgeIndexNode(unittest.TestCase): # Execute result = node._run() - # Verify + # Verify metadata was set on document assert self.mock_document.doc_metadata["Category"] == "Financial" - mock_db_session.add.assert_called_with(self.mock_document) + # Verify flag_modified was called for the doc_metadata field + mock_flag_modified.assert_called_with(self.mock_document, "doc_metadata") + # Verify commit was called mock_db_session.commit.assert_called() From 7759ab08e951f5837d562e4397de6b1a5a2b40a5 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Fri, 23 Jan 2026 20:10:57 +0800 Subject: [PATCH 10/68] =?UTF-8?q?=F0=9F=8E=A8=20test(api):=20remove=20unus?= =?UTF-8?q?ed=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../workflow/nodes/knowledge_index/test_knowledge_index_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index 9743a7694d..0ffd5dd41c 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -7,7 +7,7 @@ from core.workflow.enums import SystemVariableKey from core.workflow.nodes.knowledge_index.entities import DocMetadata, KnowledgeIndexNodeData from core.workflow.nodes.knowledge_index.knowledge_index_node import KnowledgeIndexNode from core.workflow.runtime import VariablePool -from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Document +from models.dataset import Dataset, DatasetMetadata, Document from models.enums import UserFrom From 96b7f2c2916b5b3d78d948d969745d57b6034f5c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 12:15:51 +0000 Subject: [PATCH 11/68] [autofix.ci] apply automated fixes --- .../nodes/knowledge_index/test_knowledge_index_node.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index 0ffd5dd41c..3d6da66a0c 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -26,9 +26,7 @@ class TestKnowledgeIndexNode(unittest.TestCase): @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.attributes.flag_modified") @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.db.session") @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory") - def test_run_with_custom_metadata( - self, mock_index_processor_factory, mock_db_session, mock_flag_modified - ): + def test_run_with_custom_metadata(self, mock_index_processor_factory, mock_db_session, mock_flag_modified): # Mock DB queries mock_db_session.query.return_value.filter_by.return_value.first.side_effect = [ self.mock_dataset, # For dataset query From eba2f06bd443ea82597662993052466b830007fd Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 2 Feb 2026 11:07:26 +0800 Subject: [PATCH 12/68] refactor: format dataset model imports for readability --- .../nodes/knowledge_index/knowledge_index_node.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index b68f5cca50..28d0aaf4e0 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -19,7 +19,14 @@ from core.workflow.nodes.base.node import Node from core.workflow.nodes.base.template import Template from core.workflow.runtime import VariablePool from extensions.ext_database import db -from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Document, DocumentSegment, DocumentSegmentSummary +from models.dataset import ( + Dataset, + DatasetMetadata, + DatasetMetadataBinding, + Document, + DocumentSegment, + DocumentSegmentSummary, +) from services.summary_index_service import SummaryIndexService from tasks.generate_summary_index_task import generate_summary_index_task From 26248e3d80b8e4b211c353f4f94f0ffc8f78d838 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 2 Feb 2026 14:54:46 +0800 Subject: [PATCH 13/68] style: remove fixed font size from date picker text --- web/app/components/datasets/metadata/base/date-picker.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/app/components/datasets/metadata/base/date-picker.tsx b/web/app/components/datasets/metadata/base/date-picker.tsx index 6839af6232..2f61549859 100644 --- a/web/app/components/datasets/metadata/base/date-picker.tsx +++ b/web/app/components/datasets/metadata/base/date-picker.tsx @@ -38,7 +38,7 @@ const WrappedDatePicker = ({
From a4cd1fb4c41bf73f20ee30b643f1e976b186dff2 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 2 Feb 2026 16:11:30 +0800 Subject: [PATCH 14/68] =?UTF-8?q?=F0=9F=8E=A8=20style:=20fix=20metadata=20?= =?UTF-8?q?date=20picker=20styling=20and=20alignment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add system-xs-regular and truncate to date-picker for consistent font size - Fix value container alignment with w-0 grow overflow-hidden pattern - Change delete button from × to trash icon with destructive hover - Align condition-date layout with other condition components Co-Authored-By: Claude Opus 4.5 --- .../datasets/metadata/base/date-picker.tsx | 2 +- .../edit-metadata-batch/input-combined.tsx | 6 +++--- .../knowledge-base/components/metadata-section.tsx | 14 +++++++------- .../metadata/condition-list/condition-date.tsx | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/web/app/components/datasets/metadata/base/date-picker.tsx b/web/app/components/datasets/metadata/base/date-picker.tsx index 2f61549859..3c99323599 100644 --- a/web/app/components/datasets/metadata/base/date-picker.tsx +++ b/web/app/components/datasets/metadata/base/date-picker.tsx @@ -38,7 +38,7 @@ const WrappedDatePicker = ({
diff --git a/web/app/components/datasets/metadata/edit-metadata-batch/input-combined.tsx b/web/app/components/datasets/metadata/edit-metadata-batch/input-combined.tsx index aec74bcfef..4b34f4772a 100644 --- a/web/app/components/datasets/metadata/edit-metadata-batch/input-combined.tsx +++ b/web/app/components/datasets/metadata/edit-metadata-batch/input-combined.tsx @@ -22,7 +22,7 @@ const InputCombined: FC = ({ onChange, readOnly, }) => { - const className = cn('h-6 grow p-0.5 text-xs') + const className = cn('h-6 grow p-0.5') if (type === DataType.time) { return ( = ({ return (
= ({ return ( onChange(e.target.value)} readOnly={readOnly} diff --git a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx index 3ed4076fa8..4867ba48e3 100644 --- a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx @@ -3,7 +3,7 @@ import type { FC } from 'react' import type { DocMetadataItem } from '../types' import type { BuiltInMetadataItem, MetadataItemWithValueLength } from '@/app/components/datasets/metadata/types' import type { ValueSelector, Var } from '@/app/components/workflow/types' -import { RiAddLine, RiCloseLine, RiDraftLine, RiEditLine } from '@remixicon/react' +import { RiAddLine, RiDeleteBinLine, RiDraftLine, RiEditLine } from '@remixicon/react' import { useCallback, useState } from 'react' import { useTranslation } from 'react-i18next' import Button from '@/app/components/base/button' @@ -241,7 +241,7 @@ const MetadataSection: FC = ({
-
+
{isVariable ? ( = ({ /> ) : ( -
+
{(() => { const metadataType = getMetadataType(item.metadata_id) @@ -267,7 +267,7 @@ const MetadataSection: FC = ({ if (metadataType === DataType.time) { return ( handleDocMetadataValueChange(index, value || 0)} /> @@ -278,7 +278,7 @@ const MetadataSection: FC = ({ if (metadataType === DataType.number) { return ( handleDocMetadataValueChange(index, value)} readOnly={readonly} @@ -308,9 +308,9 @@ const MetadataSection: FC = ({ )}
diff --git a/web/app/components/workflow/nodes/knowledge-retrieval/components/metadata/condition-list/condition-date.tsx b/web/app/components/workflow/nodes/knowledge-retrieval/components/metadata/condition-list/condition-date.tsx index b34093b7b0..2511571d78 100644 --- a/web/app/components/workflow/nodes/knowledge-retrieval/components/metadata/condition-list/condition-date.tsx +++ b/web/app/components/workflow/nodes/knowledge-retrieval/components/metadata/condition-list/condition-date.tsx @@ -32,10 +32,10 @@ const ConditionDate = ({ handleClickTrigger, }: TriggerProps) => { return ( -
+
@@ -71,7 +71,7 @@ const ConditionDate = ({ }, [value, handleDateChange, timezone, t]) return ( -
+
Date: Thu, 5 Feb 2026 08:22:31 +0000 Subject: [PATCH 15/68] [autofix.ci] apply automated fixes --- web/app/components/workflow/nodes/knowledge-base/panel.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/app/components/workflow/nodes/knowledge-base/panel.tsx b/web/app/components/workflow/nodes/knowledge-base/panel.tsx index 3e0cad6c3b..bf2c3d2ad9 100644 --- a/web/app/components/workflow/nodes/knowledge-base/panel.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/panel.tsx @@ -18,9 +18,9 @@ import { Group, } from '@/app/components/workflow/nodes/_base/components/layout' import VarReferencePicker from '@/app/components/workflow/nodes/_base/components/variable/var-reference-picker' +import { IS_CE_EDITION } from '@/config' import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail' import { useDatasetMetaData } from '@/service/knowledge/use-metadata' -import { IS_CE_EDITION } from '@/config' import Split from '../_base/components/split' import ChunkStructure from './components/chunk-structure' import EmbeddingModel from './components/embedding-model' From c30bd6ebcf87a0edc6e79d49cd03608f6c189111 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 9 Feb 2026 11:27:22 +0800 Subject: [PATCH 16/68] feat: Enhance document metadata management by introducing `DatasetMetadataBinding` for new and duplicate documents and improving frontend metadata input handling. --- .../nodes/knowledge_index/entities.py | 2 +- .../knowledge_index/knowledge_index_node.py | 190 ++++++++++-------- api/services/dataset_service.py | 56 ++++-- api/tasks/document_indexing_update_task.py | 5 +- .../test_knowledge_index_node.py | 66 ++++++ .../services/test_dataset_service_metadata.py | 105 +++++++++- .../test_document_indexing_update_task.py | 44 ++++ .../components/metadata-section.tsx | 11 +- .../workflow/nodes/knowledge-base/types.ts | 2 +- 9 files changed, 372 insertions(+), 109 deletions(-) create mode 100644 api/tests/unit_tests/tasks/test_document_indexing_update_task.py diff --git a/api/core/workflow/nodes/knowledge_index/entities.py b/api/core/workflow/nodes/knowledge_index/entities.py index 72b7a29985..8b823bdbee 100644 --- a/api/core/workflow/nodes/knowledge_index/entities.py +++ b/api/core/workflow/nodes/knowledge_index/entities.py @@ -156,7 +156,7 @@ class DocMetadata(BaseModel): """ metadata_id: str - value: str | int | float | list[str] + value: str | int | float | list[str] | None class KnowledgeIndexNodeData(BaseNodeData): diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index 28d0aaf4e0..178cf6d219 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -30,7 +30,7 @@ from models.dataset import ( from services.summary_index_service import SummaryIndexService from tasks.generate_summary_index_task import generate_summary_index_task -from .entities import KnowledgeIndexNodeData +from .entities import DocMetadata, KnowledgeIndexNodeData from .exc import ( KnowledgeIndexNodeError, ) @@ -155,6 +155,17 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): dataset_name_value = dataset.name document_name_value = document.name created_at_value = document.created_at + + # Resolve metadata selectors before any indexing side effects. + resolved_doc_metadata: dict[str, Any] = {} + metadata_binding_ids: list[str] = [] + if node_data.doc_metadata: + resolved_doc_metadata, metadata_binding_ids = self._resolve_doc_metadata_values( + dataset=dataset, + doc_metadata_items=node_data.doc_metadata, + variable_pool=variable_pool, + ) + # chunk nodes by chunk size indexing_start_at = time.perf_counter() index_processor = IndexProcessorFactory(dataset.chunk_structure).init_index_processor() @@ -202,86 +213,13 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None), } ) - - # Process doc_metadata before commit to ensure it's saved with the same document object - if node_data.doc_metadata: - try: - # Fetch metadata definitions for name mapping - metadata_name_map: dict[str, str] = {} - dataset_metadatas = db.session.scalars( - select(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset.id) - ).all() - for md in dataset_metadatas: - metadata_name_map[md.id] = md.name - - # Collect valid metadata IDs (excluding built-in) - valid_metadata_ids = [ - item.metadata_id - for item in node_data.doc_metadata - if item.metadata_id != BUILT_IN_METADATA_ID and item.metadata_id in metadata_name_map - ] - - # Batch fetch existing bindings to avoid N+1 query - existing_binding_ids: set[str] = set() - if valid_metadata_ids: - existing_bindings = db.session.scalars( - select(DatasetMetadataBinding.metadata_id).where( - DatasetMetadataBinding.dataset_id == dataset.id, - DatasetMetadataBinding.document_id == doc_id_value, - DatasetMetadataBinding.metadata_id.in_(valid_metadata_ids), - ) - ).all() - existing_binding_ids = set(existing_bindings) - - doc_metadata_dict = document.doc_metadata or {} - - for item in node_data.doc_metadata: - # Skip built-in fields - if item.metadata_id == BUILT_IN_METADATA_ID: - continue - - # Resolve Name - md_name = metadata_name_map.get(item.metadata_id) - if not md_name: - logger.warning("[KnowledgeIndexNode] metadata_id %s not found, skipping", item.metadata_id) - continue - - # Resolve Value - value = item.value - if isinstance(value, list): - var_obj = variable_pool.get(value) - if var_obj: - value = var_obj.to_object() - else: - # Variable not found - raise error to notify user of configuration issue - variable_path = ".".join(value) - raise KnowledgeIndexNodeError( - f"Variable '{variable_path}' not found for metadata '{md_name}'. " - f"Please check your variable configuration." - ) - - if value is not None: - doc_metadata_dict[md_name] = value - - # Create DatasetMetadataBinding if not exists - if item.metadata_id not in existing_binding_ids: - binding = DatasetMetadataBinding( - tenant_id=dataset.tenant_id, - dataset_id=dataset.id, - metadata_id=item.metadata_id, - document_id=doc_id_value, - created_by=self.user_id, - ) - db.session.add(binding) - existing_binding_ids.add(item.metadata_id) # Prevent duplicate in same batch - - document.doc_metadata = doc_metadata_dict - # Force SQLAlchemy to recognize the change to the JSON field - attributes.flag_modified(document, "doc_metadata") - - except Exception as e: - logger.exception("[KnowledgeIndexNode] Failed to process doc_metadata") - raise KnowledgeIndexNodeError(f"Failed to process document metadata: {e}") from e + if resolved_doc_metadata or metadata_binding_ids: + self._save_doc_metadata_and_bindings( + dataset=dataset, + document=document, + doc_metadata=resolved_doc_metadata, + metadata_binding_ids=metadata_binding_ids, + ) db.session.commit() @@ -298,6 +236,96 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): "display_status": "completed", } + def _resolve_doc_metadata_values( + self, + *, + dataset: Dataset, + doc_metadata_items: Sequence[DocMetadata], + variable_pool: VariablePool, + ) -> tuple[dict[str, Any], list[str]]: + """ + Resolve node-level metadata values before indexing starts. + + This pre-validation prevents partial index writes when metadata variable selectors are invalid. + """ + metadata_name_map: dict[str, str] = {} + dataset_metadatas = db.session.scalars( + select(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset.id) + ).all() + for metadata in dataset_metadatas: + metadata_name_map[metadata.id] = metadata.name + + resolved_metadata: dict[str, Any] = {} + metadata_binding_ids: list[str] = [] + for item in doc_metadata_items: + if item.metadata_id == BUILT_IN_METADATA_ID: + continue + + metadata_name = metadata_name_map.get(item.metadata_id) + if not metadata_name: + logger.warning("[KnowledgeIndexNode] metadata_id %s not found, skipping", item.metadata_id) + continue + + value = item.value + if isinstance(value, list): + variable = variable_pool.get(value) + if not variable: + variable_path = ".".join(value) + raise KnowledgeIndexNodeError( + f"Variable '{variable_path}' not found for metadata '{metadata_name}'. " + f"Please check your variable configuration." + ) + value = variable.to_object() + + if value is not None: + resolved_metadata[metadata_name] = value + + metadata_binding_ids.append(item.metadata_id) + + return resolved_metadata, metadata_binding_ids + + def _save_doc_metadata_and_bindings( + self, + *, + dataset: Dataset, + document: Document, + doc_metadata: Mapping[str, Any], + metadata_binding_ids: Sequence[str], + ) -> None: + """ + Persist resolved metadata values and ensure metadata bindings exist for the document. + """ + unique_metadata_ids = list(dict.fromkeys(metadata_binding_ids)) + existing_binding_ids: set[str] = set() + if unique_metadata_ids: + existing_bindings = db.session.scalars( + select(DatasetMetadataBinding.metadata_id).where( + DatasetMetadataBinding.dataset_id == dataset.id, + DatasetMetadataBinding.document_id == document.id, + DatasetMetadataBinding.metadata_id.in_(unique_metadata_ids), + ) + ).all() + existing_binding_ids = set(existing_bindings) + + if doc_metadata: + document_doc_metadata = document.doc_metadata or {} + document_doc_metadata.update(doc_metadata) + document.doc_metadata = document_doc_metadata + attributes.flag_modified(document, "doc_metadata") + + for metadata_id in unique_metadata_ids: + if metadata_id in existing_binding_ids: + continue + + binding = DatasetMetadataBinding( + tenant_id=dataset.tenant_id, + dataset_id=dataset.id, + metadata_id=metadata_id, + document_id=document.id, + created_by=self.user_id, + ) + db.session.add(binding) + def _handle_summary_index_generation( self, dataset: Dataset, diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index 4b3181f240..92c3bee7e4 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -1917,7 +1917,7 @@ class DocumentService: # 2. Process custom metadata - validate and build dict custom_metadata: dict = {} - metadata_bindings_to_create: list[tuple[str, str]] = [] # (metadata_id, metadata_name) + metadata_bindings_to_create: list[str] = [] if knowledge_config.doc_metadata: # Batch fetch all metadata definitions to avoid N+1 query metadata_ids = [item.metadata_id for item in knowledge_config.doc_metadata] @@ -1937,7 +1937,7 @@ class DocumentService: if not metadata_def: raise ValueError(f"Metadata with id '{item.metadata_id}' not found in this dataset") custom_metadata[metadata_def.name] = item.value - metadata_bindings_to_create.append((item.metadata_id, metadata_def.name)) + metadata_bindings_to_create.append(item.metadata_id) documents = [] if knowledge_config.original_document_id: @@ -2044,6 +2044,14 @@ class DocumentService: document.data_source_info = json.dumps(data_source_info) document.batch = batch document.indexing_status = "waiting" + if custom_metadata: + doc_metadata = ( + copy.deepcopy(document.doc_metadata) + if document.doc_metadata + else {} + ) + doc_metadata.update(custom_metadata) + document.doc_metadata = doc_metadata db.session.add(document) documents.append(document) duplicate_document_ids.append(document.id) @@ -2164,19 +2172,39 @@ class DocumentService: position += 1 db.session.commit() - # Create DatasetMetadataBinding records for custom metadata - if metadata_bindings_to_create and document_ids: - for doc_id in document_ids: - for metadata_id, _ in metadata_bindings_to_create: - binding = DatasetMetadataBinding( - tenant_id=dataset.tenant_id, - dataset_id=dataset.id, - document_id=doc_id, - metadata_id=metadata_id, - created_by=account.id, + # Create DatasetMetadataBinding records for custom metadata. + # Bindings should exist for both newly created and duplicate-reused documents. + if metadata_bindings_to_create: + target_document_ids = list(set(document_ids + duplicate_document_ids)) + metadata_ids = list(dict.fromkeys(metadata_bindings_to_create)) + if target_document_ids and metadata_ids: + existing_binding_pairs = { + (document_id, metadata_id) + for document_id, metadata_id in db.session.query( + DatasetMetadataBinding.document_id, + DatasetMetadataBinding.metadata_id, ) - db.session.add(binding) - db.session.commit() + .filter( + DatasetMetadataBinding.dataset_id == dataset.id, + DatasetMetadataBinding.document_id.in_(target_document_ids), + DatasetMetadataBinding.metadata_id.in_(metadata_ids), + ) + .all() + } + + for doc_id in target_document_ids: + for metadata_id in metadata_ids: + if (doc_id, metadata_id) in existing_binding_pairs: + continue + binding = DatasetMetadataBinding( + tenant_id=dataset.tenant_id, + dataset_id=dataset.id, + document_id=doc_id, + metadata_id=metadata_id, + created_by=account.id, + ) + db.session.add(binding) + db.session.commit() # trigger async task if document_ids: diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index c7508c6d05..01eb7f2fb0 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -26,15 +26,18 @@ def document_indexing_update_task(dataset_id: str, document_id: str): logger.info(click.style(f"Start update document: {document_id}", fg="green")) start_at = time.perf_counter() - with session_factory.create_session() as session, session.begin(): + with session_factory.create_session() as session: document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first() if not document: logger.info(click.style(f"Document not found: {document_id}", fg="red")) return + # Commit status update before invoking IndexingRunner, which uses another session. + # This prevents cross-session row lock contention on the same document record. document.indexing_status = "parsing" document.processing_started_at = naive_utc_now() + session.commit() dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() if not dataset: diff --git a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py index 3d6da66a0c..025aefaddb 100644 --- a/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/knowledge_index/test_knowledge_index_node.py @@ -3,6 +3,7 @@ import uuid from unittest.mock import MagicMock, patch from core.app.entities.app_invoke_entities import InvokeFrom +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import SystemVariableKey from core.workflow.nodes.knowledge_index.entities import DocMetadata, KnowledgeIndexNodeData from core.workflow.nodes.knowledge_index.knowledge_index_node import KnowledgeIndexNode @@ -101,3 +102,68 @@ class TestKnowledgeIndexNode(unittest.TestCase): mock_flag_modified.assert_called_with(self.mock_document, "doc_metadata") # Verify commit was called mock_db_session.commit.assert_called() + + @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.db.session") + @patch("core.workflow.nodes.knowledge_index.knowledge_index_node.IndexProcessorFactory") + def test_run_with_missing_metadata_variable_fails_before_indexing( + self, mock_index_processor_factory, mock_db_session + ): + mock_db_session.query.return_value.filter_by.return_value.first.side_effect = [ + self.mock_dataset, + self.mock_document, + ] + + mock_metadata = MagicMock(spec=DatasetMetadata) + mock_metadata.id = "meta_uuid_1" + mock_metadata.name = "Category" + mock_db_session.scalars.return_value.all.return_value = [mock_metadata] + + pool = MagicMock(spec=VariablePool) + chunk_var_mock = MagicMock() + chunk_var_mock.value = {"chunk": "data"} + + def variable_pool_get(selector): + if selector == ["sys", SystemVariableKey.DATASET_ID]: + return MagicMock(value=self.dataset_id) + if selector == ["sys", SystemVariableKey.DOCUMENT_ID]: + return MagicMock(value=self.document_id) + if selector == ["sys", SystemVariableKey.BATCH]: + return MagicMock(value="test-batch") + if selector == ["sys", SystemVariableKey.ORIGINAL_DOCUMENT_ID]: + return None + if selector == ["Start", "missing"]: + return None + if selector == ["sys", SystemVariableKey.INVOKE_FROM]: + return None + if selector == ["sys", "chunks"]: + return chunk_var_mock + return None + + pool.get.side_effect = variable_pool_get + + node_data = KnowledgeIndexNodeData( + id="node1", + title="Knowledge", + chunk_structure="chunk", + index_chunk_variable_selector=["sys", "chunks"], + doc_metadata=[DocMetadata(metadata_id="meta_uuid_1", value=["Start", "missing"])], + ) + + graph_init_params = MagicMock() + graph_init_params.user_from = UserFrom.ACCOUNT + graph_init_params.invoke_from = InvokeFrom.WEB_APP + + config = {"id": "node1", "data": node_data.model_dump()} + node = KnowledgeIndexNode( + id="node1", + graph_init_params=graph_init_params, + graph_runtime_state=MagicMock(variable_pool=pool), + config=config, + ) + + result = node._run() + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error + assert "Variable 'Start.missing' not found" in result.error + mock_index_processor_factory.return_value.init_index_processor.assert_not_called() diff --git a/api/tests/unit_tests/services/test_dataset_service_metadata.py b/api/tests/unit_tests/services/test_dataset_service_metadata.py index a3de25ed30..3c01d49a82 100644 --- a/api/tests/unit_tests/services/test_dataset_service_metadata.py +++ b/api/tests/unit_tests/services/test_dataset_service_metadata.py @@ -4,7 +4,7 @@ from uuid import uuid4 import pytest from models.account import Account -from models.dataset import Dataset, DatasetMetadata, Document +from models.dataset import Dataset, DatasetMetadata, DatasetMetadataBinding, Document from models.model import UploadFile from services.dataset_service import DocumentService from services.entities.knowledge_entities.knowledge_entities import ( @@ -26,6 +26,7 @@ class TestDocumentServiceMetadata: patch("services.dataset_service.DocumentService.build_document") as mock_build_document, patch("services.dataset_service.current_user") as mock_current_user, patch("services.dataset_service.DocumentIndexingTaskProxy") as mock_indexing_task, + patch("services.dataset_service.DuplicateDocumentIndexingTaskProxy") as mock_duplicate_indexing_task, # We don't patch DocumentService.save_document_with_dataset_id as that's what we are testing ): # Hack to pass isinstance check @@ -81,15 +82,15 @@ class TestDocumentServiceMetadata: mock_metadata_def.field_type = "text" # Create a side effect for query(Model) - def query_side_effect(model): + def query_side_effect(*models): m = Mock() - if model == DatasetMetadata: + if len(models) == 1 and models[0] == DatasetMetadata: m.filter.return_value.filter.return_value.first.return_value = mock_metadata_def # handle the specific chain in code m.filter_by.return_value.first.return_value = mock_metadata_def m.filter.return_value.all.return_value = [mock_metadata_def] return m - if model == Document: + if len(models) == 1 and models[0] == Document: doc_mock = Mock() doc_mock.position = 1 # For get_documents_position @@ -97,9 +98,12 @@ class TestDocumentServiceMetadata: # For duplicate check m.where.return_value.all.return_value = [] return m - if model == UploadFile: + if len(models) == 1 and models[0] == UploadFile: m.where.return_value.all.return_value = [Mock(id="file-1", tenant_id=tenant_id)] return m + if len(models) == 2: + m.filter.return_value.all.return_value = [] + return m return m @@ -128,4 +132,93 @@ class TestDocumentServiceMetadata: assert kwargs["custom_metadata"] == {"custom_field": "custom_value"} # 3. Check DatasetMetadataBinding creation - assert mock_dependencies["db"].add.call_count >= 1 + binding_instances = [ + call.args[0] + for call in mock_dependencies["db"].add.call_args_list + if isinstance(call.args[0], DatasetMetadataBinding) + ] + assert len(binding_instances) == 1 + assert binding_instances[0].document_id == "doc-123" + assert binding_instances[0].metadata_id == metadata_id + + def test_save_duplicate_document_with_metadata_creates_binding(self, mock_dependencies): + # Arrange + dataset_id = str(uuid4()) + tenant_id = str(uuid4()) + account = Mock(spec=Account) + account.id = "account-1" + account.current_tenant_id = tenant_id + + dataset = Mock(spec=Dataset) + dataset.id = dataset_id + dataset.tenant_id = tenant_id + dataset.built_in_field_enabled = False + dataset.doc_form = "text_model" + mock_dependencies["get_dataset"].return_value = dataset + + metadata_id = str(uuid4()) + knowledge_config = KnowledgeConfig( + data_source_type="upload_file", + data_source=DataSource( + info_list=InfoList(data_source_type="upload_file", file_info_list=FileInfo(file_ids=["file-1"])) + ), + doc_form="text_model", + doc_language="en", + indexing_technique="high_quality", + duplicate=True, + doc_metadata=[DocumentMetadataInput(metadata_id=metadata_id, value="custom_value")], + ) + + existing_document = Mock(spec=Document) + existing_document.id = "dup-doc-1" + existing_document.name = "dup.txt" + existing_document.doc_metadata = {"existing_field": "existing_value"} + + with patch("services.dataset_service.db.session.query") as mock_query: + mock_metadata_def = Mock(spec=DatasetMetadata) + mock_metadata_def.id = metadata_id + mock_metadata_def.name = "custom_field" + mock_metadata_def.field_type = "text" + + def query_side_effect(*models): + m = Mock() + if len(models) == 1 and models[0] == DatasetMetadata: + m.filter.return_value.all.return_value = [mock_metadata_def] + return m + if len(models) == 1 and models[0] == Document: + doc_mock = Mock() + doc_mock.position = 1 + m.filter_by.return_value.order_by.return_value.first.return_value = doc_mock + m.where.return_value.all.return_value = [existing_document] + return m + if len(models) == 1 and models[0] == UploadFile: + file_mock = Mock(id="file-1", tenant_id=tenant_id) + file_mock.name = "dup.txt" + m.where.return_value.all.return_value = [file_mock] + return m + if len(models) == 2: + m.filter.return_value.all.return_value = [] + return m + + return m + + mock_query.side_effect = query_side_effect + + # Act + DocumentService.save_document_with_dataset_id( + dataset=dataset, knowledge_config=knowledge_config, account=account + ) + + # Assert + mock_dependencies["build_document"].assert_not_called() + assert existing_document.doc_metadata["custom_field"] == "custom_value" + + binding_instances = [ + call.args[0] + for call in mock_dependencies["db"].add.call_args_list + if isinstance(call.args[0], DatasetMetadataBinding) + ] + assert any( + binding.document_id == existing_document.id and binding.metadata_id == metadata_id + for binding in binding_instances + ) diff --git a/api/tests/unit_tests/tasks/test_document_indexing_update_task.py b/api/tests/unit_tests/tasks/test_document_indexing_update_task.py new file mode 100644 index 0000000000..518be41ed3 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_document_indexing_update_task.py @@ -0,0 +1,44 @@ +from unittest.mock import MagicMock, patch + +from tasks.document_indexing_update_task import document_indexing_update_task + + +@patch("tasks.document_indexing_update_task.IndexingRunner") +@patch("tasks.document_indexing_update_task.IndexProcessorFactory") +@patch("tasks.document_indexing_update_task.session_factory") +def test_commit_parsing_state_before_indexing_runner( + mock_session_factory, mock_index_processor_factory, mock_runner_class +): + mock_session = MagicMock() + mock_context_manager = MagicMock() + mock_context_manager.__enter__.return_value = mock_session + mock_context_manager.__exit__.return_value = None + mock_session_factory.create_session.return_value = mock_context_manager + + mock_document = MagicMock() + mock_document.id = "doc-1" + mock_document.doc_form = "text_model" + mock_dataset = MagicMock() + mock_dataset.id = "dataset-1" + + document_query = MagicMock() + document_query.where.return_value.first.return_value = mock_document + dataset_query = MagicMock() + dataset_query.where.return_value.first.return_value = mock_dataset + mock_session.query.side_effect = [document_query, dataset_query] + mock_session.scalars.return_value.all.return_value = [] + + execution_order: list[str] = [] + mock_session.commit.side_effect = lambda: execution_order.append("commit") + + runner = MagicMock() + runner.run.side_effect = lambda _: execution_order.append("run") + mock_runner_class.return_value = runner + + document_indexing_update_task("dataset-1", "doc-1") + + assert "commit" in execution_order + assert "run" in execution_order + assert execution_order.index("commit") < execution_order.index("run") + runner.run.assert_called_once_with([mock_document]) + mock_index_processor_factory.return_value.init_index_processor.return_value.clean.assert_not_called() diff --git a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx index 4867ba48e3..9523c6aaae 100644 --- a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx @@ -112,7 +112,7 @@ const MetadataSection: FC = ({ } }, [docMetadata, onDocMetadataChange]) - const handleDocMetadataValueChange = useCallback((index: number, value: string | number | ValueSelector) => { + const handleDocMetadataValueChange = useCallback((index: number, value: string | number | ValueSelector | null) => { if (onDocMetadataChange) { const newMetadata = [...docMetadata] newMetadata[index] = { ...newMetadata[index], value } @@ -265,11 +265,12 @@ const MetadataSection: FC = ({ // Time type - use Datepicker if (metadataType === DataType.time) { + const timeValue = typeof item.value === 'number' ? item.value : undefined return ( handleDocMetadataValueChange(index, value || 0)} + value={timeValue} + onChange={value => handleDocMetadataValueChange(index, value)} /> ) } @@ -279,7 +280,7 @@ const MetadataSection: FC = ({ return ( handleDocMetadataValueChange(index, value)} readOnly={readonly} size="regular" @@ -291,7 +292,7 @@ const MetadataSection: FC = ({ return ( handleDocMetadataValueChange(index, e.target.value)} placeholder={t('placeholder.input', { ns: 'common' }) || ''} disabled={readonly} diff --git a/web/app/components/workflow/nodes/knowledge-base/types.ts b/web/app/components/workflow/nodes/knowledge-base/types.ts index 5a4d093bd0..32cf6d46e1 100644 --- a/web/app/components/workflow/nodes/knowledge-base/types.ts +++ b/web/app/components/workflow/nodes/knowledge-base/types.ts @@ -45,7 +45,7 @@ export type RetrievalSetting = { export type DocMetadataItem = { metadata_id: string - value: string | number | string[] // string[] for ValueSelector + value: string | number | string[] | null // string[] for ValueSelector } export type SummaryIndexSetting = { From 42e36d06c479c7922a9a04d44792612452ba025e Mon Sep 17 00:00:00 2001 From: GuanMu Date: Mon, 9 Feb 2026 13:45:22 +0800 Subject: [PATCH 17/68] feat: Introduce `ConstantValueInput` for dynamic metadata input in workflow nodes, enhance data consistency for document metadata bindings, and improve metadata usage check robustness. --- api/services/dataset_service.py | 11 +-- api/services/metadata_service.py | 4 +- .../components/metadata-section.tsx | 99 +++++++++++-------- 3 files changed, 66 insertions(+), 48 deletions(-) diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index 92c3bee7e4..a7902b5136 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -1916,7 +1916,7 @@ class DocumentService: db.session.add(dataset) # 2. Process custom metadata - validate and build dict - custom_metadata: dict = {} + custom_metadata: dict[str, str | int | float | None] = {} metadata_bindings_to_create: list[str] = [] if knowledge_config.doc_metadata: # Batch fetch all metadata definitions to avoid N+1 query @@ -2170,10 +2170,8 @@ class DocumentService: document_ids.append(document.id) documents.append(document) position += 1 - db.session.commit() - - # Create DatasetMetadataBinding records for custom metadata. - # Bindings should exist for both newly created and duplicate-reused documents. + # Create DatasetMetadataBinding records for custom metadata + # before commit so documents and bindings are in a single transaction. if metadata_bindings_to_create: target_document_ids = list(set(document_ids + duplicate_document_ids)) metadata_ids = list(dict.fromkeys(metadata_bindings_to_create)) @@ -2204,7 +2202,8 @@ class DocumentService: created_by=account.id, ) db.session.add(binding) - db.session.commit() + + db.session.commit() # trigger async task if document_ids: diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index beb47f0104..2db94a9e0e 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -156,8 +156,10 @@ class MetadataService: if item.get("metadata_id") == metadata_id: return True, pipeline.name except Exception: + # Fail closed: if we can't parse the workflow, assume metadata is in use + # to prevent accidental deletion of actively used metadata. logger.exception("Error checking metadata usage in pipeline workflow %s", workflow.id) - continue + return True, pipeline.name return False, None diff --git a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx index 9523c6aaae..32b1ff8264 100644 --- a/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx +++ b/web/app/components/workflow/nodes/knowledge-base/components/metadata-section.tsx @@ -25,6 +25,56 @@ import { } from '@/service/knowledge/use-metadata' import { cn } from '@/utils/classnames' +type ConstantValueInputProps = { + metadataType: DataType | undefined + value: string | number | string[] | null + onChange: (value: string | number | null) => void + readonly?: boolean + placeholder: string +} + +const ConstantValueInput: FC = ({ + metadataType, + value, + onChange, + readonly, + placeholder, +}) => { + if (metadataType === DataType.time) { + const timeValue = typeof value === 'number' ? value : undefined + return ( + onChange(v)} + /> + ) + } + + if (metadataType === DataType.number) { + return ( + onChange(v)} + readOnly={readonly} + size="regular" + /> + ) + } + + return ( + onChange(e.target.value)} + placeholder={placeholder} + disabled={readonly} + className="h-full w-full bg-transparent text-[13px] text-text-primary outline-none placeholder:text-text-placeholder disabled:opacity-50" + /> + ) +} + type MetadataSectionProps = { nodeId: string datasetId?: string @@ -150,7 +200,7 @@ const MetadataSection: FC = ({ = varName === 'timestamp' // sys.timestamp || varName.includes('time') // current_time, expiry_time || varName.includes('date') // created_date, updated_date - || varName.includes('at') // created_at, updated_at + || varName.endsWith('_at') // created_at, updated_at return (variable.type === VarType.number || variable.type === VarType.integer) && isTimeRelated @@ -260,46 +310,13 @@ const MetadataSection: FC = ({ ) : (
- {(() => { - const metadataType = getMetadataType(item.metadata_id) - - // Time type - use Datepicker - if (metadataType === DataType.time) { - const timeValue = typeof item.value === 'number' ? item.value : undefined - return ( - handleDocMetadataValueChange(index, value)} - /> - ) - } - - // Number type - use InputNumber - if (metadataType === DataType.number) { - return ( - handleDocMetadataValueChange(index, value)} - readOnly={readonly} - size="regular" - /> - ) - } - - // String type (default) - use text input - return ( - handleDocMetadataValueChange(index, e.target.value)} - placeholder={t('placeholder.input', { ns: 'common' }) || ''} - disabled={readonly} - className="h-full w-full bg-transparent text-[13px] text-text-primary outline-none placeholder:text-text-placeholder disabled:opacity-50" - /> - ) - })()} + handleDocMetadataValueChange(index, value)} + readonly={readonly} + placeholder={t('placeholder.input', { ns: 'common' }) || ''} + />
)}
From cc3338577442d189adc608f695660c0c2751fa1c Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 9 Feb 2026 06:27:55 +0000 Subject: [PATCH 18/68] [autofix.ci] apply automated fixes --- api/services/dataset_service.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index a7902b5136..e26e31bffb 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -2045,11 +2045,7 @@ class DocumentService: document.batch = batch document.indexing_status = "waiting" if custom_metadata: - doc_metadata = ( - copy.deepcopy(document.doc_metadata) - if document.doc_metadata - else {} - ) + doc_metadata = copy.deepcopy(document.doc_metadata) if document.doc_metadata else {} doc_metadata.update(custom_metadata) document.doc_metadata = doc_metadata db.session.add(document) From b064f222cdba78369916374c114735c47d536a96 Mon Sep 17 00:00:00 2001 From: GuanMu Date: Wed, 11 Feb 2026 15:05:29 +0800 Subject: [PATCH 19/68] feat: Decouple built-in metadata field enablement from pipeline workflow configuration and add checks for built-in metadata status in published pipelines. --- .../rag_pipeline/rag_pipeline_workflow.py | 33 +++ api/services/metadata_service.py | 227 ++++++++++++++---- ...etadata_service_pipeline_built_in_guard.py | 217 +++++++++++++++++ web/app/components/base/switch/index.tsx | 20 +- .../dataset-metadata-drawer.spec.tsx | 23 ++ .../dataset-metadata-drawer.tsx | 21 +- .../rag-pipeline-header/publisher/popup.tsx | 42 ++-- .../components/metadata-section.tsx | 36 ++- web/eslint-suppressions.json | 16 -- 9 files changed, 513 insertions(+), 122 deletions(-) create mode 100644 api/tests/unit_tests/services/test_metadata_service_pipeline_built_in_guard.py diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index 29b6b64b94..437773b99a 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -44,6 +44,7 @@ from models.dataset import Pipeline from models.model import EndUser from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError +from services.metadata_service import MetadataService from services.rag_pipeline.pipeline_generate_service import PipelineGenerateService from services.rag_pipeline.rag_pipeline import RagPipelineService from services.rag_pipeline.rag_pipeline_manage_service import RagPipelineManageService @@ -612,8 +613,40 @@ class PublishedRagPipelineApi(Resource): session.add(pipeline) workflow_created_at = TimestampField().format(workflow.created_at) + # Extract built-in metadata flag before commit (workflow is in session). + # If ANY knowledge-index node enables built-in metadata, the dataset + # flag should be True (consistent with check_built_in_enabled_in_published_pipeline). + node_built_in_enabled: bool | None = None + dataset = pipeline.retrieve_dataset(session=session) + dataset_id: str | None = dataset.id if dataset else None + try: + enabled = False + for node_data in MetadataService._iter_knowledge_index_nodes(workflow): + if node_data.get("enable_built_in_metadata") is True: + enabled = True + node_built_in_enabled = enabled + except Exception: + logger.exception( + "Skip post-publish built-in metadata sync extraction: failed to parse workflow %s graph", + workflow.id, + ) + session.commit() + # After commit: pipeline.workflow_id points to the new workflow. + # Sync dataset.built_in_field_enabled with the published node config. + # This is best-effort: publish already succeeded, so sync failures + # must not turn the response into an error. + if dataset_id is not None and node_built_in_enabled is not None: + try: + MetadataService.sync_built_in_field_on_publish(dataset_id, node_built_in_enabled) + except Exception: + logger.exception( + "Post-publish built-in metadata sync failed for dataset %s; " + "the Documents page may show a stale toggle until the next publish.", + dataset_id, + ) + return { "result": "success", "created_at": workflow_created_at, diff --git a/api/services/metadata_service.py b/api/services/metadata_service.py index 2db94a9e0e..3d725d412e 100644 --- a/api/services/metadata_service.py +++ b/api/services/metadata_service.py @@ -1,5 +1,7 @@ import copy import logging +import time +from collections.abc import Sequence from sqlalchemy import or_ @@ -18,6 +20,8 @@ from services.entities.knowledge_entities.knowledge_entities import ( logger = logging.getLogger(__name__) +KNOWLEDGE_INDEX_NODE_TYPE = "knowledge-index" + class MetadataService: @staticmethod @@ -98,66 +102,117 @@ class MetadataService: finally: redis_client.delete(lock_key) + # ------------------------------------------------------------------ + # Pipeline workflow guard helpers + # ------------------------------------------------------------------ + @staticmethod - def check_metadata_used_in_pipeline(dataset_id: str, metadata_id: str) -> tuple[bool, str | None]: - """ - Check if a metadata is used in the associated Pipeline's Knowledge Base node. - - Checks both draft and current published workflows to prevent deletion of metadata - that is actively used in production. - - Returns: - tuple[bool, str | None]: (is_used, pipeline_name) - True if used, with pipeline name - """ - # Get the dataset + def _get_pipeline_for_dataset(dataset_id: str) -> Pipeline | None: + """Return the Pipeline associated with *dataset_id*, or ``None``.""" dataset = db.session.query(Dataset).filter_by(id=dataset_id).first() if not dataset or not dataset.pipeline_id: - return False, None + return None + return db.session.query(Pipeline).filter_by(id=dataset.pipeline_id).first() - # Get the pipeline to access workflow_id (current published version) - pipeline = db.session.query(Pipeline).filter_by(id=dataset.pipeline_id).first() + @staticmethod + def _get_pipeline_workflows(pipeline: Pipeline, *, published_only: bool = False) -> Sequence[Workflow]: + """Fetch the relevant workflows for *pipeline*. + + Args: + pipeline: The pipeline whose workflows to fetch. + published_only: When ``True`` only the current published workflow is + returned. When ``False`` both the draft **and** the published + workflow are returned (used for stricter deletion guards). + """ + if published_only: + if not pipeline.workflow_id: + return [] + wf = db.session.query(Workflow).filter_by(id=pipeline.workflow_id).first() + return [wf] if wf else [] + + draft_condition = (Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT) + if pipeline.workflow_id: + return ( + db.session.query(Workflow) + .where(or_(draft_condition, Workflow.id == pipeline.workflow_id)) + .all() + ) + return db.session.query(Workflow).where(draft_condition).all() + + @staticmethod + def _iter_knowledge_index_nodes(workflow: Workflow): + """Yield each ``knowledge-index`` node data dict from *workflow*. + + Raises on malformed ``graph_dict`` so callers can apply fail-closed + logic in their own ``except`` blocks. + """ + graph_dict = workflow.graph_dict + for node in graph_dict.get("nodes", []): + node_data = node.get("data", {}) + if node_data.get("type") == KNOWLEDGE_INDEX_NODE_TYPE: + yield node_data + + # ------------------------------------------------------------------ + # Public pipeline guard checks + # ------------------------------------------------------------------ + + @staticmethod + def check_built_in_enabled_in_published_pipeline(dataset_id: str) -> tuple[bool, str | None]: + """Check if built-in metadata is enabled in the current **published** Pipeline workflow. + + Used when users disable built-in metadata from the Documents page. + Only the published workflow is checked because the Documents page + should not block on unpublished draft changes. + + Returns: + ``(is_enabled, pipeline_name)`` + """ + pipeline = MetadataService._get_pipeline_for_dataset(dataset_id) if not pipeline: return False, None - # Build conditions for draft and current published workflows only - draft_condition = (Workflow.app_id == pipeline.id) & (Workflow.version == Workflow.VERSION_DRAFT) - - if pipeline.workflow_id: - workflows = ( - db.session.query(Workflow) - .where( - or_( - draft_condition, - Workflow.id == pipeline.workflow_id, - ) - ) - .all() - ) - else: - workflows = db.session.query(Workflow).where(draft_condition).all() - + workflows = MetadataService._get_pipeline_workflows(pipeline, published_only=True) if not workflows: return False, None - # Check each workflow for metadata usage for workflow in workflows: try: - graph_dict = workflow.graph_dict - if "nodes" not in graph_dict: - continue - - for node in graph_dict["nodes"]: - node_data = node.get("data", {}) - # Check if this is a knowledge-index node - if node_data.get("type") == "knowledge-index": - doc_metadata = node_data.get("doc_metadata", []) - if doc_metadata: - for item in doc_metadata: - if item.get("metadata_id") == metadata_id: - return True, pipeline.name + for node_data in MetadataService._iter_knowledge_index_nodes(workflow): + if node_data.get("enable_built_in_metadata") is True: + return True, pipeline.name + except Exception: + logger.exception( + "Error checking built-in metadata in published pipeline workflow %s", workflow.id + ) + return True, pipeline.name + + return False, None + + @staticmethod + def check_metadata_used_in_pipeline(dataset_id: str, metadata_id: str) -> tuple[bool, str | None]: + """Check if a custom metadata field is referenced in the Pipeline's Knowledge Base node. + + Both draft **and** published workflows are inspected to prevent + deletion of metadata that is actively used or about to be published. + + Returns: + ``(is_used, pipeline_name)`` + """ + pipeline = MetadataService._get_pipeline_for_dataset(dataset_id) + if not pipeline: + return False, None + + workflows = MetadataService._get_pipeline_workflows(pipeline, published_only=False) + if not workflows: + return False, None + + for workflow in workflows: + try: + for node_data in MetadataService._iter_knowledge_index_nodes(workflow): + for item in node_data.get("doc_metadata") or []: + if item.get("metadata_id") == metadata_id: + return True, pipeline.name except Exception: - # Fail closed: if we can't parse the workflow, assume metadata is in use - # to prevent accidental deletion of actively used metadata. logger.exception("Error checking metadata usage in pipeline workflow %s", workflow.id) return True, pipeline.name @@ -246,12 +301,23 @@ class MetadataService: redis_client.delete(lock_key) @staticmethod - def disable_built_in_field(dataset: Dataset): + def disable_built_in_field(dataset: Dataset) -> None: if not dataset.built_in_field_enabled: return + lock_key = f"dataset_metadata_lock_{dataset.id}" try: MetadataService.knowledge_base_metadata_lock_check(dataset.id, None) + + # Guard runs under the lock so a concurrent publish cannot change + # the published workflow between the check and the actual disable. + is_enabled, pipeline_name = MetadataService.check_built_in_enabled_in_published_pipeline(dataset.id) + if is_enabled: + raise ValueError( + "Cannot disable built-in metadata because current published " + f"Pipeline '{pipeline_name}' is using it." + ) + db.session.add(dataset) documents = DocumentService.get_working_documents_by_dataset_id(dataset.id) document_ids = [] @@ -271,11 +337,76 @@ class MetadataService: document_ids.append(document.id) dataset.built_in_field_enabled = False db.session.commit() + except ValueError: + raise except Exception: logger.exception("Disable built-in field failed") finally: redis_client.delete(lock_key) + @staticmethod + def sync_built_in_field_on_publish( + dataset_id: str, + enable_built_in_metadata: bool, + *, + max_retries: int = 3, + retry_interval: float = 0.2, + ) -> None: + """Sync dataset.built_in_field_enabled with the published workflow's node config. + + Called **after** the publish transaction commits so that + ``pipeline.workflow_id`` already points to the new workflow. + + Unlike ``enable_built_in_field`` / ``disable_built_in_field`` this + method does **not** populate or strip built-in fields on existing + documents — that is intentional because the publish action only + changes the *future* indexing behaviour. The flag sync ensures the + Documents page reflects the published pipeline's intent. + + Retries on lock contention because a concurrent disable may hold the + lock briefly; giving up would leave the dataset flag stale. + """ + lock_key = f"dataset_metadata_lock_{dataset_id}" + + for attempt in range(max_retries + 1): + lock_acquired = False + try: + MetadataService.knowledge_base_metadata_lock_check(dataset_id, None) + lock_acquired = True + + dataset = db.session.query(Dataset).filter_by(id=dataset_id).first() + if not dataset: + logger.warning("sync_built_in_field_on_publish: dataset %s not found", dataset_id) + return + + if dataset.built_in_field_enabled == enable_built_in_metadata: + return + + logger.info( + "sync_built_in_field_on_publish: dataset=%s, %s -> %s", + dataset_id, dataset.built_in_field_enabled, enable_built_in_metadata, + ) + dataset.built_in_field_enabled = enable_built_in_metadata + db.session.add(dataset) + db.session.commit() + return + except ValueError: + if lock_acquired: + raise + # Lock contention — retry after a short wait + if attempt < max_retries: + logger.info( + "sync_built_in_field_on_publish: lock contention for dataset %s, " + "retrying (%d/%d)", + dataset_id, attempt + 1, max_retries, + ) + time.sleep(retry_interval) + else: + raise + finally: + if lock_acquired: + redis_client.delete(lock_key) + @staticmethod def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData): for operation in metadata_args.operation_data: diff --git a/api/tests/unit_tests/services/test_metadata_service_pipeline_built_in_guard.py b/api/tests/unit_tests/services/test_metadata_service_pipeline_built_in_guard.py new file mode 100644 index 0000000000..cb42eb31b4 --- /dev/null +++ b/api/tests/unit_tests/services/test_metadata_service_pipeline_built_in_guard.py @@ -0,0 +1,217 @@ +from unittest.mock import MagicMock, PropertyMock, patch + +import pytest + +from models.dataset import Dataset, Pipeline +from models.workflow import Workflow +from services.metadata_service import MetadataService + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_pipeline(*, workflow_id: str | None = "workflow-published-1") -> MagicMock: + pipeline = MagicMock(spec=Pipeline) + pipeline.id = "pipeline-1" + pipeline.name = "Pipeline A" + pipeline.workflow_id = workflow_id + return pipeline + + +def _make_workflow(graph_dict: dict) -> MagicMock: + workflow = MagicMock(spec=Workflow) + workflow.id = "workflow-1" + workflow.graph_dict = graph_dict + return workflow + + +def _make_dataset(*, pipeline_id: str | None = "pipeline-1") -> MagicMock: + dataset = MagicMock(spec=Dataset) + dataset.id = "dataset-1" + dataset.pipeline_id = pipeline_id + return dataset + + +def _setup_db_for_check(mock_db, *, dataset=None, pipeline=None, workflow=None): + """Wire up ``db.session.query(Model).filter_by(...).first()`` chains.""" + queries = [] + if dataset is not None: + q = MagicMock() + q.filter_by.return_value.first.return_value = dataset + queries.append(q) + if pipeline is not None: + q = MagicMock() + q.filter_by.return_value.first.return_value = pipeline + queries.append(q) + if workflow is not None: + q = MagicMock() + q.filter_by.return_value.first.return_value = workflow + queries.append(q) + mock_db.session.query.side_effect = queries + + +# --------------------------------------------------------------------------- +# check_built_in_enabled_in_published_pipeline +# --------------------------------------------------------------------------- + +class TestCheckBuiltInEnabledInPublishedPipeline: + @patch("services.metadata_service.db") + def test_returns_true_when_enabled(self, mock_db): + workflow = _make_workflow({ + "nodes": [{"data": {"type": "knowledge-index", "enable_built_in_metadata": True}}], + }) + _setup_db_for_check(mock_db, dataset=_make_dataset(), pipeline=_make_pipeline(), workflow=workflow) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is True + assert name == "Pipeline A" + + @patch("services.metadata_service.db") + def test_returns_false_when_disabled(self, mock_db): + workflow = _make_workflow({ + "nodes": [{"data": {"type": "knowledge-index", "enable_built_in_metadata": False}}], + }) + _setup_db_for_check(mock_db, dataset=_make_dataset(), pipeline=_make_pipeline(), workflow=workflow) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is False + assert name is None + + @patch("services.metadata_service.db") + def test_returns_false_when_no_pipeline_id(self, mock_db): + _setup_db_for_check(mock_db, dataset=_make_dataset(pipeline_id=None)) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is False + assert name is None + + @patch("services.metadata_service.db") + def test_returns_false_when_no_published_workflow(self, mock_db): + _setup_db_for_check( + mock_db, + dataset=_make_dataset(), + pipeline=_make_pipeline(workflow_id=None), + ) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is False + assert name is None + + @patch("services.metadata_service.db") + def test_fail_closed_on_graph_parse_error(self, mock_db): + """If graph_dict raises, assume built-in is enabled (fail-closed).""" + workflow = MagicMock(spec=Workflow) + workflow.id = "wf-bad" + type(workflow).graph_dict = PropertyMock(side_effect=RuntimeError("corrupt")) + _setup_db_for_check(mock_db, dataset=_make_dataset(), pipeline=_make_pipeline(), workflow=workflow) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is True + assert name == "Pipeline A" + + @patch("services.metadata_service.db") + def test_multiple_nodes_only_one_enabled(self, mock_db): + workflow = _make_workflow({ + "nodes": [ + {"data": {"type": "knowledge-index", "enable_built_in_metadata": False}}, + {"data": {"type": "knowledge-index", "enable_built_in_metadata": True}}, + ], + }) + _setup_db_for_check(mock_db, dataset=_make_dataset(), pipeline=_make_pipeline(), workflow=workflow) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is True + assert name == "Pipeline A" + + @patch("services.metadata_service.db") + def test_ignores_non_knowledge_index_nodes(self, mock_db): + workflow = _make_workflow({ + "nodes": [ + {"data": {"type": "llm", "enable_built_in_metadata": True}}, + ], + }) + _setup_db_for_check(mock_db, dataset=_make_dataset(), pipeline=_make_pipeline(), workflow=workflow) + + is_enabled, name = MetadataService.check_built_in_enabled_in_published_pipeline("dataset-1") + + assert is_enabled is False + assert name is None + + +# --------------------------------------------------------------------------- +# disable_built_in_field — always guards against published pipeline usage +# --------------------------------------------------------------------------- + +class TestDisableBuiltInField: + @patch("services.metadata_service.db") + @patch("services.metadata_service.redis_client") + @patch("services.metadata_service.DocumentService") + @patch.object(MetadataService, "check_built_in_enabled_in_published_pipeline") + @patch.object(MetadataService, "knowledge_base_metadata_lock_check") + def test_raises_when_pipeline_uses_built_in( + self, + mock_lock_check, + mock_check_built_in, + mock_document_service, + mock_redis_client, + mock_db, + ): + dataset = MagicMock(spec=Dataset) + dataset.id = "dataset-1" + dataset.built_in_field_enabled = True + mock_check_built_in.return_value = (True, "Pipeline A") + + with pytest.raises( + ValueError, + match="Cannot disable built-in metadata", + ): + MetadataService.disable_built_in_field(dataset) + + mock_check_built_in.assert_called_once_with(dataset.id) + # Lock IS acquired first, then guard rejects — but no DB mutation happens + mock_lock_check.assert_called_once() + mock_db.session.add.assert_not_called() + mock_db.session.commit.assert_not_called() + # Lock is released in finally + mock_redis_client.delete.assert_called_once() + + @patch("services.metadata_service.db") + @patch("services.metadata_service.redis_client") + @patch("services.metadata_service.DocumentService") + @patch.object(MetadataService, "check_built_in_enabled_in_published_pipeline") + @patch.object(MetadataService, "knowledge_base_metadata_lock_check") + def test_allows_when_pipeline_not_using( + self, + mock_lock_check, + mock_check_built_in, + mock_document_service, + mock_redis_client, + mock_db, + ): + dataset = MagicMock(spec=Dataset) + dataset.id = "dataset-1" + dataset.built_in_field_enabled = True + mock_check_built_in.return_value = (False, None) + mock_document_service.get_working_documents_by_dataset_id.return_value = [] + + MetadataService.disable_built_in_field(dataset) + + mock_check_built_in.assert_called_once_with(dataset.id) + assert dataset.built_in_field_enabled is False + mock_db.session.commit.assert_called_once() + + def test_noop_when_already_disabled(self): + dataset = MagicMock(spec=Dataset) + dataset.built_in_field_enabled = False + + # Should return immediately without any side effects + MetadataService.disable_built_in_field(dataset) + + assert dataset.built_in_field_enabled is False diff --git a/web/app/components/base/switch/index.tsx b/web/app/components/base/switch/index.tsx index 6296a33141..5ca476567e 100644 --- a/web/app/components/base/switch/index.tsx +++ b/web/app/components/base/switch/index.tsx @@ -8,6 +8,7 @@ type SwitchProps = { onChange?: (value: boolean) => void size?: 'xs' | 'sm' | 'md' | 'lg' | 'l' defaultValue?: boolean + value?: boolean disabled?: boolean className?: string } @@ -18,16 +19,22 @@ const Switch = ( onChange, size = 'md', defaultValue = false, + value, disabled = false, className, }: SwitchProps & { ref?: React.RefObject }, ) => { - const [enabled, setEnabled] = useState(defaultValue) + const isControlled = value !== undefined + const [uncontrolledEnabled, setUncontrolledEnabled] = useState(defaultValue) + useEffect(() => { - setEnabled(defaultValue) - }, [defaultValue]) + if (!isControlled) + setUncontrolledEnabled(defaultValue) + }, [defaultValue, isControlled]) + + const enabled = isControlled ? value : uncontrolledEnabled const wrapStyle = { lg: 'h-6 w-11', l: 'h-5 w-9', @@ -58,10 +65,13 @@ const Switch = ( onChange={(checked: boolean) => { if (disabled) return - setEnabled(checked) + + if (!isControlled) + setUncontrolledEnabled(checked) + onChange?.(checked) }} - className={cn(wrapStyle[size], enabled ? 'bg-components-toggle-bg' : 'bg-components-toggle-bg-unchecked', 'relative inline-flex shrink-0 cursor-pointer rounded-[5px] border-2 border-transparent transition-colors duration-200 ease-in-out', disabled ? '!cursor-not-allowed !opacity-50' : '', size === 'xs' && 'rounded-sm', className)} + className={cn(wrapStyle[size], enabled ? 'bg-components-toggle-bg' : 'bg-components-toggle-bg-unchecked', 'relative inline-flex shrink-0 cursor-pointer rounded-[5px] border-2 border-transparent transition-colors duration-200 ease-in-out', disabled ? '!cursor-not-allowed !opacity-50' : '', size === 'xs' && 'rounded-sm', className)} >