class TestDocumentServicePauseRecoverRetry:
    """Tests for pause/recover/retry orchestration using real DB and Redis.

    ``current_user`` and the background-task entry points are still patched.
    Because Redis state is real, every test that can create a key removes it
    in a ``finally`` block so a failing assertion cannot leak keys into
    other tests.
    """

    def _create_indexing_document(self, db_session_with_containers, indexing_status="indexing"):
        """Persist an account, tenant, dataset and a single document.

        Args:
            db_session_with_containers: Session fixture used for all writes.
            indexing_status: Value assigned to the document's
                ``indexing_status`` before the commit.

        Returns:
            A ``(document, account)`` tuple.
        """
        factory = DatasetServiceIntegrationDataFactory
        account, tenant = factory.create_account_with_tenant(db_session_with_containers)
        dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
        doc = factory.create_document(db_session_with_containers, dataset, account.id)
        doc.indexing_status = indexing_status
        db_session_with_containers.commit()
        return doc, account

    def test_pause_document_success(self, db_session_with_containers):
        """Pausing an indexing document stores the pause fields and a Redis flag."""
        from extensions.ext_redis import redis_client
        from services.dataset_service import DocumentService

        doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
        # Build the key before exercising the service so cleanup never depends
        # on the state the session is left in after a failure.
        cache_key = f"document_{doc.id}_is_paused"

        try:
            with patch("services.dataset_service.current_user") as mock_user:
                mock_user.id = account.id
                DocumentService.pause_document(doc)

            db_session_with_containers.refresh(doc)
            assert doc.is_paused is True
            assert doc.paused_by == account.id
            assert doc.paused_at is not None
            assert redis_client.get(cache_key) is not None
        finally:
            # Runs on failure as well, so the flag cannot outlive this test.
            redis_client.delete(cache_key)

    def test_pause_document_invalid_status_error(self, db_session_with_containers):
        """Pausing a document whose status is "completed" raises DocumentIndexingError."""
        from services.dataset_service import DocumentService
        from services.errors.document import DocumentIndexingError

        doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="completed")

        with patch("services.dataset_service.current_user") as mock_user:
            mock_user.id = account.id
            with pytest.raises(DocumentIndexingError):
                DocumentService.pause_document(doc)

    def test_recover_document_success(self, db_session_with_containers):
        """Recovering a paused document clears the pause fields and the Redis flag."""
        from extensions.ext_redis import redis_client
        from services.dataset_service import DocumentService

        doc, account = self._create_indexing_document(db_session_with_containers, indexing_status="indexing")
        cache_key = f"document_{doc.id}_is_paused"

        try:
            # Pause first
            with patch("services.dataset_service.current_user") as mock_user:
                mock_user.id = account.id
                DocumentService.pause_document(doc)

            # Recover
            with patch("services.dataset_service.recover_document_indexing_task") as recover_task:
                DocumentService.recover_document(doc)

            db_session_with_containers.refresh(doc)
            assert doc.is_paused is False
            assert doc.paused_by is None
            assert doc.paused_at is None

            assert redis_client.get(cache_key) is None
            recover_task.delay.assert_called_once_with(doc.dataset_id, doc.id)
        finally:
            # If recovery fails after the pause succeeded, the pause flag would
            # otherwise stay behind in Redis.
            redis_client.delete(cache_key)

    def test_retry_document_indexing_success(self, db_session_with_containers):
        """Retrying errored documents resets them to "waiting" and sets retry flags."""
        from extensions.ext_redis import redis_client
        from services.dataset_service import DocumentService

        factory = DatasetServiceIntegrationDataFactory
        account, tenant = factory.create_account_with_tenant(db_session_with_containers)
        dataset = factory.create_dataset(db_session_with_containers, tenant.id, account.id)
        doc1 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc1.txt")
        doc2 = factory.create_document(db_session_with_containers, dataset, account.id, name="doc2.txt")
        doc2.position = 2
        doc1.indexing_status = "error"
        doc2.indexing_status = "error"
        db_session_with_containers.commit()

        retry_keys = [f"document_{doc1.id}_is_retried", f"document_{doc2.id}_is_retried"]

        try:
            with (
                patch("services.dataset_service.current_user") as mock_user,
                patch("services.dataset_service.retry_document_indexing_task") as retry_task,
            ):
                mock_user.id = account.id
                DocumentService.retry_document(dataset.id, [doc1, doc2])

            db_session_with_containers.refresh(doc1)
            db_session_with_containers.refresh(doc2)
            assert doc1.indexing_status == "waiting"
            assert doc2.indexing_status == "waiting"

            # Verify redis keys were set
            for retry_key in retry_keys:
                assert redis_client.get(retry_key) is not None
            retry_task.delay.assert_called_once_with(dataset.id, [doc1.id, doc2.id], account.id)
        finally:
            # Cleanup must not be skipped by a failing assertion above.
            redis_client.delete(*retry_keys)