From fb11f416d91fec6ed440cfb761e76fa1e177656f Mon Sep 17 00:00:00 2001 From: RickDamon <719196134@qq.com> Date: Thu, 19 Mar 2026 15:51:24 +0800 Subject: [PATCH 1/2] fix(weaviate): proactive schema migration for pre-upgrade collections Collections created before doc_type was introduced cause 'no such prop with name doc_type' GRPC errors after upgrading. - _ensure_properties() checks/adds missing properties, returns bool - _maybe_ensure_properties() uses double-checked locking with a module-level set for once-per-process-per-collection verification - All query/filter paths call _maybe_ensure_properties() before executing, transparently fixing old schemas on first access - Failed migrations are not cached, allowing automatic retry Closes #33716 --- .../vdb/weaviate/weaviate_vector.py | 66 +++- .../vdb/weaviate/test_weaviate_vector.py | 335 +++++++++++++++++- 2 files changed, 389 insertions(+), 12 deletions(-) diff --git a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py index d29d62c93f..9a7411ff61 100644 --- a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py +++ b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py @@ -37,6 +37,12 @@ logger = logging.getLogger(__name__) _weaviate_client: weaviate.WeaviateClient | None = None _weaviate_client_lock = threading.Lock() +# Process-level cache: tracks collections whose required properties have been +# verified in this process lifetime. Guarded by _ensured_collections_lock so +# concurrent Celery/thread workers can share the set safely. +_ensured_collections: set[str] = set() +_ensured_collections_lock = threading.Lock() + def _shutdown_weaviate_client() -> None: """ @@ -230,26 +236,35 @@ class WeaviateVector(BaseVector): vector_config=wc.Configure.Vectors.self_provided(), ) - self._ensure_properties() + if self._ensure_properties(): + # Mark collection as ensured so _maybe_ensure_properties() is + # a no-op for subsequent operations in this process. + _ensured_collections.add(self._collection_name) redis_client.set(cache_key, 1, ex=3600) except Exception as e: logger.exception("Error creating collection %s", self._collection_name) raise - def _ensure_properties(self) -> None: - """ - Ensures all required properties exist in the collection schema. + def _ensure_properties(self) -> bool: + """Ensures all required properties exist in the collection schema. - Adds missing properties if the collection exists but lacks them. + Adds missing properties (document_id, doc_id, doc_type, chunk_index) if the + collection exists but lacks them. This handles backward compatibility for + collections created before these properties were introduced. + + Returns ``True`` if all properties are confirmed present (either they + already existed or were successfully added). Returns ``False`` if + any property addition failed, signalling that the caller should NOT + cache this collection as "ensured". """ if not self._client.collections.exists(self._collection_name): - return + return True col = self._client.collections.use(self._collection_name) cfg = col.config.get() existing = {p.name for p in (cfg.properties or [])} - to_add = [] + to_add: list[wc.Property] = [] if "document_id" not in existing: to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT)) if "doc_id" not in existing: @@ -259,13 +274,42 @@ class WeaviateVector(BaseVector): if "chunk_index" not in existing: to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT)) + all_succeeded = True for prop in to_add: try: col.config.add_property(prop) except Exception as e: logger.warning("Could not add property %s: %s", prop.name, e) + all_succeeded = False - def _get_uuids(self, documents: list[Document]) -> list[str]: + return all_succeeded + + def _maybe_ensure_properties(self) -> None: + """One-time proactive schema check per collection per process lifetime. + + On first access to a collection, verifies that all required properties + (document_id, doc_id, doc_type, chunk_index) exist in the schema. If any + are missing they are added transparently. + + Subsequent calls for the same collection in this process are no-ops + (just a fast ``set`` membership test). + + Uses double-checked locking so concurrent threads in the same process + don't duplicate work. + + If property migration fails (e.g. Weaviate is temporarily unavailable), + the collection is *not* marked as ensured so that the next request will + retry the migration automatically. + """ + if self._collection_name in _ensured_collections: + return + with _ensured_collections_lock: + if self._collection_name in _ensured_collections: + return + if self._ensure_properties(): + _ensured_collections.add(self._collection_name) + + def _get_uuids(self, texts: list[Document]) -> list[str]: """ Generates deterministic UUIDs for documents based on their content. @@ -274,7 +318,7 @@ class WeaviateVector(BaseVector): URL_NAMESPACE = _uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8") uuids = [] - for doc in documents: + for doc in texts: uuid_val = _uuid.uuid5(URL_NAMESPACE, doc.page_content) uuids.append(str(uuid_val)) @@ -335,6 +379,7 @@ class WeaviateVector(BaseVector): if not self._client.collections.exists(self._collection_name): return + self._maybe_ensure_properties() col = self._client.collections.use(self._collection_name) col.data.delete_many(where=Filter.by_property(key).equal(value)) @@ -348,6 +393,7 @@ class WeaviateVector(BaseVector): if not self._client.collections.exists(self._collection_name): return False + self._maybe_ensure_properties() col = self._client.collections.use(self._collection_name) res = col.query.fetch_objects( filters=Filter.by_property("doc_id").equal(id), @@ -385,6 +431,7 @@ class WeaviateVector(BaseVector): if not self._client.collections.exists(self._collection_name): return [] + self._maybe_ensure_properties() col = self._client.collections.use(self._collection_name) props = list({*self._attributes, self._DOCUMENT_ID_PROPERTY, Field.TEXT_KEY.value}) @@ -432,6 +479,7 @@ class WeaviateVector(BaseVector): if not self._client.collections.exists(self._collection_name): return [] + self._maybe_ensure_properties() col = self._client.collections.use(self._collection_name) props = list({*self._attributes, Field.TEXT_KEY.value}) diff --git a/api/tests/unit_tests/core/rag/datasource/vdb/weaviate/test_weaviate_vector.py b/api/tests/unit_tests/core/rag/datasource/vdb/weaviate/test_weaviate_vector.py index 3bd656ba84..99fea7e8c2 100644 --- a/api/tests/unit_tests/core/rag/datasource/vdb/weaviate/test_weaviate_vector.py +++ b/api/tests/unit_tests/core/rag/datasource/vdb/weaviate/test_weaviate_vector.py @@ -2,7 +2,8 @@ Focuses on verifying that doc_type is properly handled in: - Collection schema creation (_create_collection) -- Property migration (_ensure_properties) +- Property migration (_ensure_properties / _maybe_ensure_properties) +- Proactive schema migration on first access per collection per process - Vector search result metadata (search_by_vector) - Full-text search result metadata (search_by_full_text) """ @@ -21,6 +22,7 @@ class TestWeaviateVector(unittest.TestCase): def setUp(self): weaviate_vector_module._weaviate_client = None + weaviate_vector_module._ensured_collections.clear() self.config = WeaviateConfig( endpoint="http://localhost:8080", api_key="test-key", @@ -31,6 +33,7 @@ class TestWeaviateVector(unittest.TestCase): def tearDown(self): weaviate_vector_module._weaviate_client = None + weaviate_vector_module._ensured_collections.clear() @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def _create_weaviate_vector(self, mock_weaviate_module): @@ -111,6 +114,9 @@ class TestWeaviateVector(unittest.TestCase): f"doc_type should be in collection schema properties, got: {property_names}" ) + # Verify collection is marked as ensured after _create_collection + assert self.collection_name in weaviate_vector_module._ensured_collections + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def test_ensure_properties_adds_missing_doc_type(self, mock_weaviate_module): """Test that _ensure_properties adds doc_type when it's missing from existing schema.""" @@ -139,12 +145,13 @@ class TestWeaviateVector(unittest.TestCase): config=self.config, attributes=self.attributes, ) - wv._ensure_properties() + result = wv._ensure_properties() # Verify add_property was called and includes doc_type add_calls = mock_col.config.add_property.call_args_list added_names = [call.args[0].name for call in add_calls] assert "doc_type" in added_names, f"doc_type should be added to existing collection, added: {added_names}" + assert result is True @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def test_ensure_properties_skips_existing_doc_type(self, mock_weaviate_module): @@ -174,10 +181,148 @@ class TestWeaviateVector(unittest.TestCase): config=self.config, attributes=self.attributes, ) - wv._ensure_properties() + result = wv._ensure_properties() # No properties should be added mock_col.config.add_property.assert_not_called() + assert result is True + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_ensure_properties_returns_false_on_failure(self, mock_weaviate_module): + """_ensure_properties should return False when add_property fails.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + # Schema missing doc_type → will attempt add_property + existing_props = [ + SimpleNamespace(name="text"), + SimpleNamespace(name="document_id"), + SimpleNamespace(name="doc_id"), + SimpleNamespace(name="chunk_index"), + ] + mock_cfg = MagicMock() + mock_cfg.properties = existing_props + mock_col.config.get.return_value = mock_cfg + + # Simulate Weaviate failure during add_property + mock_col.config.add_property.side_effect = Exception("Weaviate unavailable") + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + result = wv._ensure_properties() + + assert result is False, "Should return False when property addition fails" + + # ------------------------------------------------------------------ # + # _maybe_ensure_properties: proactive migration with process cache # + # ------------------------------------------------------------------ # + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_maybe_ensure_properties_calls_ensure_on_first_access(self, mock_weaviate_module): + """First call to _maybe_ensure_properties should delegate to _ensure_properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._ensure_properties = MagicMock(return_value=True) + + wv._maybe_ensure_properties() + + wv._ensure_properties.assert_called_once() + assert self.collection_name in weaviate_vector_module._ensured_collections + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_maybe_ensure_properties_skips_on_second_access(self, mock_weaviate_module): + """Subsequent calls to _maybe_ensure_properties should be no-ops.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._ensure_properties = MagicMock(return_value=True) + + # First call — should invoke _ensure_properties + wv._maybe_ensure_properties() + # Second call — should be a no-op + wv._maybe_ensure_properties() + + wv._ensure_properties.assert_called_once() + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_maybe_ensure_properties_retries_on_failure(self, mock_weaviate_module): + """When _ensure_properties returns False, the collection should NOT be cached. + + The next call must retry _ensure_properties so that transient failures + (e.g. Weaviate temporarily unavailable) don't permanently prevent migration. + """ + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + # First call fails + wv._ensure_properties = MagicMock(side_effect=[False, True]) + + wv._maybe_ensure_properties() + assert self.collection_name not in weaviate_vector_module._ensured_collections + + # Second call succeeds → now cached + wv._maybe_ensure_properties() + assert self.collection_name in weaviate_vector_module._ensured_collections + assert wv._ensure_properties.call_count == 2 + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_maybe_ensure_properties_independent_per_collection(self, mock_weaviate_module): + """Different collections should each get their own one-time check.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + wv1 = WeaviateVector( + collection_name="Collection_A", + config=self.config, + attributes=self.attributes, + ) + wv2 = WeaviateVector( + collection_name="Collection_B", + config=self.config, + attributes=self.attributes, + ) + wv1._ensure_properties = MagicMock(return_value=True) + wv2._ensure_properties = MagicMock(return_value=True) + + wv1._maybe_ensure_properties() + wv2._maybe_ensure_properties() + + wv1._ensure_properties.assert_called_once() + wv2._ensure_properties.assert_called_once() + assert "Collection_A" in weaviate_vector_module._ensured_collections + assert "Collection_B" in weaviate_vector_module._ensured_collections + + # ------------------------------------------------------------------ # + # search_by_vector # + # ------------------------------------------------------------------ # @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def test_search_by_vector_returns_doc_type_in_metadata(self, mock_weaviate_module): @@ -210,6 +355,17 @@ class TestWeaviateVector(unittest.TestCase): mock_result.objects = [mock_obj] mock_col.query.near_vector.return_value = mock_result + # Pre-populate schema so _maybe_ensure_properties is a no-op + mock_cfg = MagicMock() + mock_cfg.properties = [ + SimpleNamespace(name="text"), + SimpleNamespace(name="document_id"), + SimpleNamespace(name="doc_id"), + SimpleNamespace(name="doc_type"), + SimpleNamespace(name="chunk_index"), + ] + mock_col.config.get.return_value = mock_cfg + wv = WeaviateVector( collection_name=self.collection_name, config=self.config, @@ -226,6 +382,82 @@ class TestWeaviateVector(unittest.TestCase): assert len(docs) == 1 assert docs[0].metadata.get("doc_type") == "image" + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_search_by_vector_calls_maybe_ensure_properties(self, mock_weaviate_module): + """search_by_vector should proactively call _maybe_ensure_properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + mock_obj = MagicMock() + mock_obj.properties = {"text": "hello", "doc_id": "id_1"} + mock_obj.metadata.distance = 0.2 + mock_result = MagicMock() + mock_result.objects = [mock_obj] + mock_col.query.near_vector.return_value = mock_result + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._maybe_ensure_properties = MagicMock() + wv.search_by_vector(query_vector=[0.1] * 128, top_k=1) + + wv._maybe_ensure_properties.assert_called_once() + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_search_by_vector_migrates_schema_on_first_access_to_old_collection(self, mock_weaviate_module): + """search_by_vector on a pre-upgrade collection should proactively add missing properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + mock_obj = MagicMock() + mock_obj.properties = {"text": "hello world", "doc_id": "id_1"} + mock_obj.metadata.distance = 0.2 + mock_result = MagicMock() + mock_result.objects = [mock_obj] + mock_col.query.near_vector.return_value = mock_result + + # Schema is missing doc_type + existing_props = [ + SimpleNamespace(name="text"), + SimpleNamespace(name="document_id"), + SimpleNamespace(name="doc_id"), + SimpleNamespace(name="chunk_index"), + ] + mock_cfg = MagicMock() + mock_cfg.properties = existing_props + mock_col.config.get.return_value = mock_cfg + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + docs = wv.search_by_vector(query_vector=[0.1] * 128, top_k=1) + + # Verify doc_type was added proactively + add_calls = mock_col.config.add_property.call_args_list + added_names = [call.args[0].name for call in add_calls] + assert "doc_type" in added_names + assert len(docs) == 1 + # Only one query call (no retry needed with proactive approach) + assert mock_col.query.near_vector.call_count == 1 + + # ------------------------------------------------------------------ # + # search_by_full_text # + # ------------------------------------------------------------------ # + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def test_search_by_full_text_returns_doc_type_in_metadata(self, mock_weaviate_module): """Test that search_by_full_text also returns doc_type in document metadata.""" @@ -250,6 +482,17 @@ class TestWeaviateVector(unittest.TestCase): mock_result.objects = [mock_obj] mock_col.query.bm25.return_value = mock_result + # Pre-populate schema + mock_cfg = MagicMock() + mock_cfg.properties = [ + SimpleNamespace(name="text"), + SimpleNamespace(name="document_id"), + SimpleNamespace(name="doc_id"), + SimpleNamespace(name="doc_type"), + SimpleNamespace(name="chunk_index"), + ] + mock_col.config.get.return_value = mock_cfg + wv = WeaviateVector( collection_name=self.collection_name, config=self.config, @@ -268,6 +511,92 @@ class TestWeaviateVector(unittest.TestCase): assert len(docs) == 1 assert docs[0].metadata.get("doc_type") == "image" + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_search_by_full_text_calls_maybe_ensure_properties(self, mock_weaviate_module): + """search_by_full_text should proactively call _maybe_ensure_properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + mock_obj = MagicMock() + mock_obj.properties = {"text": "hello", "doc_id": "id_1"} + mock_obj.vector = {"default": [0.1] * 128} + mock_result = MagicMock() + mock_result.objects = [mock_obj] + mock_col.query.bm25.return_value = mock_result + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._maybe_ensure_properties = MagicMock() + wv.search_by_full_text(query="hello", top_k=1) + + wv._maybe_ensure_properties.assert_called_once() + + # ------------------------------------------------------------------ # + # text_exists # + # ------------------------------------------------------------------ # + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_text_exists_calls_maybe_ensure_properties(self, mock_weaviate_module): + """text_exists should proactively call _maybe_ensure_properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + mock_result = MagicMock() + mock_result.objects = [] + mock_col.query.fetch_objects.return_value = mock_result + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._maybe_ensure_properties = MagicMock() + wv.text_exists("some_doc_id") + + wv._maybe_ensure_properties.assert_called_once() + + # ------------------------------------------------------------------ # + # delete_by_metadata_field # + # ------------------------------------------------------------------ # + + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") + def test_delete_by_metadata_field_calls_maybe_ensure_properties(self, mock_weaviate_module): + """delete_by_metadata_field should proactively call _maybe_ensure_properties.""" + mock_client = MagicMock() + mock_client.is_ready.return_value = True + mock_weaviate_module.connect_to_custom.return_value = mock_client + + mock_client.collections.exists.return_value = True + mock_col = MagicMock() + mock_client.collections.use.return_value = mock_col + + wv = WeaviateVector( + collection_name=self.collection_name, + config=self.config, + attributes=self.attributes, + ) + wv._maybe_ensure_properties = MagicMock() + wv.delete_by_metadata_field("annotation_id", "ann_123") + + wv._maybe_ensure_properties.assert_called_once() + + # ------------------------------------------------------------------ # + # add_texts # + # ------------------------------------------------------------------ # + @patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate") def test_add_texts_stores_doc_type_in_properties(self, mock_weaviate_module): """Test that add_texts includes doc_type from document metadata in stored properties.""" From 3d3666a9738a697d0cb234e7767d61d937a7bc52 Mon Sep 17 00:00:00 2001 From: RickDamon <719196134@qq.com> Date: Thu, 19 Mar 2026 20:52:46 +0800 Subject: [PATCH 2/2] fix: revert _get_uuids param name to 'documents' for clarity --- api/core/rag/datasource/vdb/weaviate/weaviate_vector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py index 9a7411ff61..24a2cfefea 100644 --- a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py +++ b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py @@ -309,7 +309,7 @@ class WeaviateVector(BaseVector): if self._ensure_properties(): _ensured_collections.add(self._collection_name) - def _get_uuids(self, texts: list[Document]) -> list[str]: + def _get_uuids(self, documents: list[Document]) -> list[str]: """ Generates deterministic UUIDs for documents based on their content. @@ -318,7 +318,7 @@ class WeaviateVector(BaseVector): URL_NAMESPACE = _uuid.UUID("6ba7b811-9dad-11d1-80b4-00c04fd430c8") uuids = [] - for doc in texts: + for doc in documents: uuid_val = _uuid.uuid5(URL_NAMESPACE, doc.page_content) uuids.append(str(uuid_val))