This commit is contained in:
zyssyz123 2026-03-24 13:26:07 +08:00 committed by GitHub
commit f742a54d71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 257 additions and 121 deletions

View File

@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
import uuid
from dataclasses import dataclass, field
from enum import StrEnum, auto
logger = logging.getLogger(__name__)
@ -8,49 +9,88 @@ logger = logging.getLogger(__name__)
@dataclass
class QuotaCharge:
"""
Result of a quota consumption operation.
Result of a quota reservation (Reserve phase).
Attributes:
success: Whether the quota charge succeeded
charge_id: UUID for refund, or None if failed/disabled
Lifecycle:
charge = QuotaType.TRIGGER.consume(tenant_id) # Reserve
try:
do_work()
charge.commit() # Confirm consumption
except:
charge.refund() # Release frozen quota
If neither commit() nor refund() is called, the billing system's
cleanup CronJob will auto-release the reservation within ~75 seconds.
"""
success: bool
charge_id: str | None
charge_id: str | None # reservation_id
_quota_type: "QuotaType"
_tenant_id: str | None = None
_feature_key: str | None = None
_amount: int = 0
_committed: bool = field(default=False, repr=False)
def commit(self, actual_amount: int | None = None) -> None:
"""
Confirm the consumption with actual amount.
Args:
actual_amount: Actual amount consumed. Defaults to the reserved amount.
If less than reserved, the difference is refunded automatically.
"""
if self._committed or not self.charge_id or not self._tenant_id or not self._feature_key:
return
try:
from services.billing_service import BillingService
amount = actual_amount if actual_amount is not None else self._amount
BillingService.quota_commit(
tenant_id=self._tenant_id,
feature_key=self._feature_key,
reservation_id=self.charge_id,
actual_amount=amount,
)
self._committed = True
logger.debug(
"Committed %s quota for tenant %s, reservation_id: %s, amount: %d",
self._quota_type.value,
self._tenant_id,
self.charge_id,
amount,
)
except Exception:
logger.exception("Failed to commit quota, reservation_id: %s", self.charge_id)
def refund(self) -> None:
"""
Refund this quota charge.
Release the reserved quota (cancel the charge).
Safe to call even if:
- charge failed or was disabled (charge_id is None)
- already committed (Release after Commit is a no-op)
- already refunded (idempotent)
Safe to call even if charge failed or was disabled.
This method guarantees no exceptions will be raised.
"""
if self.charge_id:
self._quota_type.refund(self.charge_id)
logger.info("Refunded quota for %s with charge_id: %s", self._quota_type.value, self.charge_id)
if not self.charge_id or not self._tenant_id or not self._feature_key:
return
self._quota_type.release(self.charge_id, self._tenant_id, self._feature_key)
class QuotaType(StrEnum):
"""
Supported quota types for tenant feature usage.
Add additional types here whenever new billable features become available.
"""
# Trigger execution quota
TRIGGER = auto()
# Workflow execution quota
WORKFLOW = auto()
UNLIMITED = auto()
@property
def billing_key(self) -> str:
"""
Get the billing key for the feature.
"""
match self:
case QuotaType.TRIGGER:
return "trigger_event"
@ -61,14 +101,45 @@ class QuotaType(StrEnum):
def consume(self, tenant_id: str, amount: int = 1) -> QuotaCharge:
"""
Consume quota for the feature.
Consume quota using Reserve + immediate Commit.
This is the simple one-shot mode: Reserve freezes quota, then Commit
confirms it right away. The returned QuotaCharge supports .refund()
which calls Release (idempotent even after Commit).
For advanced two-phase usage (e.g. streaming), use reserve() directly
and call charge.commit() / charge.refund() manually.
Args:
tenant_id: The tenant identifier
amount: Amount to consume (default: 1)
Returns:
QuotaCharge with success status and charge_id for refund
QuotaCharge with reservation_id for potential refund
Raises:
QuotaExceededError: When quota is insufficient
"""
charge = self.reserve(tenant_id, amount)
if charge.success and charge.charge_id:
charge.commit()
return charge
def reserve(self, tenant_id: str, amount: int = 1) -> QuotaCharge:
"""
Reserve quota before task execution (Reserve phase only).
The caller MUST call charge.commit() after the task succeeds,
or charge.refund() if the task fails.
If neither is called, the reservation auto-expires in ~75 seconds.
Args:
tenant_id: The tenant identifier
amount: Amount to reserve (default: 1)
Returns:
QuotaCharge call .commit() on success, .refund() on failure
Raises:
QuotaExceededError: When quota is insufficient
@ -81,51 +152,57 @@ class QuotaType(StrEnum):
logger.debug("Billing disabled, allowing request for %s", tenant_id)
return QuotaCharge(success=True, charge_id=None, _quota_type=self)
logger.info("Consuming %d %s quota for tenant %s", amount, self.value, tenant_id)
logger.info("Reserving %d %s quota for tenant %s", amount, self.value, tenant_id)
if amount <= 0:
raise ValueError("Amount to consume must be greater than 0")
raise ValueError("Amount to reserve must be greater than 0")
request_id = str(uuid.uuid4())
feature_key = self.billing_key
try:
response = BillingService.update_tenant_feature_plan_usage(tenant_id, self.billing_key, delta=amount)
reserve_resp = BillingService.quota_reserve(
tenant_id=tenant_id,
feature_key=feature_key,
request_id=request_id,
amount=amount,
)
if response.get("result") != "success":
reservation_id = reserve_resp.get("reservation_id")
if not reservation_id:
logger.warning(
"Failed to consume quota for %s, feature %s details: %s",
"Reserve returned no reservation_id for %s, feature %s, response: %s",
tenant_id,
self.value,
response.get("detail"),
reserve_resp,
)
raise QuotaExceededError(feature=self.value, tenant_id=tenant_id, required=amount)
charge_id = response.get("history_id")
logger.debug(
"Successfully consumed %d %s quota for tenant %s, charge_id: %s",
"Reserved %d %s quota for tenant %s, reservation_id: %s",
amount,
self.value,
tenant_id,
charge_id,
reservation_id,
)
return QuotaCharge(
success=True,
charge_id=reservation_id,
_quota_type=self,
_tenant_id=tenant_id,
_feature_key=feature_key,
_amount=amount,
)
return QuotaCharge(success=True, charge_id=charge_id, _quota_type=self)
except QuotaExceededError:
raise
except ValueError:
raise
except Exception:
# fail-safe: allow request on billing errors
logger.exception("Failed to consume quota for %s, feature %s", tenant_id, self.value)
logger.exception("Failed to reserve quota for %s, feature %s", tenant_id, self.value)
return unlimited()
def check(self, tenant_id: str, amount: int = 1) -> bool:
"""
Check if tenant has sufficient quota without consuming.
Args:
tenant_id: The tenant identifier
amount: Amount to check (default: 1)
Returns:
True if quota is sufficient, False otherwise
"""
from configs import dify_config
if not dify_config.BILLING_ENABLED:
@ -139,18 +216,11 @@ class QuotaType(StrEnum):
return remaining >= amount if remaining != -1 else True
except Exception:
logger.exception("Failed to check quota for %s, feature %s", tenant_id, self.value)
# fail-safe: allow request on billing errors
return True
def refund(self, charge_id: str) -> None:
def release(self, reservation_id: str, tenant_id: str, feature_key: str) -> None:
"""
Refund quota using charge_id from consume().
This method guarantees no exceptions will be raised.
All errors are logged but silently handled.
Args:
charge_id: The UUID returned from consume()
Release a reservation. Guarantees no exceptions.
"""
try:
from configs import dify_config
@ -159,51 +229,36 @@ class QuotaType(StrEnum):
if not dify_config.BILLING_ENABLED:
return
if not charge_id:
logger.warning("Cannot refund: charge_id is empty")
if not reservation_id:
return
logger.info("Refunding %s quota with charge_id: %s", self.value, charge_id)
response = BillingService.refund_tenant_feature_plan_usage(charge_id)
if response.get("result") == "success":
logger.debug("Successfully refunded %s quota, charge_id: %s", self.value, charge_id)
else:
logger.warning("Refund failed for charge_id: %s", charge_id)
logger.info("Releasing %s quota, reservation_id: %s", self.value, reservation_id)
BillingService.quota_release(
tenant_id=tenant_id,
feature_key=feature_key,
reservation_id=reservation_id,
)
except Exception:
# Catch ALL exceptions - refund must never fail
logger.exception("Failed to refund quota for charge_id: %s", charge_id)
# Don't raise - refund is best-effort and must be silent
logger.exception("Failed to release quota, reservation_id: %s", reservation_id)
def get_remaining(self, tenant_id: str) -> int:
"""
Get remaining quota for the tenant.
Args:
tenant_id: The tenant identifier
Returns:
Remaining quota amount
"""
from services.billing_service import BillingService
try:
usage_info = BillingService.get_tenant_feature_plan_usage(tenant_id, self.billing_key)
# Assuming the API returns a dict with 'remaining' or 'limit' and 'used'
usage_info = BillingService.get_tenant_feature_plan_usage_info(tenant_id)
if isinstance(usage_info, dict):
return usage_info.get("remaining", 0)
# If it returns a simple number, treat it as remaining
return int(usage_info) if usage_info else 0
feature_info = usage_info.get(self.billing_key, {})
if isinstance(feature_info, dict):
limit = feature_info.get("limit", 0)
usage = feature_info.get("usage", 0)
if limit == -1:
return -1
return max(0, limit - usage)
return 0
except Exception:
logger.exception("Failed to get remaining quota for %s, feature %s", tenant_id, self.value)
return -1
def unlimited() -> QuotaCharge:
"""
Return a quota charge for unlimited quota.
This is useful for features that are not subject to quota limits, such as the UNLIMITED quota type.
"""
return QuotaCharge(success=True, charge_id=None, _quota_type=QuotaType.UNLIMITED)

View File

@ -106,7 +106,7 @@ class AppGenerateService:
quota_charge = unlimited()
if dify_config.BILLING_ENABLED:
try:
quota_charge = QuotaType.WORKFLOW.consume(app_model.tenant_id)
quota_charge = QuotaType.WORKFLOW.reserve(app_model.tenant_id)
except QuotaExceededError:
raise InvokeRateLimitError(f"Workflow execution quota limit reached for tenant {app_model.tenant_id}")
@ -116,6 +116,7 @@ class AppGenerateService:
request_id = RateLimit.gen_request_key()
try:
request_id = rate_limit.enter(request_id)
quota_charge.commit()
if app_model.mode == AppMode.COMPLETION:
return rate_limit.generate(
CompletionAppGenerator.convert_to_event_stream(

View File

@ -13,7 +13,7 @@ from celery.result import AsyncResult
from sqlalchemy import select
from sqlalchemy.orm import Session
from enums.quota_type import QuotaType
from enums.quota_type import QuotaType, unlimited
from extensions.ext_database import db
from models.account import Account
from models.enums import CreatorUserRole, WorkflowTriggerStatus
@ -131,9 +131,10 @@ class AsyncWorkflowService:
trigger_log = trigger_log_repo.create(trigger_log)
session.commit()
# 7. Check and consume quota
# 7. Reserve quota (commit after successful dispatch)
quota_charge = unlimited()
try:
QuotaType.WORKFLOW.consume(trigger_data.tenant_id)
quota_charge = QuotaType.WORKFLOW.reserve(trigger_data.tenant_id)
except QuotaExceededError as e:
# Update trigger log status
trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED
@ -153,6 +154,7 @@ class AsyncWorkflowService:
# 9. Dispatch to appropriate queue
task_data_dict = task_data.model_dump(mode="json")
try:
task: AsyncResult[Any] | None = None
if queue_name == QueuePriority.PROFESSIONAL:
task = execute_workflow_professional.delay(task_data_dict)
@ -160,6 +162,10 @@ class AsyncWorkflowService:
task = execute_workflow_team.delay(task_data_dict)
else: # SANDBOX
task = execute_workflow_sandbox.delay(task_data_dict)
quota_charge.commit()
except Exception:
quota_charge.refund()
raise
# 10. Update trigger log with task info
trigger_log.status = WorkflowTriggerStatus.QUEUED

View File

@ -47,10 +47,64 @@ class BillingService:
@classmethod
def get_tenant_feature_plan_usage_info(cls, tenant_id: str):
params = {"tenant_id": tenant_id}
usage_info = cls._send_request("GET", "/tenant-feature-usage/info", params=params)
usage_info = cls._send_request("GET", "/quota/info", params=params)
return usage_info
@classmethod
def quota_reserve(
cls, tenant_id: str, feature_key: str, request_id: str, amount: int = 1, meta: dict | None = None
) -> dict:
"""Reserve quota before task execution.
Returns:
{"reservation_id": "uuid", "available": int, "reserved": int}
"""
payload: dict = {
"tenant_id": tenant_id,
"feature_key": feature_key,
"request_id": request_id,
"amount": amount,
}
if meta:
payload["meta"] = meta
return cls._send_request("POST", "/quota/reserve", json=payload)
@classmethod
def quota_commit(
cls, tenant_id: str, feature_key: str, reservation_id: str, actual_amount: int, meta: dict | None = None
) -> dict:
"""Commit a reservation with actual consumption.
Returns:
{"available": int, "reserved": int, "refunded": int}
"""
payload: dict = {
"tenant_id": tenant_id,
"feature_key": feature_key,
"reservation_id": reservation_id,
"actual_amount": actual_amount,
}
if meta:
payload["meta"] = meta
return cls._send_request("POST", "/quota/commit", json=payload)
@classmethod
def quota_release(cls, tenant_id: str, feature_key: str, reservation_id: str) -> dict:
"""Release a reservation (cancel, return frozen quota).
Returns:
{"available": int, "reserved": int, "released": int}
"""
return cls._send_request(
"POST",
"/quota/release",
json={
"tenant_id": tenant_id,
"feature_key": feature_key,
"reservation_id": reservation_id,
},
)
@classmethod
def get_knowledge_rate_limit(cls, tenant_id: str):
params = {"tenant_id": tenant_id}

View File

@ -779,9 +779,9 @@ class WebhookService:
user_id=None,
)
# consume quota before triggering workflow execution
# reserve quota before triggering workflow execution
try:
QuotaType.TRIGGER.consume(webhook_trigger.tenant_id)
quota_charge = QuotaType.TRIGGER.reserve(webhook_trigger.tenant_id)
except QuotaExceededError:
AppTriggerService.mark_tenant_triggers_rate_limited(webhook_trigger.tenant_id)
logger.info(
@ -792,11 +792,16 @@ class WebhookService:
raise
# Trigger workflow execution asynchronously
try:
AsyncWorkflowService.trigger_workflow_async(
session,
end_user,
trigger_data,
)
quota_charge.commit()
except Exception:
quota_charge.refund()
raise
except Exception:
logger.exception("Failed to trigger workflow for webhook %s", webhook_trigger.webhook_id)

View File

@ -298,10 +298,10 @@ def dispatch_triggered_workflow(
icon_dark_filename=trigger_entity.identity.icon_dark or "",
)
# consume quota before invoking trigger
# reserve quota before invoking trigger
quota_charge = unlimited()
try:
quota_charge = QuotaType.TRIGGER.consume(subscription.tenant_id)
quota_charge = QuotaType.TRIGGER.reserve(subscription.tenant_id)
except QuotaExceededError:
AppTriggerService.mark_tenant_triggers_rate_limited(subscription.tenant_id)
logger.info(
@ -387,6 +387,7 @@ def dispatch_triggered_workflow(
raise ValueError(f"End user not found for app {plugin_trigger.app_id}")
AsyncWorkflowService.trigger_workflow_async(session=session, user=end_user, trigger_data=trigger_data)
quota_charge.commit()
dispatched_count += 1
logger.info(
"Triggered workflow for app %s with trigger event %s",

View File

@ -43,7 +43,7 @@ def run_schedule_trigger(schedule_id: str) -> None:
quota_charge = unlimited()
try:
quota_charge = QuotaType.TRIGGER.consume(schedule.tenant_id)
quota_charge = QuotaType.TRIGGER.reserve(schedule.tenant_id)
except QuotaExceededError:
AppTriggerService.mark_tenant_triggers_rate_limited(schedule.tenant_id)
logger.info("Tenant %s rate limited, skipping schedule trigger %s", schedule.tenant_id, schedule_id)
@ -61,6 +61,7 @@ def run_schedule_trigger(schedule_id: str) -> None:
tenant_id=schedule.tenant_id,
),
)
quota_charge.commit()
logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id)
except Exception as e:
quota_charge.refund()

View File

@ -39,9 +39,15 @@ class TestAppGenerateService:
patch("configs.dify_config", autospec=True) as mock_global_dify_config,
):
# Setup default mock returns for billing service
mock_billing_service.update_tenant_feature_plan_usage.return_value = {
"result": "success",
"history_id": "test_history_id",
mock_billing_service.quota_reserve.return_value = {
"reservation_id": "test-reservation-id",
"available": 100,
"reserved": 1,
}
mock_billing_service.quota_commit.return_value = {
"available": 99,
"reserved": 0,
"refunded": 0,
}
# Setup default mock returns for workflow service
@ -478,8 +484,10 @@ class TestAppGenerateService:
# Verify the result
assert result == ["test_response"]
# Verify billing service was called to consume quota
mock_external_service_dependencies["billing_service"].update_tenant_feature_plan_usage.assert_called_once()
# Verify billing two-phase quota (reserve + commit)
billing = mock_external_service_dependencies["billing_service"]
billing.quota_reserve.assert_called_once()
billing.quota_commit.assert_called_once()
def test_generate_with_invalid_app_mode(
self, db_session_with_containers: Session, mock_external_service_dependencies

View File

@ -447,8 +447,8 @@ class TestGenerateBilling:
def test_billing_enabled_consumes_quota(self, mocker, monkeypatch):
monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True)
quota_charge = MagicMock()
consume_mock = mocker.patch(
"services.app_generate_service.QuotaType.WORKFLOW.consume",
reserve_mock = mocker.patch(
"services.app_generate_service.QuotaType.WORKFLOW.reserve",
return_value=quota_charge,
)
mocker.patch(
@ -467,7 +467,8 @@ class TestGenerateBilling:
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
)
consume_mock.assert_called_once_with("tenant-id")
reserve_mock.assert_called_once_with("tenant-id")
quota_charge.commit.assert_called_once()
def test_billing_quota_exceeded_raises_rate_limit_error(self, mocker, monkeypatch):
from services.errors.app import QuotaExceededError
@ -475,7 +476,7 @@ class TestGenerateBilling:
monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True)
mocker.patch(
"services.app_generate_service.QuotaType.WORKFLOW.consume",
"services.app_generate_service.QuotaType.WORKFLOW.reserve",
side_effect=QuotaExceededError(feature="workflow", tenant_id="t", required=1),
)
@ -492,7 +493,7 @@ class TestGenerateBilling:
monkeypatch.setattr(ags_module.dify_config, "BILLING_ENABLED", True)
quota_charge = MagicMock()
mocker.patch(
"services.app_generate_service.QuotaType.WORKFLOW.consume",
"services.app_generate_service.QuotaType.WORKFLOW.reserve",
return_value=quota_charge,
)
mocker.patch(

View File

@ -146,6 +146,9 @@ class TestAsyncWorkflowService:
mocks["team_task"].delay.return_value = task_result
mocks["sandbox_task"].delay.return_value = task_result
quota_charge_mock = MagicMock()
mocks["quota_workflow"].reserve.return_value = quota_charge_mock
class DummyAccount:
def __init__(self, user_id: str):
self.id = user_id
@ -163,7 +166,8 @@ class TestAsyncWorkflowService:
assert result.status == "queued"
assert result.queue == queue_name
mocks["quota_workflow"].consume.assert_called_once_with("tenant-123")
mocks["quota_workflow"].reserve.assert_called_once_with("tenant-123")
quota_charge_mock.commit.assert_called_once()
assert session.commit.call_count == 2
created_log = mocks["repo"].create.call_args[0][0]
@ -250,7 +254,7 @@ class TestAsyncWorkflowService:
mocks = async_workflow_trigger_mocks
mocks["dispatcher"].get_queue_name.return_value = QueuePriority.TEAM
mocks["get_workflow"].return_value = workflow
mocks["quota_workflow"].consume.side_effect = QuotaExceededError(
mocks["quota_workflow"].reserve.side_effect = QuotaExceededError(
feature="workflow",
tenant_id="tenant-123",
required=1,

View File

@ -426,7 +426,7 @@ class TestBillingServiceUsageCalculation:
# Assert
assert result == expected_response
mock_send_request.assert_called_once_with("GET", "/tenant-feature-usage/info", params={"tenant_id": tenant_id})
mock_send_request.assert_called_once_with("GET", "/quota/info", params={"tenant_id": tenant_id})
def test_update_tenant_feature_plan_usage_positive_delta(self, mock_send_request):
"""Test updating tenant feature usage with positive delta (adding credits)."""