From 658ac155893a69822eb581e0c83d51e0480a6e86 Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Fri, 20 Mar 2026 13:29:22 +0800 Subject: [PATCH 1/3] use new quota system --- api/enums/quota_type.py | 227 ++++++++++++++---------- api/services/app_generate_service.py | 3 +- api/services/async_workflow_service.py | 26 +-- api/services/billing_service.py | 54 +++++- api/services/trigger/webhook_service.py | 19 +- api/tasks/trigger_processing_tasks.py | 5 +- api/tasks/workflow_schedule_tasks.py | 3 +- 7 files changed, 224 insertions(+), 113 deletions(-) diff --git a/api/enums/quota_type.py b/api/enums/quota_type.py index 9f511b88ef..305f5b9808 100644 --- a/api/enums/quota_type.py +++ b/api/enums/quota_type.py @@ -1,5 +1,6 @@ import logging -from dataclasses import dataclass +import uuid +from dataclasses import dataclass, field from enum import StrEnum, auto logger = logging.getLogger(__name__) @@ -8,49 +9,85 @@ logger = logging.getLogger(__name__) @dataclass class QuotaCharge: """ - Result of a quota consumption operation. + Result of a quota reservation (Reserve phase). - Attributes: - success: Whether the quota charge succeeded - charge_id: UUID for refund, or None if failed/disabled + Lifecycle: + charge = QuotaType.TRIGGER.consume(tenant_id) # Reserve + try: + do_work() + charge.commit() # Confirm consumption + except: + charge.refund() # Release frozen quota + + If neither commit() nor refund() is called, the billing system's + cleanup CronJob will auto-release the reservation within ~75 seconds. """ success: bool - charge_id: str | None + charge_id: str | None # reservation_id _quota_type: "QuotaType" + _tenant_id: str | None = None + _feature_key: str | None = None + _amount: int = 0 + _committed: bool = field(default=False, repr=False) + + def commit(self, actual_amount: int | None = None) -> None: + """ + Confirm the consumption with actual amount. + + Args: + actual_amount: Actual amount consumed. Defaults to the reserved amount. + If less than reserved, the difference is refunded automatically. + """ + if self._committed or not self.charge_id: + return + + try: + from services.billing_service import BillingService + + amount = actual_amount if actual_amount is not None else self._amount + BillingService.quota_commit( + tenant_id=self._tenant_id, + feature_key=self._feature_key, + reservation_id=self.charge_id, + actual_amount=amount, + ) + self._committed = True + logger.debug( + "Committed %s quota for tenant %s, reservation_id: %s, amount: %d", + self._quota_type.value, self._tenant_id, self.charge_id, amount, + ) + except Exception: + logger.exception("Failed to commit quota, reservation_id: %s", self.charge_id) def refund(self) -> None: """ - Refund this quota charge. + Release the reserved quota (cancel the charge). + + Safe to call even if: + - charge failed or was disabled (charge_id is None) + - already committed (Release after Commit is a no-op) + - already refunded (idempotent) - Safe to call even if charge failed or was disabled. This method guarantees no exceptions will be raised. """ - if self.charge_id: - self._quota_type.refund(self.charge_id) - logger.info("Refunded quota for %s with charge_id: %s", self._quota_type.value, self.charge_id) + if not self.charge_id or not self._tenant_id or not self._feature_key: + return + + self._quota_type.release(self.charge_id, self._tenant_id, self._feature_key) class QuotaType(StrEnum): """ Supported quota types for tenant feature usage. - - Add additional types here whenever new billable features become available. """ - # Trigger execution quota TRIGGER = auto() - - # Workflow execution quota WORKFLOW = auto() - UNLIMITED = auto() @property def billing_key(self) -> str: - """ - Get the billing key for the feature. - """ match self: case QuotaType.TRIGGER: return "trigger_event" @@ -61,14 +98,45 @@ class QuotaType(StrEnum): def consume(self, tenant_id: str, amount: int = 1) -> QuotaCharge: """ - Consume quota for the feature. + Consume quota using Reserve + immediate Commit. + + This is the simple one-shot mode: Reserve freezes quota, then Commit + confirms it right away. The returned QuotaCharge supports .refund() + which calls Release (idempotent even after Commit). + + For advanced two-phase usage (e.g. streaming), use reserve() directly + and call charge.commit() / charge.refund() manually. Args: tenant_id: The tenant identifier amount: Amount to consume (default: 1) Returns: - QuotaCharge with success status and charge_id for refund + QuotaCharge with reservation_id for potential refund + + Raises: + QuotaExceededError: When quota is insufficient + """ + charge = self.reserve(tenant_id, amount) + if charge.success and charge.charge_id: + charge.commit() + return charge + + def reserve(self, tenant_id: str, amount: int = 1) -> QuotaCharge: + """ + Reserve quota before task execution (Reserve phase only). + + The caller MUST call charge.commit() after the task succeeds, + or charge.refund() if the task fails. + + If neither is called, the reservation auto-expires in ~75 seconds. + + Args: + tenant_id: The tenant identifier + amount: Amount to reserve (default: 1) + + Returns: + QuotaCharge — call .commit() on success, .refund() on failure Raises: QuotaExceededError: When quota is insufficient @@ -81,51 +149,52 @@ class QuotaType(StrEnum): logger.debug("Billing disabled, allowing request for %s", tenant_id) return QuotaCharge(success=True, charge_id=None, _quota_type=self) - logger.info("Consuming %d %s quota for tenant %s", amount, self.value, tenant_id) + logger.info("Reserving %d %s quota for tenant %s", amount, self.value, tenant_id) if amount <= 0: - raise ValueError("Amount to consume must be greater than 0") + raise ValueError("Amount to reserve must be greater than 0") + + request_id = str(uuid.uuid4()) + feature_key = self.billing_key try: - response = BillingService.update_tenant_feature_plan_usage(tenant_id, self.billing_key, delta=amount) + reserve_resp = BillingService.quota_reserve( + tenant_id=tenant_id, + feature_key=feature_key, + request_id=request_id, + amount=amount, + ) - if response.get("result") != "success": + reservation_id = reserve_resp.get("reservation_id") + if not reservation_id: logger.warning( - "Failed to consume quota for %s, feature %s details: %s", - tenant_id, - self.value, - response.get("detail"), + "Reserve returned no reservation_id for %s, feature %s, response: %s", + tenant_id, self.value, reserve_resp, ) raise QuotaExceededError(feature=self.value, tenant_id=tenant_id, required=amount) - charge_id = response.get("history_id") logger.debug( - "Successfully consumed %d %s quota for tenant %s, charge_id: %s", - amount, - self.value, - tenant_id, - charge_id, + "Reserved %d %s quota for tenant %s, reservation_id: %s", + amount, self.value, tenant_id, reservation_id, + ) + return QuotaCharge( + success=True, + charge_id=reservation_id, + _quota_type=self, + _tenant_id=tenant_id, + _feature_key=feature_key, + _amount=amount, ) - return QuotaCharge(success=True, charge_id=charge_id, _quota_type=self) except QuotaExceededError: raise + except ValueError: + raise except Exception: - # fail-safe: allow request on billing errors - logger.exception("Failed to consume quota for %s, feature %s", tenant_id, self.value) + logger.exception("Failed to reserve quota for %s, feature %s", tenant_id, self.value) return unlimited() def check(self, tenant_id: str, amount: int = 1) -> bool: - """ - Check if tenant has sufficient quota without consuming. - - Args: - tenant_id: The tenant identifier - amount: Amount to check (default: 1) - - Returns: - True if quota is sufficient, False otherwise - """ from configs import dify_config if not dify_config.BILLING_ENABLED: @@ -139,18 +208,11 @@ class QuotaType(StrEnum): return remaining >= amount if remaining != -1 else True except Exception: logger.exception("Failed to check quota for %s, feature %s", tenant_id, self.value) - # fail-safe: allow request on billing errors return True - def refund(self, charge_id: str) -> None: + def release(self, reservation_id: str, tenant_id: str, feature_key: str) -> None: """ - Refund quota using charge_id from consume(). - - This method guarantees no exceptions will be raised. - All errors are logged but silently handled. - - Args: - charge_id: The UUID returned from consume() + Release a reservation. Guarantees no exceptions. """ try: from configs import dify_config @@ -159,51 +221,36 @@ class QuotaType(StrEnum): if not dify_config.BILLING_ENABLED: return - if not charge_id: - logger.warning("Cannot refund: charge_id is empty") + if not reservation_id: return - logger.info("Refunding %s quota with charge_id: %s", self.value, charge_id) - - response = BillingService.refund_tenant_feature_plan_usage(charge_id) - if response.get("result") == "success": - logger.debug("Successfully refunded %s quota, charge_id: %s", self.value, charge_id) - else: - logger.warning("Refund failed for charge_id: %s", charge_id) - + logger.info("Releasing %s quota, reservation_id: %s", self.value, reservation_id) + BillingService.quota_release( + tenant_id=tenant_id, + feature_key=feature_key, + reservation_id=reservation_id, + ) except Exception: - # Catch ALL exceptions - refund must never fail - logger.exception("Failed to refund quota for charge_id: %s", charge_id) - # Don't raise - refund is best-effort and must be silent + logger.exception("Failed to release quota, reservation_id: %s", reservation_id) def get_remaining(self, tenant_id: str) -> int: - """ - Get remaining quota for the tenant. - - Args: - tenant_id: The tenant identifier - - Returns: - Remaining quota amount - """ from services.billing_service import BillingService try: - usage_info = BillingService.get_tenant_feature_plan_usage(tenant_id, self.billing_key) - # Assuming the API returns a dict with 'remaining' or 'limit' and 'used' + usage_info = BillingService.get_tenant_feature_plan_usage_info(tenant_id) if isinstance(usage_info, dict): - return usage_info.get("remaining", 0) - # If it returns a simple number, treat it as remaining - return int(usage_info) if usage_info else 0 + feature_info = usage_info.get(self.billing_key, {}) + if isinstance(feature_info, dict): + limit = feature_info.get("limit", 0) + usage = feature_info.get("usage", 0) + if limit == -1: + return -1 + return max(0, limit - usage) + return 0 except Exception: logger.exception("Failed to get remaining quota for %s, feature %s", tenant_id, self.value) return -1 def unlimited() -> QuotaCharge: - """ - Return a quota charge for unlimited quota. - - This is useful for features that are not subject to quota limits, such as the UNLIMITED quota type. - """ return QuotaCharge(success=True, charge_id=None, _quota_type=QuotaType.UNLIMITED) diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index 40013f2b66..ce68a1dcba 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -106,7 +106,7 @@ class AppGenerateService: quota_charge = unlimited() if dify_config.BILLING_ENABLED: try: - quota_charge = QuotaType.WORKFLOW.consume(app_model.tenant_id) + quota_charge = QuotaType.WORKFLOW.reserve(app_model.tenant_id) except QuotaExceededError: raise InvokeRateLimitError(f"Workflow execution quota limit reached for tenant {app_model.tenant_id}") @@ -116,6 +116,7 @@ class AppGenerateService: request_id = RateLimit.gen_request_key() try: request_id = rate_limit.enter(request_id) + quota_charge.commit() if app_model.mode == AppMode.COMPLETION: return rate_limit.generate( CompletionAppGenerator.convert_to_event_stream( diff --git a/api/services/async_workflow_service.py b/api/services/async_workflow_service.py index 0133634e5a..5f7201de11 100644 --- a/api/services/async_workflow_service.py +++ b/api/services/async_workflow_service.py @@ -13,7 +13,7 @@ from celery.result import AsyncResult from sqlalchemy import select from sqlalchemy.orm import Session -from enums.quota_type import QuotaType +from enums.quota_type import QuotaType, unlimited from extensions.ext_database import db from models.account import Account from models.enums import CreatorUserRole, WorkflowTriggerStatus @@ -131,9 +131,10 @@ class AsyncWorkflowService: trigger_log = trigger_log_repo.create(trigger_log) session.commit() - # 7. Check and consume quota + # 7. Reserve quota (commit after successful dispatch) + quota_charge = unlimited() try: - QuotaType.WORKFLOW.consume(trigger_data.tenant_id) + quota_charge = QuotaType.WORKFLOW.reserve(trigger_data.tenant_id) except QuotaExceededError as e: # Update trigger log status trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED @@ -153,13 +154,18 @@ class AsyncWorkflowService: # 9. Dispatch to appropriate queue task_data_dict = task_data.model_dump(mode="json") - task: AsyncResult[Any] | None = None - if queue_name == QueuePriority.PROFESSIONAL: - task = execute_workflow_professional.delay(task_data_dict) - elif queue_name == QueuePriority.TEAM: - task = execute_workflow_team.delay(task_data_dict) - else: # SANDBOX - task = execute_workflow_sandbox.delay(task_data_dict) + try: + task: AsyncResult[Any] | None = None + if queue_name == QueuePriority.PROFESSIONAL: + task = execute_workflow_professional.delay(task_data_dict) + elif queue_name == QueuePriority.TEAM: + task = execute_workflow_team.delay(task_data_dict) + else: # SANDBOX + task = execute_workflow_sandbox.delay(task_data_dict) + quota_charge.commit() + except Exception: + quota_charge.refund() + raise # 10. Update trigger log with task info trigger_log.status = WorkflowTriggerStatus.QUEUED diff --git a/api/services/billing_service.py b/api/services/billing_service.py index 5ab47c799a..4648d675c8 100644 --- a/api/services/billing_service.py +++ b/api/services/billing_service.py @@ -47,10 +47,60 @@ class BillingService: @classmethod def get_tenant_feature_plan_usage_info(cls, tenant_id: str): params = {"tenant_id": tenant_id} - - usage_info = cls._send_request("GET", "/tenant-feature-usage/info", params=params) + usage_info = cls._send_request("GET", "/quota/info", params=params) return usage_info + @classmethod + def quota_reserve( + cls, tenant_id: str, feature_key: str, request_id: str, amount: int = 1, meta: dict | None = None + ) -> dict: + """Reserve quota before task execution. + + Returns: + {"reservation_id": "uuid", "available": int, "reserved": int} + """ + payload = { + "tenant_id": tenant_id, + "feature_key": feature_key, + "request_id": request_id, + "amount": amount, + } + if meta: + payload["meta"] = meta + return cls._send_request("POST", "/quota/reserve", json=payload) + + @classmethod + def quota_commit( + cls, tenant_id: str, feature_key: str, reservation_id: str, actual_amount: int, meta: dict | None = None + ) -> dict: + """Commit a reservation with actual consumption. + + Returns: + {"available": int, "reserved": int, "refunded": int} + """ + payload = { + "tenant_id": tenant_id, + "feature_key": feature_key, + "reservation_id": reservation_id, + "actual_amount": actual_amount, + } + if meta: + payload["meta"] = meta + return cls._send_request("POST", "/quota/commit", json=payload) + + @classmethod + def quota_release(cls, tenant_id: str, feature_key: str, reservation_id: str) -> dict: + """Release a reservation (cancel, return frozen quota). + + Returns: + {"available": int, "reserved": int, "released": int} + """ + return cls._send_request("POST", "/quota/release", json={ + "tenant_id": tenant_id, + "feature_key": feature_key, + "reservation_id": reservation_id, + }) + @classmethod def get_knowledge_rate_limit(cls, tenant_id: str): params = {"tenant_id": tenant_id} diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 3c1a4cc747..3bc64423b3 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -779,9 +779,9 @@ class WebhookService: user_id=None, ) - # consume quota before triggering workflow execution + # reserve quota before triggering workflow execution try: - QuotaType.TRIGGER.consume(webhook_trigger.tenant_id) + quota_charge = QuotaType.TRIGGER.reserve(webhook_trigger.tenant_id) except QuotaExceededError: AppTriggerService.mark_tenant_triggers_rate_limited(webhook_trigger.tenant_id) logger.info( @@ -792,11 +792,16 @@ class WebhookService: raise # Trigger workflow execution asynchronously - AsyncWorkflowService.trigger_workflow_async( - session, - end_user, - trigger_data, - ) + try: + AsyncWorkflowService.trigger_workflow_async( + session, + end_user, + trigger_data, + ) + quota_charge.commit() + except Exception: + quota_charge.refund() + raise except Exception: logger.exception("Failed to trigger workflow for webhook %s", webhook_trigger.webhook_id) diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 75ae1f6316..3a9a663759 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -298,10 +298,10 @@ def dispatch_triggered_workflow( icon_dark_filename=trigger_entity.identity.icon_dark or "", ) - # consume quota before invoking trigger + # reserve quota before invoking trigger quota_charge = unlimited() try: - quota_charge = QuotaType.TRIGGER.consume(subscription.tenant_id) + quota_charge = QuotaType.TRIGGER.reserve(subscription.tenant_id) except QuotaExceededError: AppTriggerService.mark_tenant_triggers_rate_limited(subscription.tenant_id) logger.info( @@ -387,6 +387,7 @@ def dispatch_triggered_workflow( raise ValueError(f"End user not found for app {plugin_trigger.app_id}") AsyncWorkflowService.trigger_workflow_async(session=session, user=end_user, trigger_data=trigger_data) + quota_charge.commit() dispatched_count += 1 logger.info( "Triggered workflow for app %s with trigger event %s", diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index 8c64d3ab27..9aa90c3793 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -43,7 +43,7 @@ def run_schedule_trigger(schedule_id: str) -> None: quota_charge = unlimited() try: - quota_charge = QuotaType.TRIGGER.consume(schedule.tenant_id) + quota_charge = QuotaType.TRIGGER.reserve(schedule.tenant_id) except QuotaExceededError: AppTriggerService.mark_tenant_triggers_rate_limited(schedule.tenant_id) logger.info("Tenant %s rate limited, skipping schedule trigger %s", schedule.tenant_id, schedule_id) @@ -61,6 +61,7 @@ def run_schedule_trigger(schedule_id: str) -> None: tenant_id=schedule.tenant_id, ), ) + quota_charge.commit() logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id) except Exception as e: quota_charge.refund() From 3888969af3aae0ee9cdcc8dadf450456004fbd26 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 20 Mar 2026 05:45:30 +0000 Subject: [PATCH 2/3] [autofix.ci] apply automated fixes --- api/enums/quota_type.py | 14 +++++++++++--- api/services/billing_service.py | 14 +++++++++----- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/api/enums/quota_type.py b/api/enums/quota_type.py index 305f5b9808..4caf9ec98b 100644 --- a/api/enums/quota_type.py +++ b/api/enums/quota_type.py @@ -55,7 +55,10 @@ class QuotaCharge: self._committed = True logger.debug( "Committed %s quota for tenant %s, reservation_id: %s, amount: %d", - self._quota_type.value, self._tenant_id, self.charge_id, amount, + self._quota_type.value, + self._tenant_id, + self.charge_id, + amount, ) except Exception: logger.exception("Failed to commit quota, reservation_id: %s", self.charge_id) @@ -169,13 +172,18 @@ class QuotaType(StrEnum): if not reservation_id: logger.warning( "Reserve returned no reservation_id for %s, feature %s, response: %s", - tenant_id, self.value, reserve_resp, + tenant_id, + self.value, + reserve_resp, ) raise QuotaExceededError(feature=self.value, tenant_id=tenant_id, required=amount) logger.debug( "Reserved %d %s quota for tenant %s, reservation_id: %s", - amount, self.value, tenant_id, reservation_id, + amount, + self.value, + tenant_id, + reservation_id, ) return QuotaCharge( success=True, diff --git a/api/services/billing_service.py b/api/services/billing_service.py index 4648d675c8..2bddd3ab11 100644 --- a/api/services/billing_service.py +++ b/api/services/billing_service.py @@ -95,11 +95,15 @@ class BillingService: Returns: {"available": int, "reserved": int, "released": int} """ - return cls._send_request("POST", "/quota/release", json={ - "tenant_id": tenant_id, - "feature_key": feature_key, - "reservation_id": reservation_id, - }) + return cls._send_request( + "POST", + "/quota/release", + json={ + "tenant_id": tenant_id, + "feature_key": feature_key, + "reservation_id": reservation_id, + }, + ) @classmethod def get_knowledge_rate_limit(cls, tenant_id: str): From 80b4633e8f226511215ad8d1ab72f9bf96effa55 Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Fri, 20 Mar 2026 14:58:31 +0800 Subject: [PATCH 3/3] fix style check and test --- api/enums/quota_type.py | 2 +- api/services/billing_service.py | 4 ++-- .../services/test_app_generate_service.py | 18 +++++++++++++----- .../services/test_app_generate_service.py | 11 ++++++----- .../services/test_async_workflow_service.py | 8 ++++++-- .../services/test_billing_service.py | 2 +- 6 files changed, 29 insertions(+), 16 deletions(-) diff --git a/api/enums/quota_type.py b/api/enums/quota_type.py index 4caf9ec98b..a2a7f689a5 100644 --- a/api/enums/quota_type.py +++ b/api/enums/quota_type.py @@ -39,7 +39,7 @@ class QuotaCharge: actual_amount: Actual amount consumed. Defaults to the reserved amount. If less than reserved, the difference is refunded automatically. """ - if self._committed or not self.charge_id: + if self._committed or not self.charge_id or not self._tenant_id or not self._feature_key: return try: diff --git a/api/services/billing_service.py b/api/services/billing_service.py index 2bddd3ab11..6fe61a1a52 100644 --- a/api/services/billing_service.py +++ b/api/services/billing_service.py @@ -59,7 +59,7 @@ class BillingService: Returns: {"reservation_id": "uuid", "available": int, "reserved": int} """ - payload = { + payload: dict = { "tenant_id": tenant_id, "feature_key": feature_key, "request_id": request_id, @@ -78,7 +78,7 @@ class BillingService: Returns: {"available": int, "reserved": int, "refunded": int} """ - payload = { + payload: dict = { "tenant_id": tenant_id, "feature_key": feature_key, "reservation_id": reservation_id, diff --git a/api/tests/test_containers_integration_tests/services/test_app_generate_service.py b/api/tests/test_containers_integration_tests/services/test_app_generate_service.py index 5b1a4790f5..eea9673710 100644 --- a/api/tests/test_containers_integration_tests/services/test_app_generate_service.py +++ b/api/tests/test_containers_integration_tests/services/test_app_generate_service.py @@ -39,9 +39,15 @@ class TestAppGenerateService: patch("configs.dify_config", autospec=True) as mock_global_dify_config, ): # Setup default mock returns for billing service - mock_billing_service.update_tenant_feature_plan_usage.return_value = { - "result": "success", - "history_id": "test_history_id", + mock_billing_service.quota_reserve.return_value = { + "reservation_id": "test-reservation-id", + "available": 100, + "reserved": 1, + } + mock_billing_service.quota_commit.return_value = { + "available": 99, + "reserved": 0, + "refunded": 0, } # Setup default mock returns for workflow service @@ -478,8 +484,10 @@ class TestAppGenerateService: # Verify the result assert result == ["test_response"] - # Verify billing service was called to consume quota - mock_external_service_dependencies["billing_service"].update_tenant_feature_plan_usage.assert_called_once() + # Verify billing two-phase quota (reserve + commit) + billing = mock_external_service_dependencies["billing_service"] + billing.quota_reserve.assert_called_once() + billing.quota_commit.assert_called_once() def test_generate_with_invalid_app_mode( self, db_session_with_containers: Session, mock_external_service_dependencies diff --git a/api/tests/unit_tests/services/test_app_generate_service.py b/api/tests/unit_tests/services/test_app_generate_service.py index c2b430c551..68ee6ae9d6 100644 --- a/api/tests/unit_tests/services/test_app_generate_service.py +++ b/api/tests/unit_tests/services/test_app_generate_service.py @@ -447,8 +447,8 @@ class TestGenerateBilling: def test_billing_enabled_consumes_quota(self, mocker, monkeypatch): monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True) quota_charge = MagicMock() - consume_mock = mocker.patch( - "services.app_generate_service.QuotaType.WORKFLOW.consume", + reserve_mock = mocker.patch( + "services.app_generate_service.QuotaType.WORKFLOW.reserve", return_value=quota_charge, ) mocker.patch( @@ -467,7 +467,8 @@ class TestGenerateBilling: invoke_from=InvokeFrom.SERVICE_API, streaming=False, ) - consume_mock.assert_called_once_with("tenant-id") + reserve_mock.assert_called_once_with("tenant-id") + quota_charge.commit.assert_called_once() def test_billing_quota_exceeded_raises_rate_limit_error(self, mocker, monkeypatch): from services.errors.app import QuotaExceededError @@ -475,7 +476,7 @@ class TestGenerateBilling: monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True) mocker.patch( - "services.app_generate_service.QuotaType.WORKFLOW.consume", + "services.app_generate_service.QuotaType.WORKFLOW.reserve", side_effect=QuotaExceededError(feature="workflow", tenant_id="t", required=1), ) @@ -492,7 +493,7 @@ class TestGenerateBilling: monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True) quota_charge = MagicMock() mocker.patch( - "services.app_generate_service.QuotaType.WORKFLOW.consume", + "services.app_generate_service.QuotaType.WORKFLOW.reserve", return_value=quota_charge, ) mocker.patch( diff --git a/api/tests/unit_tests/services/test_async_workflow_service.py b/api/tests/unit_tests/services/test_async_workflow_service.py index 639e091041..ab83c8020f 100644 --- a/api/tests/unit_tests/services/test_async_workflow_service.py +++ b/api/tests/unit_tests/services/test_async_workflow_service.py @@ -146,6 +146,9 @@ class TestAsyncWorkflowService: mocks["team_task"].delay.return_value = task_result mocks["sandbox_task"].delay.return_value = task_result + quota_charge_mock = MagicMock() + mocks["quota_workflow"].reserve.return_value = quota_charge_mock + class DummyAccount: def __init__(self, user_id: str): self.id = user_id @@ -163,7 +166,8 @@ class TestAsyncWorkflowService: assert result.status == "queued" assert result.queue == queue_name - mocks["quota_workflow"].consume.assert_called_once_with("tenant-123") + mocks["quota_workflow"].reserve.assert_called_once_with("tenant-123") + quota_charge_mock.commit.assert_called_once() assert session.commit.call_count == 2 created_log = mocks["repo"].create.call_args[0][0] @@ -250,7 +254,7 @@ class TestAsyncWorkflowService: mocks = async_workflow_trigger_mocks mocks["dispatcher"].get_queue_name.return_value = QueuePriority.TEAM mocks["get_workflow"].return_value = workflow - mocks["quota_workflow"].consume.side_effect = QuotaExceededError( + mocks["quota_workflow"].reserve.side_effect = QuotaExceededError( feature="workflow", tenant_id="tenant-123", required=1, diff --git a/api/tests/unit_tests/services/test_billing_service.py b/api/tests/unit_tests/services/test_billing_service.py index eecb3c7672..135c2e9962 100644 --- a/api/tests/unit_tests/services/test_billing_service.py +++ b/api/tests/unit_tests/services/test_billing_service.py @@ -426,7 +426,7 @@ class TestBillingServiceUsageCalculation: # Assert assert result == expected_response - mock_send_request.assert_called_once_with("GET", "/tenant-feature-usage/info", params={"tenant_id": tenant_id}) + mock_send_request.assert_called_once_with("GET", "/quota/info", params={"tenant_id": tenant_id}) def test_update_tenant_feature_plan_usage_positive_delta(self, mock_send_request): """Test updating tenant feature usage with positive delta (adding credits)."""