fix: reconnect Weaviate client when connection times out during idle

The Weaviate client connection can silently close after idle periods,
causing "WeaviateClient is closed" errors when performing recall tests
hours after knowledge base creation.

Add _ensure_connected() that checks client.is_connected() before each
operation and calls client.connect() to re-establish the connection.
Falls back to creating a fresh client if reconnection fails.

Closes #33771

Signed-off-by: Tan <alvinttang@gmail.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
gambletan 2026-03-20 10:19:49 +08:00
parent 35caa04fe7
commit e63a36940e
1 changed files with 35 additions and 0 deletions

View File

@ -78,6 +78,7 @@ class WeaviateVector(BaseVector):
attributes: List of metadata attributes to store
"""
super().__init__(collection_name)
self._config = config
self._client = self._init_client(config)
self._attributes = attributes
@ -137,6 +138,32 @@ class WeaviateVector(BaseVector):
return client
def _ensure_connected(self) -> None:
"""
Ensures the Weaviate client connection is active.
The Weaviate client connection can time out during idle periods (e.g. hours
between knowledge base creation and recall testing). This method checks the
connection state and reconnects if necessary, preventing
"WeaviateClient is closed" errors.
"""
if self._client.is_connected():
return
try:
self._client.connect()
logger.info("Reconnected to Weaviate (collection=%s)", self._collection_name)
except Exception:
logger.warning(
"Failed to reconnect existing Weaviate client, creating a new one (collection=%s)",
self._collection_name,
)
try:
self._client.close()
except Exception:
pass
self._client = self._init_client(self._config)
def get_type(self) -> str:
"""Returns the vector database type identifier."""
return VectorType.WEAVIATE
@ -173,6 +200,7 @@ class WeaviateVector(BaseVector):
Uses Redis locking to prevent concurrent creation attempts.
"""
self._ensure_connected()
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
cache_key = f"vector_indexing_{self._collection_name}"
@ -255,6 +283,7 @@ class WeaviateVector(BaseVector):
Batches insertions for efficiency and returns the list of inserted object IDs.
"""
self._ensure_connected()
uuids = self._get_uuids(documents)
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
@ -301,6 +330,7 @@ class WeaviateVector(BaseVector):
def delete_by_metadata_field(self, key: str, value: str) -> None:
"""Deletes all objects matching a specific metadata field value."""
self._ensure_connected()
if not self._client.collections.exists(self._collection_name):
return
@ -309,11 +339,13 @@ class WeaviateVector(BaseVector):
def delete(self):
"""Deletes the entire collection from Weaviate."""
self._ensure_connected()
if self._client.collections.exists(self._collection_name):
self._client.collections.delete(self._collection_name)
def text_exists(self, id: str) -> bool:
"""Checks if a document with the given doc_id exists in the collection."""
self._ensure_connected()
if not self._client.collections.exists(self._collection_name):
return False
@ -332,6 +364,7 @@ class WeaviateVector(BaseVector):
Silently ignores 404 errors for non-existent IDs.
"""
self._ensure_connected()
if not self._client.collections.exists(self._collection_name):
return
@ -351,6 +384,7 @@ class WeaviateVector(BaseVector):
Filters by document IDs if provided and applies score threshold.
Returns documents sorted by relevance score.
"""
self._ensure_connected()
if not self._client.collections.exists(self._collection_name):
return []
@ -398,6 +432,7 @@ class WeaviateVector(BaseVector):
Filters by document IDs if provided and returns matching documents with vectors.
"""
self._ensure_connected()
if not self._client.collections.exists(self._collection_name):
return []