def _ensure_connected(self) -> None:
    """
    Ensures the Weaviate client connection is active before it is used.

    The Weaviate client connection can time out during idle periods (e.g. hours
    between knowledge base creation and recall testing), which surfaces as
    "WeaviateClient is closed" errors on the next call.

    Recovery is attempted in two stages:

    1. Reconnect the existing client in place via ``connect()``.
    2. If that raises, close the stale client on a best-effort basis and build
       a brand-new client from the configuration stored on ``self._config``.

    Returns:
        None. ``self._client`` is left untouched when it is already connected
        or reconnects successfully, and is replaced otherwise.

    Raises:
        Any exception raised by ``self._init_client`` propagates to the caller,
        because no handler wraps the final re-initialisation.
    """
    if self._client.is_connected():
        return

    # Keep the try body limited to the one call that is expected to fail so
    # that an unrelated error cannot be mistaken for a reconnect failure.
    try:
        self._client.connect()
    except Exception:
        # exc_info records *why* the reconnect failed; without it the log only
        # says that a fallback happened, which is not enough to diagnose it.
        logger.warning(
            "Failed to reconnect existing Weaviate client, creating a new one (collection=%s)",
            self._collection_name,
            exc_info=True,
        )
    else:
        logger.info("Reconnected to Weaviate (collection=%s)", self._collection_name)
        return

    # Closing the stale client is deliberately best-effort: a failure here must
    # not prevent the replacement client from being created, but it is still
    # logged so leaked connections can be traced.
    try:
        self._client.close()
    except Exception:
        logger.debug(
            "Ignoring error while closing stale Weaviate client (collection=%s)",
            self._collection_name,
            exc_info=True,
        )

    self._client = self._init_client(self._config)
""" + self._ensure_connected() uuids = self._get_uuids(documents) texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] @@ -301,6 +330,7 @@ class WeaviateVector(BaseVector): def delete_by_metadata_field(self, key: str, value: str) -> None: """Deletes all objects matching a specific metadata field value.""" + self._ensure_connected() if not self._client.collections.exists(self._collection_name): return @@ -309,11 +339,13 @@ class WeaviateVector(BaseVector): def delete(self): """Deletes the entire collection from Weaviate.""" + self._ensure_connected() if self._client.collections.exists(self._collection_name): self._client.collections.delete(self._collection_name) def text_exists(self, id: str) -> bool: """Checks if a document with the given doc_id exists in the collection.""" + self._ensure_connected() if not self._client.collections.exists(self._collection_name): return False @@ -332,6 +364,7 @@ class WeaviateVector(BaseVector): Silently ignores 404 errors for non-existent IDs. """ + self._ensure_connected() if not self._client.collections.exists(self._collection_name): return @@ -351,6 +384,7 @@ class WeaviateVector(BaseVector): Filters by document IDs if provided and applies score threshold. Returns documents sorted by relevance score. """ + self._ensure_connected() if not self._client.collections.exists(self._collection_name): return [] @@ -398,6 +432,7 @@ class WeaviateVector(BaseVector): Filters by document IDs if provided and returns matching documents with vectors. """ + self._ensure_connected() if not self._client.collections.exists(self._collection_name): return []