This commit is contained in:
RickDamon 2026-03-24 23:14:07 +08:00 committed by GitHub
commit 0618880a2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 387 additions and 10 deletions

View File

@ -37,6 +37,12 @@ logger = logging.getLogger(__name__)
_weaviate_client: weaviate.WeaviateClient | None = None
_weaviate_client_lock = threading.Lock()
# Process-level cache: tracks collections whose required properties have been
# verified in this process lifetime. Guarded by _ensured_collections_lock so
# concurrent Celery/thread workers can share the set safely.
_ensured_collections: set[str] = set()
_ensured_collections_lock = threading.Lock()
def _shutdown_weaviate_client() -> None:
"""
@ -230,26 +236,35 @@ class WeaviateVector(BaseVector):
vector_config=wc.Configure.Vectors.self_provided(),
)
self._ensure_properties()
if self._ensure_properties():
# Mark collection as ensured so _maybe_ensure_properties() is
# a no-op for subsequent operations in this process.
_ensured_collections.add(self._collection_name)
redis_client.set(cache_key, 1, ex=3600)
except Exception as e:
logger.exception("Error creating collection %s", self._collection_name)
raise
def _ensure_properties(self) -> None:
"""
Ensures all required properties exist in the collection schema.
def _ensure_properties(self) -> bool:
"""Ensures all required properties exist in the collection schema.
Adds missing properties if the collection exists but lacks them.
Adds missing properties (document_id, doc_id, doc_type, chunk_index) if the
collection exists but lacks them. This handles backward compatibility for
collections created before these properties were introduced.
Returns ``True`` if all properties are confirmed present (either they
already existed or were successfully added). Returns ``False`` if
any property addition failed, signalling that the caller should NOT
cache this collection as "ensured".
"""
if not self._client.collections.exists(self._collection_name):
return
return True
col = self._client.collections.use(self._collection_name)
cfg = col.config.get()
existing = {p.name for p in (cfg.properties or [])}
to_add = []
to_add: list[wc.Property] = []
if "document_id" not in existing:
to_add.append(wc.Property(name="document_id", data_type=wc.DataType.TEXT))
if "doc_id" not in existing:
@ -259,11 +274,40 @@ class WeaviateVector(BaseVector):
if "chunk_index" not in existing:
to_add.append(wc.Property(name="chunk_index", data_type=wc.DataType.INT))
all_succeeded = True
for prop in to_add:
try:
col.config.add_property(prop)
except Exception as e:
logger.warning("Could not add property %s: %s", prop.name, e)
all_succeeded = False
return all_succeeded
def _maybe_ensure_properties(self) -> None:
"""One-time proactive schema check per collection per process lifetime.
On first access to a collection, verifies that all required properties
(document_id, doc_id, doc_type, chunk_index) exist in the schema. If any
are missing they are added transparently.
Subsequent calls for the same collection in this process are no-ops
(just a fast ``set`` membership test).
Uses double-checked locking so concurrent threads in the same process
don't duplicate work.
If property migration fails (e.g. Weaviate is temporarily unavailable),
the collection is *not* marked as ensured so that the next request will
retry the migration automatically.
"""
if self._collection_name in _ensured_collections:
return
with _ensured_collections_lock:
if self._collection_name in _ensured_collections:
return
if self._ensure_properties():
_ensured_collections.add(self._collection_name)
def _get_uuids(self, documents: list[Document]) -> list[str]:
"""
@ -335,6 +379,7 @@ class WeaviateVector(BaseVector):
if not self._client.collections.exists(self._collection_name):
return
self._maybe_ensure_properties()
col = self._client.collections.use(self._collection_name)
col.data.delete_many(where=Filter.by_property(key).equal(value))
@ -348,6 +393,7 @@ class WeaviateVector(BaseVector):
if not self._client.collections.exists(self._collection_name):
return False
self._maybe_ensure_properties()
col = self._client.collections.use(self._collection_name)
res = col.query.fetch_objects(
filters=Filter.by_property("doc_id").equal(id),
@ -385,6 +431,7 @@ class WeaviateVector(BaseVector):
if not self._client.collections.exists(self._collection_name):
return []
self._maybe_ensure_properties()
col = self._client.collections.use(self._collection_name)
props = list({*self._attributes, self._DOCUMENT_ID_PROPERTY, Field.TEXT_KEY.value})
@ -432,6 +479,7 @@ class WeaviateVector(BaseVector):
if not self._client.collections.exists(self._collection_name):
return []
self._maybe_ensure_properties()
col = self._client.collections.use(self._collection_name)
props = list({*self._attributes, Field.TEXT_KEY.value})

View File

@ -2,7 +2,8 @@
Focuses on verifying that doc_type is properly handled in:
- Collection schema creation (_create_collection)
- Property migration (_ensure_properties)
- Property migration (_ensure_properties / _maybe_ensure_properties)
- Proactive schema migration on first access per collection per process
- Vector search result metadata (search_by_vector)
- Full-text search result metadata (search_by_full_text)
"""
@ -21,6 +22,7 @@ class TestWeaviateVector(unittest.TestCase):
def setUp(self):
weaviate_vector_module._weaviate_client = None
weaviate_vector_module._ensured_collections.clear()
self.config = WeaviateConfig(
endpoint="http://localhost:8080",
api_key="test-key",
@ -31,6 +33,7 @@ class TestWeaviateVector(unittest.TestCase):
def tearDown(self):
weaviate_vector_module._weaviate_client = None
weaviate_vector_module._ensured_collections.clear()
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def _create_weaviate_vector(self, mock_weaviate_module):
@ -111,6 +114,9 @@ class TestWeaviateVector(unittest.TestCase):
f"doc_type should be in collection schema properties, got: {property_names}"
)
# Verify collection is marked as ensured after _create_collection
assert self.collection_name in weaviate_vector_module._ensured_collections
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_ensure_properties_adds_missing_doc_type(self, mock_weaviate_module):
"""Test that _ensure_properties adds doc_type when it's missing from existing schema."""
@ -139,12 +145,13 @@ class TestWeaviateVector(unittest.TestCase):
config=self.config,
attributes=self.attributes,
)
wv._ensure_properties()
result = wv._ensure_properties()
# Verify add_property was called and includes doc_type
add_calls = mock_col.config.add_property.call_args_list
added_names = [call.args[0].name for call in add_calls]
assert "doc_type" in added_names, f"doc_type should be added to existing collection, added: {added_names}"
assert result is True
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_ensure_properties_skips_existing_doc_type(self, mock_weaviate_module):
@ -174,10 +181,148 @@ class TestWeaviateVector(unittest.TestCase):
config=self.config,
attributes=self.attributes,
)
wv._ensure_properties()
result = wv._ensure_properties()
# No properties should be added
mock_col.config.add_property.assert_not_called()
assert result is True
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_ensure_properties_returns_false_on_failure(self, mock_weaviate_module):
"""_ensure_properties should return False when add_property fails."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
# Schema missing doc_type → will attempt add_property
existing_props = [
SimpleNamespace(name="text"),
SimpleNamespace(name="document_id"),
SimpleNamespace(name="doc_id"),
SimpleNamespace(name="chunk_index"),
]
mock_cfg = MagicMock()
mock_cfg.properties = existing_props
mock_col.config.get.return_value = mock_cfg
# Simulate Weaviate failure during add_property
mock_col.config.add_property.side_effect = Exception("Weaviate unavailable")
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
result = wv._ensure_properties()
assert result is False, "Should return False when property addition fails"
# ------------------------------------------------------------------ #
# _maybe_ensure_properties: proactive migration with process cache #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_maybe_ensure_properties_calls_ensure_on_first_access(self, mock_weaviate_module):
"""First call to _maybe_ensure_properties should delegate to _ensure_properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._ensure_properties = MagicMock(return_value=True)
wv._maybe_ensure_properties()
wv._ensure_properties.assert_called_once()
assert self.collection_name in weaviate_vector_module._ensured_collections
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_maybe_ensure_properties_skips_on_second_access(self, mock_weaviate_module):
"""Subsequent calls to _maybe_ensure_properties should be no-ops."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._ensure_properties = MagicMock(return_value=True)
# First call — should invoke _ensure_properties
wv._maybe_ensure_properties()
# Second call — should be a no-op
wv._maybe_ensure_properties()
wv._ensure_properties.assert_called_once()
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_maybe_ensure_properties_retries_on_failure(self, mock_weaviate_module):
"""When _ensure_properties returns False, the collection should NOT be cached.
The next call must retry _ensure_properties so that transient failures
(e.g. Weaviate temporarily unavailable) don't permanently prevent migration.
"""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
# First call fails
wv._ensure_properties = MagicMock(side_effect=[False, True])
wv._maybe_ensure_properties()
assert self.collection_name not in weaviate_vector_module._ensured_collections
# Second call succeeds → now cached
wv._maybe_ensure_properties()
assert self.collection_name in weaviate_vector_module._ensured_collections
assert wv._ensure_properties.call_count == 2
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_maybe_ensure_properties_independent_per_collection(self, mock_weaviate_module):
"""Different collections should each get their own one-time check."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
wv1 = WeaviateVector(
collection_name="Collection_A",
config=self.config,
attributes=self.attributes,
)
wv2 = WeaviateVector(
collection_name="Collection_B",
config=self.config,
attributes=self.attributes,
)
wv1._ensure_properties = MagicMock(return_value=True)
wv2._ensure_properties = MagicMock(return_value=True)
wv1._maybe_ensure_properties()
wv2._maybe_ensure_properties()
wv1._ensure_properties.assert_called_once()
wv2._ensure_properties.assert_called_once()
assert "Collection_A" in weaviate_vector_module._ensured_collections
assert "Collection_B" in weaviate_vector_module._ensured_collections
# ------------------------------------------------------------------ #
# search_by_vector #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_search_by_vector_returns_doc_type_in_metadata(self, mock_weaviate_module):
@ -210,6 +355,17 @@ class TestWeaviateVector(unittest.TestCase):
mock_result.objects = [mock_obj]
mock_col.query.near_vector.return_value = mock_result
# Pre-populate schema so _maybe_ensure_properties is a no-op
mock_cfg = MagicMock()
mock_cfg.properties = [
SimpleNamespace(name="text"),
SimpleNamespace(name="document_id"),
SimpleNamespace(name="doc_id"),
SimpleNamespace(name="doc_type"),
SimpleNamespace(name="chunk_index"),
]
mock_col.config.get.return_value = mock_cfg
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
@ -226,6 +382,82 @@ class TestWeaviateVector(unittest.TestCase):
assert len(docs) == 1
assert docs[0].metadata.get("doc_type") == "image"
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_search_by_vector_calls_maybe_ensure_properties(self, mock_weaviate_module):
"""search_by_vector should proactively call _maybe_ensure_properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
mock_obj = MagicMock()
mock_obj.properties = {"text": "hello", "doc_id": "id_1"}
mock_obj.metadata.distance = 0.2
mock_result = MagicMock()
mock_result.objects = [mock_obj]
mock_col.query.near_vector.return_value = mock_result
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._maybe_ensure_properties = MagicMock()
wv.search_by_vector(query_vector=[0.1] * 128, top_k=1)
wv._maybe_ensure_properties.assert_called_once()
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_search_by_vector_migrates_schema_on_first_access_to_old_collection(self, mock_weaviate_module):
"""search_by_vector on a pre-upgrade collection should proactively add missing properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
mock_obj = MagicMock()
mock_obj.properties = {"text": "hello world", "doc_id": "id_1"}
mock_obj.metadata.distance = 0.2
mock_result = MagicMock()
mock_result.objects = [mock_obj]
mock_col.query.near_vector.return_value = mock_result
# Schema is missing doc_type
existing_props = [
SimpleNamespace(name="text"),
SimpleNamespace(name="document_id"),
SimpleNamespace(name="doc_id"),
SimpleNamespace(name="chunk_index"),
]
mock_cfg = MagicMock()
mock_cfg.properties = existing_props
mock_col.config.get.return_value = mock_cfg
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
docs = wv.search_by_vector(query_vector=[0.1] * 128, top_k=1)
# Verify doc_type was added proactively
add_calls = mock_col.config.add_property.call_args_list
added_names = [call.args[0].name for call in add_calls]
assert "doc_type" in added_names
assert len(docs) == 1
# Only one query call (no retry needed with proactive approach)
assert mock_col.query.near_vector.call_count == 1
# ------------------------------------------------------------------ #
# search_by_full_text #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_search_by_full_text_returns_doc_type_in_metadata(self, mock_weaviate_module):
"""Test that search_by_full_text also returns doc_type in document metadata."""
@ -250,6 +482,17 @@ class TestWeaviateVector(unittest.TestCase):
mock_result.objects = [mock_obj]
mock_col.query.bm25.return_value = mock_result
# Pre-populate schema
mock_cfg = MagicMock()
mock_cfg.properties = [
SimpleNamespace(name="text"),
SimpleNamespace(name="document_id"),
SimpleNamespace(name="doc_id"),
SimpleNamespace(name="doc_type"),
SimpleNamespace(name="chunk_index"),
]
mock_col.config.get.return_value = mock_cfg
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
@ -268,6 +511,92 @@ class TestWeaviateVector(unittest.TestCase):
assert len(docs) == 1
assert docs[0].metadata.get("doc_type") == "image"
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_search_by_full_text_calls_maybe_ensure_properties(self, mock_weaviate_module):
"""search_by_full_text should proactively call _maybe_ensure_properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
mock_obj = MagicMock()
mock_obj.properties = {"text": "hello", "doc_id": "id_1"}
mock_obj.vector = {"default": [0.1] * 128}
mock_result = MagicMock()
mock_result.objects = [mock_obj]
mock_col.query.bm25.return_value = mock_result
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._maybe_ensure_properties = MagicMock()
wv.search_by_full_text(query="hello", top_k=1)
wv._maybe_ensure_properties.assert_called_once()
# ------------------------------------------------------------------ #
# text_exists #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_text_exists_calls_maybe_ensure_properties(self, mock_weaviate_module):
"""text_exists should proactively call _maybe_ensure_properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
mock_result = MagicMock()
mock_result.objects = []
mock_col.query.fetch_objects.return_value = mock_result
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._maybe_ensure_properties = MagicMock()
wv.text_exists("some_doc_id")
wv._maybe_ensure_properties.assert_called_once()
# ------------------------------------------------------------------ #
# delete_by_metadata_field #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_delete_by_metadata_field_calls_maybe_ensure_properties(self, mock_weaviate_module):
"""delete_by_metadata_field should proactively call _maybe_ensure_properties."""
mock_client = MagicMock()
mock_client.is_ready.return_value = True
mock_weaviate_module.connect_to_custom.return_value = mock_client
mock_client.collections.exists.return_value = True
mock_col = MagicMock()
mock_client.collections.use.return_value = mock_col
wv = WeaviateVector(
collection_name=self.collection_name,
config=self.config,
attributes=self.attributes,
)
wv._maybe_ensure_properties = MagicMock()
wv.delete_by_metadata_field("annotation_id", "ann_123")
wv._maybe_ensure_properties.assert_called_once()
# ------------------------------------------------------------------ #
# add_texts #
# ------------------------------------------------------------------ #
@patch("core.rag.datasource.vdb.weaviate.weaviate_vector.weaviate")
def test_add_texts_stores_doc_type_in_properties(self, mock_weaviate_module):
"""Test that add_texts includes doc_type from document metadata in stored properties."""