diff --git a/api/commands/retention.py b/api/commands/retention.py index 5a91c1cc70..82a77ea77a 100644 --- a/api/commands/retention.py +++ b/api/commands/retention.py @@ -88,6 +88,8 @@ def clean_workflow_runs( """ Clean workflow runs and related workflow data for free tenants. """ + from extensions.otel.runtime import flush_telemetry + if (start_from is None) ^ (end_before is None): raise click.UsageError("--start-from and --end-before must be provided together.") @@ -104,16 +106,27 @@ def clean_workflow_runs( end_before = now - datetime.timedelta(days=to_days_ago) before_days = 0 + if from_days_ago is not None and to_days_ago is not None: + task_label = f"{from_days_ago}to{to_days_ago}" + elif start_from is None: + task_label = f"before-{before_days}" + else: + task_label = "custom" + start_time = datetime.datetime.now(datetime.UTC) click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white")) - WorkflowRunCleanup( - days=before_days, - batch_size=batch_size, - start_from=start_from, - end_before=end_before, - dry_run=dry_run, - ).run() + try: + WorkflowRunCleanup( + days=before_days, + batch_size=batch_size, + start_from=start_from, + end_before=end_before, + dry_run=dry_run, + task_label=task_label, + ).run() + finally: + flush_telemetry() end_time = datetime.datetime.now(datetime.UTC) elapsed = end_time - start_time @@ -659,6 +672,8 @@ def clean_expired_messages( """ Clean expired messages and related data for tenants based on clean policy. """ + from extensions.otel.runtime import flush_telemetry + click.echo(click.style("clean_messages: start clean messages.", fg="green")) start_at = time.perf_counter() @@ -698,6 +713,13 @@ def clean_expired_messages( # NOTE: graceful_period will be ignored when billing is disabled. policy = create_message_clean_policy(graceful_period_days=graceful_period) + if from_days_ago is not None and before_days is not None: + task_label = f"{from_days_ago}to{before_days}" + elif start_from is None and before_days is not None: + task_label = f"before-{before_days}" + else: + task_label = "custom" + # Create and run the cleanup service if abs_mode: assert start_from is not None @@ -708,6 +730,7 @@ def clean_expired_messages( end_before=end_before, batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) elif from_days_ago is None: assert before_days is not None @@ -716,6 +739,7 @@ def clean_expired_messages( days=before_days, batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) else: assert before_days is not None @@ -727,6 +751,7 @@ def clean_expired_messages( end_before=now - datetime.timedelta(days=before_days), batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) stats = service.run() @@ -752,6 +777,8 @@ def clean_expired_messages( ) ) raise + finally: + flush_telemetry() click.echo(click.style("messages cleanup completed.", fg="green")) diff --git a/api/extensions/otel/runtime.py b/api/extensions/otel/runtime.py index b1c703f944..149d76b07b 100644 --- a/api/extensions/otel/runtime.py +++ b/api/extensions/otel/runtime.py @@ -5,7 +5,7 @@ from typing import Union from celery.signals import worker_init from flask_login import user_loaded_from_request, user_logged_in -from opentelemetry import trace +from opentelemetry import metrics, trace from opentelemetry.propagate import set_global_textmap from opentelemetry.propagators.b3 import B3MultiFormat from opentelemetry.propagators.composite import CompositePropagator @@ -31,9 +31,29 @@ def setup_context_propagation() -> None: def shutdown_tracer() -> None: + flush_telemetry() + + +def flush_telemetry() -> None: + """ + Best-effort flush for telemetry providers. + + This is mainly used by short-lived command processes (e.g. Kubernetes CronJob) + so counters/histograms are exported before the process exits. + """ provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): - provider.force_flush() + try: + provider.force_flush() + except Exception: + logger.exception("otel: failed to flush trace provider") + + metric_provider = metrics.get_meter_provider() + if hasattr(metric_provider, "force_flush"): + try: + metric_provider.force_flush() + except Exception: + logger.exception("otel: failed to flush metric provider") def is_celery_worker(): diff --git a/api/services/retention/conversation/messages_clean_service.py b/api/services/retention/conversation/messages_clean_service.py index 04265817d7..48c3e72af0 100644 --- a/api/services/retention/conversation/messages_clean_service.py +++ b/api/services/retention/conversation/messages_clean_service.py @@ -1,16 +1,16 @@ import datetime import logging -import os import random import time from collections.abc import Sequence -from typing import cast +from typing import TYPE_CHECKING, cast import sqlalchemy as sa from sqlalchemy import delete, select, tuple_ from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session +from configs import dify_config from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from models.model import ( @@ -33,6 +33,131 @@ from services.retention.conversation.messages_clean_policy import ( logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from opentelemetry.metrics import Counter, Histogram + + +class MessagesCleanupMetrics: + """ + Records low-cardinality OpenTelemetry metrics for expired message cleanup jobs. + + We keep labels stable (dry_run/window_mode/task_label/status) so these metrics remain + dashboard-friendly for long-running CronJob executions. + """ + + _job_runs_total: "Counter | None" + _batches_total: "Counter | None" + _messages_scanned_total: "Counter | None" + _messages_filtered_total: "Counter | None" + _messages_deleted_total: "Counter | None" + _job_duration_seconds: "Histogram | None" + _batch_duration_seconds: "Histogram | None" + _base_attributes: dict[str, str] + + def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None: + self._job_runs_total = None + self._batches_total = None + self._messages_scanned_total = None + self._messages_filtered_total = None + self._messages_deleted_total = None + self._job_duration_seconds = None + self._batch_duration_seconds = None + self._base_attributes = { + "job_name": "messages_cleanup", + "dry_run": str(dry_run).lower(), + "window_mode": "between" if has_window else "before_cutoff", + "task_label": task_label, + } + self._init_instruments() + + def _init_instruments(self) -> None: + if not dify_config.ENABLE_OTEL: + return + + try: + from opentelemetry.metrics import get_meter + + meter = get_meter("messages_cleanup", version=dify_config.project.version) + self._job_runs_total = meter.create_counter( + "messages_cleanup_jobs_total", + description="Total number of expired message cleanup jobs by status.", + unit="{job}", + ) + self._batches_total = meter.create_counter( + "messages_cleanup_batches_total", + description="Total number of message cleanup batches processed.", + unit="{batch}", + ) + self._messages_scanned_total = meter.create_counter( + "messages_cleanup_scanned_messages_total", + description="Total messages scanned by cleanup jobs.", + unit="{message}", + ) + self._messages_filtered_total = meter.create_counter( + "messages_cleanup_filtered_messages_total", + description="Total messages selected by cleanup policy.", + unit="{message}", + ) + self._messages_deleted_total = meter.create_counter( + "messages_cleanup_deleted_messages_total", + description="Total messages deleted by cleanup jobs.", + unit="{message}", + ) + self._job_duration_seconds = meter.create_histogram( + "messages_cleanup_job_duration_seconds", + description="Duration of expired message cleanup jobs in seconds.", + unit="s", + ) + self._batch_duration_seconds = meter.create_histogram( + "messages_cleanup_batch_duration_seconds", + description="Duration of expired message cleanup batch processing in seconds.", + unit="s", + ) + except Exception: + logger.exception("messages_cleanup_metrics: failed to initialize instruments") + + def _attrs(self, **extra: str) -> dict[str, str]: + return {**self._base_attributes, **extra} + + @staticmethod + def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None: + if not counter or value <= 0: + return + try: + counter.add(value, attributes) + except Exception: + logger.exception("messages_cleanup_metrics: failed to add counter value") + + @staticmethod + def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None: + if not histogram: + return + try: + histogram.record(value, attributes) + except Exception: + logger.exception("messages_cleanup_metrics: failed to record histogram value") + + def record_batch( + self, + *, + scanned_messages: int, + filtered_messages: int, + deleted_messages: int, + batch_duration_seconds: float, + ) -> None: + attributes = self._attrs() + self._add(self._batches_total, 1, attributes) + self._add(self._messages_scanned_total, scanned_messages, attributes) + self._add(self._messages_filtered_total, filtered_messages, attributes) + self._add(self._messages_deleted_total, deleted_messages, attributes) + self._record(self._batch_duration_seconds, batch_duration_seconds, attributes) + + def record_completion(self, *, status: str, job_duration_seconds: float) -> None: + attributes = self._attrs(status=status) + self._add(self._job_runs_total, 1, attributes) + self._record(self._job_duration_seconds, job_duration_seconds, attributes) + + class MessagesCleanService: """ Service for cleaning expired messages based on retention policies. @@ -48,6 +173,7 @@ class MessagesCleanService: start_from: datetime.datetime | None = None, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "custom", ) -> None: """ Initialize the service with cleanup parameters. @@ -58,12 +184,18 @@ class MessagesCleanService: start_from: Optional start time (inclusive) of the range batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Optional task label for retention metrics """ self._policy = policy self._end_before = end_before self._start_from = start_from self._batch_size = batch_size self._dry_run = dry_run + self._metrics = MessagesCleanupMetrics( + dry_run=dry_run, + has_window=bool(start_from), + task_label=task_label, + ) @classmethod def from_time_range( @@ -73,6 +205,7 @@ class MessagesCleanService: end_before: datetime.datetime, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "custom", ) -> "MessagesCleanService": """ Create a service instance for cleaning messages within a specific time range. @@ -85,6 +218,7 @@ class MessagesCleanService: end_before: End time (exclusive) of the range batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Optional task label for retention metrics Returns: MessagesCleanService instance @@ -112,6 +246,7 @@ class MessagesCleanService: start_from=start_from, batch_size=batch_size, dry_run=dry_run, + task_label=task_label, ) @classmethod @@ -121,6 +256,7 @@ class MessagesCleanService: days: int = 30, batch_size: int = 1000, dry_run: bool = False, + task_label: str = "custom", ) -> "MessagesCleanService": """ Create a service instance for cleaning messages older than specified days. @@ -130,6 +266,7 @@ class MessagesCleanService: days: Number of days to look back from now batch_size: Number of messages to process per batch dry_run: Whether to perform a dry run (no actual deletion) + task_label: Optional task label for retention metrics Returns: MessagesCleanService instance @@ -153,7 +290,14 @@ class MessagesCleanService: policy.__class__.__name__, ) - return cls(policy=policy, end_before=end_before, start_from=None, batch_size=batch_size, dry_run=dry_run) + return cls( + policy=policy, + end_before=end_before, + start_from=None, + batch_size=batch_size, + dry_run=dry_run, + task_label=task_label, + ) def run(self) -> dict[str, int]: """ @@ -162,7 +306,18 @@ class MessagesCleanService: Returns: Dict with statistics: batches, filtered_messages, total_deleted """ - return self._clean_messages_by_time_range() + status = "success" + run_start = time.monotonic() + try: + return self._clean_messages_by_time_range() + except Exception: + status = "failed" + raise + finally: + self._metrics.record_completion( + status=status, + job_duration_seconds=time.monotonic() - run_start, + ) def _clean_messages_by_time_range(self) -> dict[str, int]: """ @@ -197,11 +352,14 @@ class MessagesCleanService: self._end_before, ) - max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) + max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL while True: stats["batches"] += 1 batch_start = time.monotonic() + batch_scanned_messages = 0 + batch_filtered_messages = 0 + batch_deleted_messages = 0 # Step 1: Fetch a batch of messages using cursor with Session(db.engine, expire_on_commit=False) as session: @@ -240,9 +398,16 @@ class MessagesCleanService: # Track total messages fetched across all batches stats["total_messages"] += len(messages) + batch_scanned_messages = len(messages) if not messages: logger.info("clean_messages (batch %s): no more messages to process", stats["batches"]) + self._metrics.record_batch( + scanned_messages=batch_scanned_messages, + filtered_messages=batch_filtered_messages, + deleted_messages=batch_deleted_messages, + batch_duration_seconds=time.monotonic() - batch_start, + ) break # Update cursor to the last message's (created_at, id) @@ -268,6 +433,12 @@ class MessagesCleanService: if not apps: logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"]) + self._metrics.record_batch( + scanned_messages=batch_scanned_messages, + filtered_messages=batch_filtered_messages, + deleted_messages=batch_deleted_messages, + batch_duration_seconds=time.monotonic() - batch_start, + ) continue # Build app_id -> tenant_id mapping @@ -286,9 +457,16 @@ class MessagesCleanService: if not message_ids_to_delete: logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"]) + self._metrics.record_batch( + scanned_messages=batch_scanned_messages, + filtered_messages=batch_filtered_messages, + deleted_messages=batch_deleted_messages, + batch_duration_seconds=time.monotonic() - batch_start, + ) continue stats["filtered_messages"] += len(message_ids_to_delete) + batch_filtered_messages = len(message_ids_to_delete) # Step 4: Batch delete messages and their relations if not self._dry_run: @@ -309,6 +487,7 @@ class MessagesCleanService: commit_ms = int((time.monotonic() - commit_start) * 1000) stats["total_deleted"] += messages_deleted + batch_deleted_messages = messages_deleted logger.info( "clean_messages (batch %s): processed %s messages, deleted %s messages", @@ -343,6 +522,13 @@ class MessagesCleanService: for msg_id in sampled_ids: logger.info("clean_messages (batch %s, dry_run) sample: message_id=%s", stats["batches"], msg_id) + self._metrics.record_batch( + scanned_messages=batch_scanned_messages, + filtered_messages=batch_filtered_messages, + deleted_messages=batch_deleted_messages, + batch_duration_seconds=time.monotonic() - batch_start, + ) + logger.info( "clean_messages completed: total batches: %s, total messages: %s, filtered messages: %s, total deleted: %s", stats["batches"], diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 2c94cb5324..62bc9f5f10 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -1,9 +1,9 @@ import datetime import logging -import os import random import time from collections.abc import Iterable, Sequence +from typing import TYPE_CHECKING import click from sqlalchemy.orm import Session, sessionmaker @@ -20,6 +20,159 @@ from services.billing_service import BillingService, SubscriptionPlan logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from opentelemetry.metrics import Counter, Histogram + + +class WorkflowRunCleanupMetrics: + """ + Records low-cardinality OpenTelemetry metrics for workflow run cleanup jobs. + + Metrics are emitted with stable labels only (dry_run/window_mode/task_label/status) + to keep dashboard and alert cardinality predictable in production clusters. + """ + + _job_runs_total: "Counter | None" + _batches_total: "Counter | None" + _runs_scanned_total: "Counter | None" + _runs_targeted_total: "Counter | None" + _runs_deleted_total: "Counter | None" + _runs_skipped_total: "Counter | None" + _related_records_total: "Counter | None" + _job_duration_seconds: "Histogram | None" + _batch_duration_seconds: "Histogram | None" + _base_attributes: dict[str, str] + + def __init__(self, *, dry_run: bool, has_window: bool, task_label: str) -> None: + self._job_runs_total = None + self._batches_total = None + self._runs_scanned_total = None + self._runs_targeted_total = None + self._runs_deleted_total = None + self._runs_skipped_total = None + self._related_records_total = None + self._job_duration_seconds = None + self._batch_duration_seconds = None + self._base_attributes = { + "job_name": "workflow_run_cleanup", + "dry_run": str(dry_run).lower(), + "window_mode": "between" if has_window else "before_cutoff", + "task_label": task_label, + } + self._init_instruments() + + def _init_instruments(self) -> None: + if not dify_config.ENABLE_OTEL: + return + + try: + from opentelemetry.metrics import get_meter + + meter = get_meter("workflow_run_cleanup", version=dify_config.project.version) + self._job_runs_total = meter.create_counter( + "workflow_run_cleanup_jobs_total", + description="Total number of workflow run cleanup jobs by status.", + unit="{job}", + ) + self._batches_total = meter.create_counter( + "workflow_run_cleanup_batches_total", + description="Total number of processed cleanup batches.", + unit="{batch}", + ) + self._runs_scanned_total = meter.create_counter( + "workflow_run_cleanup_scanned_runs_total", + description="Total workflow runs scanned by cleanup jobs.", + unit="{run}", + ) + self._runs_targeted_total = meter.create_counter( + "workflow_run_cleanup_targeted_runs_total", + description="Total workflow runs targeted by cleanup policy.", + unit="{run}", + ) + self._runs_deleted_total = meter.create_counter( + "workflow_run_cleanup_deleted_runs_total", + description="Total workflow runs deleted by cleanup jobs.", + unit="{run}", + ) + self._runs_skipped_total = meter.create_counter( + "workflow_run_cleanup_skipped_runs_total", + description="Total workflow runs skipped because tenant is paid/unknown.", + unit="{run}", + ) + self._related_records_total = meter.create_counter( + "workflow_run_cleanup_related_records_total", + description="Total related records processed by cleanup jobs.", + unit="{record}", + ) + self._job_duration_seconds = meter.create_histogram( + "workflow_run_cleanup_job_duration_seconds", + description="Duration of workflow run cleanup jobs in seconds.", + unit="s", + ) + self._batch_duration_seconds = meter.create_histogram( + "workflow_run_cleanup_batch_duration_seconds", + description="Duration of workflow run cleanup batch processing in seconds.", + unit="s", + ) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to initialize instruments") + + def _attrs(self, **extra: str) -> dict[str, str]: + return {**self._base_attributes, **extra} + + @staticmethod + def _add(counter: "Counter | None", value: int, attributes: dict[str, str]) -> None: + if not counter or value <= 0: + return + try: + counter.add(value, attributes) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to add counter value") + + @staticmethod + def _record(histogram: "Histogram | None", value: float, attributes: dict[str, str]) -> None: + if not histogram: + return + try: + histogram.record(value, attributes) + except Exception: + logger.exception("workflow_run_cleanup_metrics: failed to record histogram value") + + def record_batch( + self, + *, + batch_rows: int, + targeted_runs: int, + skipped_runs: int, + deleted_runs: int, + related_counts: dict[str, int] | None, + related_action: str | None, + batch_duration_seconds: float, + ) -> None: + attributes = self._attrs() + self._add(self._batches_total, 1, attributes) + self._add(self._runs_scanned_total, batch_rows, attributes) + self._add(self._runs_targeted_total, targeted_runs, attributes) + self._add(self._runs_skipped_total, skipped_runs, attributes) + self._add(self._runs_deleted_total, deleted_runs, attributes) + self._record(self._batch_duration_seconds, batch_duration_seconds, attributes) + + if not related_counts or not related_action: + return + + for record_type, count in related_counts.items(): + self._add( + self._related_records_total, + count, + self._attrs(action=related_action, record_type=record_type), + ) + + def record_completion(self, *, status: str, job_duration_seconds: float) -> None: + attributes = self._attrs(status=status) + self._add(self._job_runs_total, 1, attributes) + self._record(self._job_duration_seconds, job_duration_seconds, attributes) + + class WorkflowRunCleanup: def __init__( self, @@ -29,6 +182,7 @@ class WorkflowRunCleanup: end_before: datetime.datetime | None = None, workflow_run_repo: APIWorkflowRunRepository | None = None, dry_run: bool = False, + task_label: str = "custom", ): if (start_from is None) ^ (end_before is None): raise ValueError("start_from and end_before must be both set or both omitted.") @@ -46,6 +200,11 @@ class WorkflowRunCleanup: self.batch_size = batch_size self._cleanup_whitelist: set[str] | None = None self.dry_run = dry_run + self._metrics = WorkflowRunCleanupMetrics( + dry_run=dry_run, + has_window=bool(start_from), + task_label=task_label, + ) self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD self.workflow_run_repo: APIWorkflowRunRepository if workflow_run_repo: @@ -74,153 +233,193 @@ class WorkflowRunCleanup: related_totals = self._empty_related_counts() if self.dry_run else None batch_index = 0 last_seen: tuple[datetime.datetime, str] | None = None + status = "success" + run_start = time.monotonic() + max_batch_interval_ms = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL - max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) + try: + while True: + batch_start = time.monotonic() - while True: - batch_start = time.monotonic() - - fetch_start = time.monotonic() - run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( - start_from=self.window_start, - end_before=self.window_end, - last_seen=last_seen, - batch_size=self.batch_size, - ) - if not run_rows: - logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) - break - - batch_index += 1 - last_seen = (run_rows[-1].created_at, run_rows[-1].id) - logger.info( - "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", - batch_index, - len(run_rows), - int((time.monotonic() - fetch_start) * 1000), - ) - - tenant_ids = {row.tenant_id for row in run_rows} - - filter_start = time.monotonic() - free_tenants = self._filter_free_tenants(tenant_ids) - logger.info( - "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms", - batch_index, - len(free_tenants), - len(tenant_ids), - int((time.monotonic() - filter_start) * 1000), - ) - - free_runs = [row for row in run_rows if row.tenant_id in free_tenants] - paid_or_skipped = len(run_rows) - len(free_runs) - - if not free_runs: - skipped_message = ( - f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)" + fetch_start = time.monotonic() + run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( + start_from=self.window_start, + end_before=self.window_end, + last_seen=last_seen, + batch_size=self.batch_size, ) - click.echo( - click.style( - skipped_message, - fg="yellow", - ) - ) - continue + if not run_rows: + logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1) + break - total_runs_targeted += len(free_runs) - - if self.dry_run: - count_start = time.monotonic() - batch_counts = self.workflow_run_repo.count_runs_with_related( - free_runs, - count_node_executions=self._count_node_executions, - count_trigger_logs=self._count_trigger_logs, - ) + batch_index += 1 + last_seen = (run_rows[-1].created_at, run_rows[-1].id) logger.info( - "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", + "workflow_run_cleanup (batch #%s): fetched %s rows in %sms", batch_index, - int((time.monotonic() - count_start) * 1000), + len(run_rows), + int((time.monotonic() - fetch_start) * 1000), ) - if related_totals is not None: - for key in related_totals: - related_totals[key] += batch_counts.get(key, 0) - sample_ids = ", ".join(run.id for run in free_runs[:5]) + + tenant_ids = {row.tenant_id for row in run_rows} + + filter_start = time.monotonic() + free_tenants = self._filter_free_tenants(tenant_ids) + logger.info( + "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms", + batch_index, + len(free_tenants), + len(tenant_ids), + int((time.monotonic() - filter_start) * 1000), + ) + + free_runs = [row for row in run_rows if row.tenant_id in free_tenants] + paid_or_skipped = len(run_rows) - len(free_runs) + + if not free_runs: + skipped_message = ( + f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)" + ) + click.echo( + click.style( + skipped_message, + fg="yellow", + ) + ) + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=0, + skipped_runs=paid_or_skipped, + deleted_runs=0, + related_counts=None, + related_action=None, + batch_duration_seconds=time.monotonic() - batch_start, + ) + continue + + total_runs_targeted += len(free_runs) + + if self.dry_run: + count_start = time.monotonic() + batch_counts = self.workflow_run_repo.count_runs_with_related( + free_runs, + count_node_executions=self._count_node_executions, + count_trigger_logs=self._count_trigger_logs, + ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms", + batch_index, + int((time.monotonic() - count_start) * 1000), + ) + if related_totals is not None: + for key in related_totals: + related_totals[key] += batch_counts.get(key, 0) + sample_ids = ", ".join(run.id for run in free_runs[:5]) + click.echo( + click.style( + f"[batch #{batch_index}] would delete {len(free_runs)} runs " + f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", + fg="yellow", + ) + ) + logger.info( + "workflow_run_cleanup (batch #%s, dry_run): batch total %sms", + batch_index, + int((time.monotonic() - batch_start) * 1000), + ) + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=len(free_runs), + skipped_runs=paid_or_skipped, + deleted_runs=0, + related_counts={key: batch_counts.get(key, 0) for key in self._empty_related_counts()}, + related_action="would_delete", + batch_duration_seconds=time.monotonic() - batch_start, + ) + continue + + try: + delete_start = time.monotonic() + counts = self.workflow_run_repo.delete_runs_with_related( + free_runs, + delete_node_executions=self._delete_node_executions, + delete_trigger_logs=self._delete_trigger_logs, + ) + delete_ms = int((time.monotonic() - delete_start) * 1000) + except Exception: + logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) + raise + + total_runs_deleted += counts["runs"] click.echo( click.style( - f"[batch #{batch_index}] would delete {len(free_runs)} runs " - f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", - fg="yellow", + f"[batch #{batch_index}] deleted runs: {counts['runs']} " + f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " + f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " + f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " + f"skipped {paid_or_skipped} paid/unknown", + fg="green", ) ) logger.info( - "workflow_run_cleanup (batch #%s, dry_run): batch total %sms", + "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms", batch_index, + delete_ms, int((time.monotonic() - batch_start) * 1000), ) - continue - - try: - delete_start = time.monotonic() - counts = self.workflow_run_repo.delete_runs_with_related( - free_runs, - delete_node_executions=self._delete_node_executions, - delete_trigger_logs=self._delete_trigger_logs, + self._metrics.record_batch( + batch_rows=len(run_rows), + targeted_runs=len(free_runs), + skipped_runs=paid_or_skipped, + deleted_runs=counts["runs"], + related_counts={key: counts.get(key, 0) for key in self._empty_related_counts()}, + related_action="deleted", + batch_duration_seconds=time.monotonic() - batch_start, ) - delete_ms = int((time.monotonic() - delete_start) * 1000) - except Exception: - logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) - raise - total_runs_deleted += counts["runs"] - click.echo( - click.style( - f"[batch #{batch_index}] deleted runs: {counts['runs']} " - f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " - f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " - f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " - f"skipped {paid_or_skipped} paid/unknown", - fg="green", - ) - ) - logger.info( - "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms", - batch_index, - delete_ms, - int((time.monotonic() - batch_start) * 1000), - ) + # Random sleep between batches to avoid overwhelming the database + sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 + logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) + time.sleep(sleep_ms / 1000) - # Random sleep between batches to avoid overwhelming the database - sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 - logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms) - time.sleep(sleep_ms / 1000) - - if self.dry_run: - if self.window_start: - summary_message = ( - f"Dry run complete. Would delete {total_runs_targeted} workflow runs " - f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" - ) + if self.dry_run: + if self.window_start: + summary_message = ( + f"Dry run complete. Would delete {total_runs_targeted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Dry run complete. Would delete {total_runs_targeted} workflow runs " + f"before {self.window_end.isoformat()}" + ) + if related_totals is not None: + summary_message = ( + f"{summary_message}; related records: {self._format_related_counts(related_totals)}" + ) + summary_color = "yellow" else: - summary_message = ( - f"Dry run complete. Would delete {total_runs_targeted} workflow runs " - f"before {self.window_end.isoformat()}" - ) - if related_totals is not None: - summary_message = f"{summary_message}; related records: {self._format_related_counts(related_totals)}" - summary_color = "yellow" - else: - if self.window_start: - summary_message = ( - f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " - f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" - ) - else: - summary_message = ( - f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}" - ) - summary_color = "white" + if self.window_start: + summary_message = ( + f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " + f"before {self.window_end.isoformat()}" + ) + summary_color = "white" - click.echo(click.style(summary_message, fg=summary_color)) + click.echo(click.style(summary_message, fg=summary_color)) + except Exception: + status = "failed" + raise + finally: + self._metrics.record_completion( + status=status, + job_duration_seconds=time.monotonic() - run_start, + ) def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]: tenant_id_list = list(tenant_ids) diff --git a/api/tests/unit_tests/commands/test_clean_expired_messages.py b/api/tests/unit_tests/commands/test_clean_expired_messages.py index 60173f723d..5375988a69 100644 --- a/api/tests/unit_tests/commands/test_clean_expired_messages.py +++ b/api/tests/unit_tests/commands/test_clean_expired_messages.py @@ -46,6 +46,7 @@ def test_absolute_mode_calls_from_time_range(): end_before=end_before, batch_size=200, dry_run=True, + task_label="custom", ) mock_from_days.assert_not_called() @@ -74,6 +75,7 @@ def test_relative_mode_before_days_only_calls_from_days(): days=30, batch_size=500, dry_run=False, + task_label="before-30", ) mock_from_time_range.assert_not_called() @@ -105,6 +107,7 @@ def test_relative_mode_with_from_days_ago_calls_from_time_range(): end_before=fixed_now - datetime.timedelta(days=30), batch_size=1000, dry_run=False, + task_label="60to30", ) mock_from_days.assert_not_called() diff --git a/api/tests/unit_tests/services/retention/conversation/test_messages_clean_service.py b/api/tests/unit_tests/services/retention/conversation/test_messages_clean_service.py index a34defeba9..f9d901fca2 100644 --- a/api/tests/unit_tests/services/retention/conversation/test_messages_clean_service.py +++ b/api/tests/unit_tests/services/retention/conversation/test_messages_clean_service.py @@ -1,5 +1,4 @@ import datetime -import os from unittest.mock import MagicMock, patch import pytest @@ -282,7 +281,6 @@ class TestMessagesCleanService: MessagesCleanService._batch_delete_message_relations(mock_db_session, ["msg1", "msg2"]) assert mock_db_session.execute.call_count == 8 # 8 tables to clean up - @patch.dict(os.environ, {"SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL": "500"}) def test_clean_messages_interval_from_env(self, mock_db_session, mock_policy): service = MessagesCleanService( policy=mock_policy, @@ -301,9 +299,13 @@ class TestMessagesCleanService: mock_db_session.execute.side_effect = mock_returns mock_policy.filter_message_ids.return_value = ["msg1"] - with patch("services.retention.conversation.messages_clean_service.time.sleep") as mock_sleep: - with patch("services.retention.conversation.messages_clean_service.random.uniform") as mock_uniform: - mock_uniform.return_value = 300.0 - service.run() - mock_uniform.assert_called_with(0, 500) - mock_sleep.assert_called_with(0.3) + with patch( + "services.retention.conversation.messages_clean_service.dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", + 500, + ): + with patch("services.retention.conversation.messages_clean_service.time.sleep") as mock_sleep: + with patch("services.retention.conversation.messages_clean_service.random.uniform") as mock_uniform: + mock_uniform.return_value = 300.0 + service.run() + mock_uniform.assert_called_with(0, 500) + mock_sleep.assert_called_with(0.3) diff --git a/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py index 0013cde79e..7d30645d38 100644 --- a/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py +++ b/api/tests/unit_tests/services/retention/workflow_run/test_clear_free_plan_expired_workflow_run_logs.py @@ -80,7 +80,13 @@ class TestWorkflowRunCleanupInit: cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0 cfg.BILLING_ENABLED = False with pytest.raises(ValueError): - WorkflowRunCleanup(days=30, batch_size=10, start_from=dt, end_before=dt, workflow_run_repo=mock_repo) + WorkflowRunCleanup( + days=30, + batch_size=10, + start_from=dt, + end_before=dt, + workflow_run_repo=mock_repo, + ) def test_zero_batch_size_raises(self, mock_repo): with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: @@ -102,10 +108,24 @@ class TestWorkflowRunCleanupInit: cfg.BILLING_ENABLED = False start = datetime.datetime(2024, 1, 1) end = datetime.datetime(2024, 6, 1) - c = WorkflowRunCleanup(days=30, batch_size=5, start_from=start, end_before=end, workflow_run_repo=mock_repo) + c = WorkflowRunCleanup( + days=30, + batch_size=5, + start_from=start, + end_before=end, + workflow_run_repo=mock_repo, + ) assert c.window_start == start assert c.window_end == end + def test_default_task_label_is_custom(self, mock_repo): + with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: + cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0 + cfg.BILLING_ENABLED = False + c = WorkflowRunCleanup(days=30, batch_size=10, workflow_run_repo=mock_repo) + + assert c._metrics._base_attributes["task_label"] == "custom" + # --------------------------------------------------------------------------- # _empty_related_counts / _format_related_counts @@ -393,7 +413,12 @@ class TestRunDryRunMode: with patch("services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs.dify_config") as cfg: cfg.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD = 0 cfg.BILLING_ENABLED = False - return WorkflowRunCleanup(days=30, batch_size=10, workflow_run_repo=mock_repo, dry_run=True) + return WorkflowRunCleanup( + days=30, + batch_size=10, + workflow_run_repo=mock_repo, + dry_run=True, + ) def test_dry_run_no_delete_called(self, mock_repo): run = make_run("t1") diff --git a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py index 50826d6798..6bf78d3411 100644 --- a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py +++ b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py @@ -265,6 +265,61 @@ def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: cleanup.run() +def test_run_records_metrics_on_success(monkeypatch: pytest.MonkeyPatch) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo( + batches=[[FakeRun("run-free", "t_free", cutoff)]], + delete_result={ + "runs": 0, + "node_executions": 2, + "offloads": 1, + "app_logs": 3, + "trigger_logs": 4, + "pauses": 5, + "pause_reasons": 6, + }, + ) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) + + batch_calls: list[dict[str, object]] = [] + completion_calls: list[dict[str, object]] = [] + monkeypatch.setattr(cleanup._metrics, "record_batch", lambda **kwargs: batch_calls.append(kwargs)) + monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs)) + + cleanup.run() + + assert len(batch_calls) == 1 + assert batch_calls[0]["batch_rows"] == 1 + assert batch_calls[0]["targeted_runs"] == 1 + assert batch_calls[0]["deleted_runs"] == 1 + assert batch_calls[0]["related_action"] == "deleted" + assert len(completion_calls) == 1 + assert completion_calls[0]["status"] == "success" + + +def test_run_records_failed_metrics(monkeypatch: pytest.MonkeyPatch) -> None: + class FailingRepo(FakeRepo): + def delete_runs_with_related( + self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None + ) -> dict[str, int]: + raise RuntimeError("delete failed") + + cutoff = datetime.datetime.now() + repo = FailingRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]]) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) + + completion_calls: list[dict[str, object]] = [] + monkeypatch.setattr(cleanup._metrics, "record_completion", lambda **kwargs: completion_calls.append(kwargs)) + + with pytest.raises(RuntimeError, match="delete failed"): + cleanup.run() + + assert len(completion_calls) == 1 + assert completion_calls[0]["status"] == "failed" + + def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: cutoff = datetime.datetime.now() repo = FakeRepo( diff --git a/api/tests/unit_tests/services/test_messages_clean_service.py b/api/tests/unit_tests/services/test_messages_clean_service.py index 4449b442d6..f3efc4463e 100644 --- a/api/tests/unit_tests/services/test_messages_clean_service.py +++ b/api/tests/unit_tests/services/test_messages_clean_service.py @@ -540,6 +540,20 @@ class TestMessagesCleanServiceFromTimeRange: assert service._batch_size == 1000 # default assert service._dry_run is False # default + def test_explicit_task_label(self): + start_from = datetime.datetime(2024, 1, 1) + end_before = datetime.datetime(2024, 1, 2) + policy = BillingDisabledPolicy() + + service = MessagesCleanService.from_time_range( + policy=policy, + start_from=start_from, + end_before=end_before, + task_label="60to30", + ) + + assert service._metrics._base_attributes["task_label"] == "60to30" + class TestMessagesCleanServiceFromDays: """Unit tests for MessagesCleanService.from_days factory method.""" @@ -619,3 +633,54 @@ class TestMessagesCleanServiceFromDays: assert service._end_before == expected_end_before assert service._batch_size == 1000 # default assert service._dry_run is False # default + assert service._metrics._base_attributes["task_label"] == "custom" + + +class TestMessagesCleanServiceRun: + """Unit tests for MessagesCleanService.run instrumentation behavior.""" + + def test_run_records_completion_metrics_on_success(self): + # Arrange + service = MessagesCleanService( + policy=BillingDisabledPolicy(), + start_from=datetime.datetime(2024, 1, 1), + end_before=datetime.datetime(2024, 1, 2), + batch_size=100, + dry_run=False, + ) + expected_stats = { + "batches": 1, + "total_messages": 10, + "filtered_messages": 5, + "total_deleted": 5, + } + service._clean_messages_by_time_range = MagicMock(return_value=expected_stats) # type: ignore[method-assign] + completion_calls: list[dict[str, object]] = [] + service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign] + + # Act + result = service.run() + + # Assert + assert result == expected_stats + assert len(completion_calls) == 1 + assert completion_calls[0]["status"] == "success" + + def test_run_records_completion_metrics_on_failure(self): + # Arrange + service = MessagesCleanService( + policy=BillingDisabledPolicy(), + start_from=datetime.datetime(2024, 1, 1), + end_before=datetime.datetime(2024, 1, 2), + batch_size=100, + dry_run=False, + ) + service._clean_messages_by_time_range = MagicMock(side_effect=RuntimeError("clean failed")) # type: ignore[method-assign] + completion_calls: list[dict[str, object]] = [] + service._metrics.record_completion = lambda **kwargs: completion_calls.append(kwargs) # type: ignore[method-assign] + + # Act & Assert + with pytest.raises(RuntimeError, match="clean failed"): + service.run() + assert len(completion_calls) == 1 + assert completion_calls[0]["status"] == "failed"