refactor: migrate workflow run repository tests from mocks to testcontainers (#33837)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
YBoy 2026-03-21 07:23:11 +02:00 committed by GitHub
parent f41d1d0822
commit 097773c9f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 391 additions and 251 deletions

View File

@@ -0,0 +1,391 @@
"""Integration tests for get_paginated_workflow_runs and get_workflow_runs_count using testcontainers."""
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass, field
from datetime import timedelta
from uuid import uuid4

import pytest
from sqlalchemy import Engine, delete
from sqlalchemy import exc as sa_exc
from sqlalchemy.orm import Session, sessionmaker

from dify_graph.entities import WorkflowExecution
from dify_graph.enums import WorkflowExecutionStatus
from libs.datetime_utils import naive_utc_now
from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom
from models.workflow import WorkflowRun, WorkflowType
from repositories.sqlalchemy_api_workflow_run_repository import DifyAPISQLAlchemyWorkflowRunRepository
class _TestWorkflowRunRepository(DifyAPISQLAlchemyWorkflowRunRepository):
    """Repository subclass used by these tests; ``save`` is deliberately inert."""

    def save(self, execution: WorkflowExecution) -> None:
        """No-op: persisting executions is outside the scope of these tests."""
def _fresh_id() -> str:
    """Return a brand-new random UUID rendered as a string."""
    return str(uuid4())


@dataclass
class _TestScope:
    """Identifiers that fence one test's database rows off from every other test."""

    tenant_id: str = field(default_factory=_fresh_id)
    app_id: str = field(default_factory=_fresh_id)
    workflow_id: str = field(default_factory=_fresh_id)
    user_id: str = field(default_factory=_fresh_id)
def _create_workflow_run(
    session: Session,
    scope: _TestScope,
    *,
    status: WorkflowExecutionStatus,
    triggered_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.DEBUGGING,
    created_at_offset: timedelta | None = None,
) -> WorkflowRun:
    """Insert one workflow run belonging to *scope* and return the committed row.

    ``created_at_offset`` shifts the creation timestamp relative to "now",
    which lets tests control ordering and time-window membership.
    """
    created_at = naive_utc_now()
    if created_at_offset is not None:
        created_at = created_at + created_at_offset

    run = WorkflowRun(
        id=str(uuid4()),
        tenant_id=scope.tenant_id,
        app_id=scope.app_id,
        workflow_id=scope.workflow_id,
        type=WorkflowType.WORKFLOW,
        triggered_from=triggered_from,
        version="draft",
        graph="{}",
        inputs="{}",
        status=status,
        created_by_role=CreatorUserRole.ACCOUNT,
        created_by=scope.user_id,
        created_at=created_at,
    )
    session.add(run)
    session.commit()
    return run
def _cleanup_scope_data(session: Session, scope: _TestScope) -> None:
    """Delete every workflow-run row that belongs to *scope* and commit."""
    stmt = delete(WorkflowRun).where(
        WorkflowRun.tenant_id == scope.tenant_id,
        WorkflowRun.app_id == scope.app_id,
    )
    session.execute(stmt)
    session.commit()
@pytest.fixture
def repository(db_session_with_containers: Session) -> DifyAPISQLAlchemyWorkflowRunRepository:
    """Repository wired to the testcontainers database engine."""
    bind = db_session_with_containers.get_bind()
    assert isinstance(bind, Engine)
    factory = sessionmaker(bind=bind, expire_on_commit=False)
    return _TestWorkflowRunRepository(session_maker=factory)
@pytest.fixture
def test_scope(db_session_with_containers: Session) -> Iterator[_TestScope]:
    """Yield an isolated data scope, then purge its rows after the test.

    Fix: a ``yield``-based fixture is a generator, so the accurate return
    annotation is ``Iterator[_TestScope]`` rather than the previously declared
    ``_TestScope`` (pytest unwraps the yielded value for test parameters).
    """
    scope = _TestScope()
    yield scope
    # Teardown: remove whatever rows this test created under the scope.
    _cleanup_scope_data(db_session_with_containers, scope)
class TestGetPaginatedWorkflowRuns:
    """Integration coverage for ``get_paginated_workflow_runs`` against a real DB."""

    def test_returns_runs_without_status_filter(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """With ``status=None`` every run of the tenant/app pair comes back."""
        seed_statuses = [
            WorkflowExecutionStatus.SUCCEEDED,
            WorkflowExecutionStatus.FAILED,
            WorkflowExecutionStatus.RUNNING,
        ]
        for seed_status in seed_statuses:
            _create_workflow_run(db_session_with_containers, test_scope, status=seed_status)

        page = repository.get_paginated_workflow_runs(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=20,
            last_id=None,
            status=None,
        )

        assert page.has_more is False
        assert page.limit == 20
        assert len(page.data) == 3

    def test_filters_by_status(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """Only runs whose status matches the requested filter are returned."""
        for seed_status in (
            WorkflowExecutionStatus.SUCCEEDED,
            WorkflowExecutionStatus.SUCCEEDED,
            WorkflowExecutionStatus.FAILED,
        ):
            _create_workflow_run(db_session_with_containers, test_scope, status=seed_status)

        page = repository.get_paginated_workflow_runs(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=20,
            last_id=None,
            status="succeeded",
        )

        assert len(page.data) == 2
        assert all(run.status == WorkflowExecutionStatus.SUCCEEDED for run in page.data)

    def test_pagination_has_more(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """``has_more`` flips to True when rows remain beyond the page limit."""
        # Distinct created_at values give the rows a deterministic order.
        for offset_seconds in range(5):
            _create_workflow_run(
                db_session_with_containers,
                test_scope,
                status=WorkflowExecutionStatus.SUCCEEDED,
                created_at_offset=timedelta(seconds=offset_seconds),
            )

        page = repository.get_paginated_workflow_runs(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=3,
            last_id=None,
            status=None,
        )

        assert len(page.data) == 3
        assert page.has_more is True

    def test_cursor_based_pagination(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """Feeding one page's last id as the cursor yields the following page."""
        for offset_seconds in range(5):
            _create_workflow_run(
                db_session_with_containers,
                test_scope,
                status=WorkflowExecutionStatus.SUCCEEDED,
                created_at_offset=timedelta(seconds=offset_seconds),
            )

        first_page = repository.get_paginated_workflow_runs(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=3,
            last_id=None,
            status=None,
        )
        assert len(first_page.data) == 3
        assert first_page.has_more is True

        cursor = first_page.data[-1].id
        second_page = repository.get_paginated_workflow_runs(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=3,
            last_id=cursor,
            status=None,
        )
        assert len(second_page.data) == 2
        assert second_page.has_more is False

        # The two pages must contain entirely different runs.
        first_ids = {run.id for run in first_page.data}
        second_ids = {run.id for run in second_page.data}
        assert first_ids.isdisjoint(second_ids)

    def test_invalid_last_id_raises(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        test_scope: _TestScope,
    ) -> None:
        """A cursor id that matches no run raises ValueError."""
        bogus_cursor = str(uuid4())
        with pytest.raises(ValueError, match="Last workflow run not exists"):
            repository.get_paginated_workflow_runs(
                tenant_id=test_scope.tenant_id,
                app_id=test_scope.app_id,
                triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
                limit=20,
                last_id=bogus_cursor,
                status=None,
            )

    def test_tenant_isolation(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """Rows belonging to another tenant never leak into the result."""
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.SUCCEEDED)
        # Same app id, different tenant — the strongest isolation check.
        foreign_scope = _TestScope(app_id=test_scope.app_id)
        try:
            _create_workflow_run(db_session_with_containers, foreign_scope, status=WorkflowExecutionStatus.SUCCEEDED)
            page = repository.get_paginated_workflow_runs(
                tenant_id=test_scope.tenant_id,
                app_id=test_scope.app_id,
                triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
                limit=20,
                last_id=None,
                status=None,
            )
            assert len(page.data) == 1
            assert page.data[0].tenant_id == test_scope.tenant_id
        finally:
            # The test_scope fixture only cleans its own scope; remove the
            # foreign tenant's rows here.
            _cleanup_scope_data(db_session_with_containers, foreign_scope)
class TestGetWorkflowRunsCount:
    """Integration coverage for ``get_workflow_runs_count`` against a real DB."""

    def test_count_without_status_filter(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """Without a status filter the result carries one bucket per status."""
        seed_plan = [
            (WorkflowExecutionStatus.SUCCEEDED, 3),
            (WorkflowExecutionStatus.FAILED, 2),
            (WorkflowExecutionStatus.RUNNING, 1),
        ]
        for seed_status, how_many in seed_plan:
            for _ in range(how_many):
                _create_workflow_run(db_session_with_containers, test_scope, status=seed_status)

        counts = repository.get_workflow_runs_count(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status=None,
        )

        expected = {
            "total": 6,
            "succeeded": 3,
            "failed": 2,
            "running": 1,
            "stopped": 0,
            "partial-succeeded": 0,
        }
        for bucket, value in expected.items():
            assert counts[bucket] == value

    def test_count_with_status_filter(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """With a status filter only matching runs are counted."""
        for _ in range(3):
            _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.SUCCEEDED)
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.FAILED)

        counts = repository.get_workflow_runs_count(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status="succeeded",
        )

        assert counts["total"] == 3
        assert counts["succeeded"] == 3
        assert counts["failed"] == 0

    def test_count_with_invalid_status_raises(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """An unknown status string fails enum coercion at the DB layer."""
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.SUCCEEDED)

        with pytest.raises(sa_exc.StatementError) as raised:
            repository.get_workflow_runs_count(
                tenant_id=test_scope.tenant_id,
                app_id=test_scope.app_id,
                triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
                status="invalid_status",
            )

        # The wrapped driver-level error must be the enum ValueError.
        assert isinstance(raised.value.orig, ValueError)

    def test_count_with_time_range(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """Runs created outside the requested window are excluded."""
        # One run inside the 7-day window...
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.SUCCEEDED)
        # ...and one created 8 days ago, i.e. just outside it.
        _create_workflow_run(
            db_session_with_containers,
            test_scope,
            status=WorkflowExecutionStatus.SUCCEEDED,
            created_at_offset=timedelta(days=-8),
        )

        counts = repository.get_workflow_runs_count(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status=None,
            time_range="7d",
        )

        assert counts["total"] == 1
        assert counts["succeeded"] == 1

    def test_count_with_status_and_time_range(
        self,
        repository: DifyAPISQLAlchemyWorkflowRunRepository,
        db_session_with_containers: Session,
        test_scope: _TestScope,
    ) -> None:
        """The status and time_range filters combine conjunctively."""
        # Recent succeeded run — the only one that should be counted.
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.SUCCEEDED)
        # Recent run with the wrong status.
        _create_workflow_run(db_session_with_containers, test_scope, status=WorkflowExecutionStatus.FAILED)
        # Matching status but outside the time window.
        _create_workflow_run(
            db_session_with_containers,
            test_scope,
            status=WorkflowExecutionStatus.SUCCEEDED,
            created_at_offset=timedelta(days=-8),
        )

        counts = repository.get_workflow_runs_count(
            tenant_id=test_scope.tenant_id,
            app_id=test_scope.app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status="succeeded",
            time_range="7d",
        )

        assert counts["total"] == 1
        assert counts["succeeded"] == 1
        assert counts["failed"] == 0

View File

@@ -1,251 +0,0 @@
"""Unit tests for workflow run repository with status filter."""
import uuid
from unittest.mock import MagicMock
import pytest
from sqlalchemy.orm import sessionmaker
from models import WorkflowRun, WorkflowRunTriggeredFrom
from repositories.sqlalchemy_api_workflow_run_repository import DifyAPISQLAlchemyWorkflowRunRepository
class TestDifyAPISQLAlchemyWorkflowRunRepository:
    """Test workflow run repository with status filtering.

    Mock-based unit tests: the SQLAlchemy session is replaced with MagicMocks,
    so only the repository's result-shaping logic is exercised — no SQL runs.
    The ``__enter__`` wiring below mirrors how the repository presumably opens
    sessions via ``with session_maker() as session:`` — TODO confirm against
    the repository implementation.
    """

    @pytest.fixture
    def mock_session_maker(self):
        """Create a mock session maker."""
        return MagicMock(spec=sessionmaker)

    @pytest.fixture
    def repository(self, mock_session_maker):
        """Create repository instance with mock session."""
        return DifyAPISQLAlchemyWorkflowRunRepository(mock_session_maker)

    def test_get_paginated_workflow_runs_without_status(self, repository, mock_session_maker):
        """Test getting paginated workflow runs without status filter."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        # Route the context-managed session to our mock.
        mock_session_maker.return_value.__enter__.return_value = mock_session
        mock_runs = [MagicMock(spec=WorkflowRun) for _ in range(3)]
        # The repository reads rows via session.scalars(...).all().
        mock_session.scalars.return_value.all.return_value = mock_runs
        # Act
        result = repository.get_paginated_workflow_runs(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=20,
            last_id=None,
            status=None,
        )
        # Assert
        assert len(result.data) == 3
        assert result.limit == 20
        assert result.has_more is False

    def test_get_paginated_workflow_runs_with_status_filter(self, repository, mock_session_maker):
        """Test getting paginated workflow runs with status filter."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        mock_runs = [MagicMock(spec=WorkflowRun, status="succeeded") for _ in range(2)]
        mock_session.scalars.return_value.all.return_value = mock_runs
        # Act
        result = repository.get_paginated_workflow_runs(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            limit=20,
            last_id=None,
            status="succeeded",
        )
        # Assert
        assert len(result.data) == 2
        assert all(run.status == "succeeded" for run in result.data)

    def test_get_workflow_runs_count_without_status(self, repository, mock_session_maker):
        """Test getting workflow runs count without status filter."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        # Mock the GROUP BY query results
        # (status, count) tuples as returned by session.execute(...).all().
        mock_results = [
            ("succeeded", 5),
            ("failed", 2),
            ("running", 1),
        ]
        mock_session.execute.return_value.all.return_value = mock_results
        # Act
        result = repository.get_workflow_runs_count(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status=None,
        )
        # Assert
        assert result["total"] == 8
        assert result["succeeded"] == 5
        assert result["failed"] == 2
        assert result["running"] == 1
        # Statuses absent from the grouped rows must still appear as zero.
        assert result["stopped"] == 0
        assert result["partial-succeeded"] == 0

    def test_get_workflow_runs_count_with_status_filter(self, repository, mock_session_maker):
        """Test getting workflow runs count with status filter."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        # Mock the count query for succeeded status
        # With a status filter the repository uses session.scalar (single COUNT).
        mock_session.scalar.return_value = 5
        # Act
        result = repository.get_workflow_runs_count(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status="succeeded",
        )
        # Assert
        assert result["total"] == 5
        assert result["succeeded"] == 5
        assert result["running"] == 0
        assert result["failed"] == 0
        assert result["stopped"] == 0
        assert result["partial-succeeded"] == 0

    def test_get_workflow_runs_count_with_invalid_status(self, repository, mock_session_maker):
        """Test that invalid status is still counted in total but not in any specific status."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        # Mock count query returning 0 for invalid status
        mock_session.scalar.return_value = 0
        # Act
        result = repository.get_workflow_runs_count(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status="invalid_status",
        )
        # Assert
        assert result["total"] == 0
        assert all(result[status] == 0 for status in ["running", "succeeded", "failed", "stopped", "partial-succeeded"])

    def test_get_workflow_runs_count_with_time_range(self, repository, mock_session_maker):
        """Test getting workflow runs count with time range filter verifies SQL query construction."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        # Mock the GROUP BY query results
        mock_results = [
            ("succeeded", 3),
            ("running", 2),
        ]
        mock_session.execute.return_value.all.return_value = mock_results
        # Act
        result = repository.get_workflow_runs_count(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status=None,
            time_range="1d",
        )
        # Assert results
        assert result["total"] == 5
        assert result["succeeded"] == 3
        assert result["running"] == 2
        assert result["failed"] == 0
        # Verify that execute was called (which means GROUP BY query was used)
        assert mock_session.execute.called, "execute should have been called for GROUP BY query"
        # Verify SQL query includes time filter by checking the statement
        call_args = mock_session.execute.call_args
        assert call_args is not None, "execute should have been called with a statement"
        # The first argument should be the SQL statement
        stmt = call_args[0][0]
        # Convert to string to inspect the query
        query_str = str(stmt.compile(compile_kwargs={"literal_binds": True}))
        # Verify the query includes created_at filter
        # The query should have a WHERE clause with created_at comparison
        assert "created_at" in query_str.lower() or "workflow_runs.created_at" in query_str.lower(), (
            "Query should include created_at filter for time range"
        )

    def test_get_workflow_runs_count_with_status_and_time_range(self, repository, mock_session_maker):
        """Test getting workflow runs count with both status and time range filters verifies SQL query."""
        # Arrange
        tenant_id = str(uuid.uuid4())
        app_id = str(uuid.uuid4())
        mock_session = MagicMock()
        mock_session_maker.return_value.__enter__.return_value = mock_session
        # Mock the count query for running status within time range
        mock_session.scalar.return_value = 2
        # Act
        result = repository.get_workflow_runs_count(
            tenant_id=tenant_id,
            app_id=app_id,
            triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
            status="running",
            time_range="1d",
        )
        # Assert results
        assert result["total"] == 2
        assert result["running"] == 2
        assert result["succeeded"] == 0
        assert result["failed"] == 0
        # Verify that scalar was called (which means COUNT query was used)
        assert mock_session.scalar.called, "scalar should have been called for count query"
        # Verify SQL query includes both status and time filter
        call_args = mock_session.scalar.call_args
        assert call_args is not None, "scalar should have been called with a statement"
        # The first argument should be the SQL statement
        stmt = call_args[0][0]
        # Convert to string to inspect the query
        query_str = str(stmt.compile(compile_kwargs={"literal_binds": True}))
        # Verify the query includes both filters
        assert "created_at" in query_str.lower() or "workflow_runs.created_at" in query_str.lower(), (
            "Query should include created_at filter for time range"
        )
        assert "status" in query_str.lower() or "workflow_runs.status" in query_str.lower(), (
            "Query should include status filter"
        )