ci: fix AttributeError: 'Flask' object has no attribute 'login_manager' FAILED #33891 (#33896)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Asuka Minato 2026-03-23 21:27:14 +09:00 committed by GitHub
parent 8b6fc07019
commit d956b919a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 1792 additions and 1726 deletions

View File

@ -0,0 +1,342 @@
"""Authenticated controller integration tests for console message APIs."""
from datetime import timedelta
from decimal import Decimal
from unittest.mock import patch
from uuid import uuid4
import pytest
from flask.testing import FlaskClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from controllers.console.app.message import ChatMessagesQuery, FeedbackExportQuery, MessageFeedbackPayload
from controllers.console.app.message import attach_message_extra_contents as _attach_message_extra_contents
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from libs.datetime_utils import naive_utc_now
from models.enums import ConversationFromSource, FeedbackRating
from models.model import AppMode, Conversation, Message, MessageAnnotation, MessageFeedback
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
create_console_app,
)
def _create_conversation(db_session: Session, app_id: str, account_id: str, mode: AppMode) -> Conversation:
conversation = Conversation(
app_id=app_id,
app_model_config_id=None,
model_provider=None,
model_id="",
override_model_configs=None,
mode=mode,
name="Test Conversation",
inputs={},
introduction="",
system_instruction="",
system_instruction_tokens=0,
status="normal",
from_source=ConversationFromSource.CONSOLE,
from_account_id=account_id,
)
db_session.add(conversation)
db_session.commit()
return conversation
def _create_message(
db_session: Session,
app_id: str,
conversation_id: str,
account_id: str,
*,
created_at_offset_seconds: int = 0,
) -> Message:
created_at = naive_utc_now() + timedelta(seconds=created_at_offset_seconds)
message = Message(
app_id=app_id,
model_provider=None,
model_id="",
override_model_configs=None,
conversation_id=conversation_id,
inputs={},
query="Hello",
message={"type": "text", "content": "Hello"},
message_tokens=1,
message_unit_price=Decimal("0.0001"),
message_price_unit=Decimal("0.001"),
answer="Hi there",
answer_tokens=1,
answer_unit_price=Decimal("0.0001"),
answer_price_unit=Decimal("0.001"),
parent_message_id=None,
provider_response_latency=0,
total_price=Decimal("0.0002"),
currency="USD",
from_source=ConversationFromSource.CONSOLE,
from_account_id=account_id,
created_at=created_at,
updated_at=created_at,
app_mode=AppMode.CHAT,
)
db_session.add(message)
db_session.commit()
return message
class TestMessageValidators:
def test_chat_messages_query_validators(self) -> None:
assert ChatMessagesQuery.empty_to_none("") is None
assert ChatMessagesQuery.empty_to_none("val") == "val"
assert ChatMessagesQuery.validate_uuid(None) is None
assert (
ChatMessagesQuery.validate_uuid("123e4567-e89b-12d3-a456-426614174000")
== "123e4567-e89b-12d3-a456-426614174000"
)
def test_message_feedback_validators(self) -> None:
assert (
MessageFeedbackPayload.validate_message_id("123e4567-e89b-12d3-a456-426614174000")
== "123e4567-e89b-12d3-a456-426614174000"
)
def test_feedback_export_validators(self) -> None:
assert FeedbackExportQuery.parse_bool(None) is None
assert FeedbackExportQuery.parse_bool(True) is True
assert FeedbackExportQuery.parse_bool("1") is True
assert FeedbackExportQuery.parse_bool("0") is False
assert FeedbackExportQuery.parse_bool("off") is False
with pytest.raises(ValueError):
FeedbackExportQuery.parse_bool("invalid")
def test_chat_message_list_not_found(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/chat-messages",
query_string={"conversation_id": str(uuid4())},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 404
payload = response.get_json()
assert payload is not None
assert payload["code"] == "not_found"
def test_chat_message_list_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, app.mode)
_create_message(db_session_with_containers, app.id, conversation.id, account.id, created_at_offset_seconds=0)
second = _create_message(
db_session_with_containers,
app.id,
conversation.id,
account.id,
created_at_offset_seconds=1,
)
with patch(
"controllers.console.app.message.attach_message_extra_contents",
side_effect=_attach_message_extra_contents,
):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/chat-messages",
query_string={"conversation_id": conversation.id, "limit": 1},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["limit"] == 1
assert payload["has_more"] is True
assert len(payload["data"]) == 1
assert payload["data"][0]["id"] == second.id
def test_message_feedback_not_found(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
response = test_client_with_containers.post(
f"/console/api/apps/{app.id}/feedbacks",
json={"message_id": str(uuid4()), "rating": "like"},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 404
payload = response.get_json()
assert payload is not None
assert payload["code"] == "not_found"
def test_message_feedback_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, app.mode)
message = _create_message(db_session_with_containers, app.id, conversation.id, account.id)
response = test_client_with_containers.post(
f"/console/api/apps/{app.id}/feedbacks",
json={"message_id": message.id, "rating": "like"},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
feedback = db_session_with_containers.scalar(
select(MessageFeedback).where(MessageFeedback.message_id == message.id)
)
assert feedback is not None
assert feedback.rating == FeedbackRating.LIKE
assert feedback.from_account_id == account.id
def test_message_annotation_count(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, app.mode)
message = _create_message(db_session_with_containers, app.id, conversation.id, account.id)
db_session_with_containers.add(
MessageAnnotation(
app_id=app.id,
conversation_id=conversation.id,
message_id=message.id,
question="Q",
content="A",
account_id=account.id,
)
)
db_session_with_containers.commit()
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/annotations/count",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"count": 1}
def test_message_suggested_questions_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
message_id = str(uuid4())
with patch(
"controllers.console.app.message.MessageService.get_suggested_questions_after_answer",
return_value=["q1", "q2"],
):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/chat-messages/{message_id}/suggested-questions",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"data": ["q1", "q2"]}
@pytest.mark.parametrize(
("exc", "expected_status", "expected_code"),
[
(MessageNotExistsError(), 404, "not_found"),
(ConversationNotExistsError(), 404, "not_found"),
(ProviderTokenNotInitError(), 400, "provider_not_initialize"),
(QuotaExceededError(), 400, "provider_quota_exceeded"),
(ModelCurrentlyNotSupportError(), 400, "model_currently_not_support"),
(SuggestedQuestionsAfterAnswerDisabledError(), 403, "app_suggested_questions_after_answer_disabled"),
(Exception(), 500, "internal_server_error"),
],
)
def test_message_suggested_questions_errors(
exc: Exception,
expected_status: int,
expected_code: str,
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
message_id = str(uuid4())
with patch(
"controllers.console.app.message.MessageService.get_suggested_questions_after_answer",
side_effect=exc,
):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/chat-messages/{message_id}/suggested-questions",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == expected_status
payload = response.get_json()
assert payload is not None
assert payload["code"] == expected_code
def test_message_feedback_export_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
with patch("services.feedback_service.FeedbackService.export_feedbacks", return_value={"exported": True}):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/feedbacks/export",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"exported": True}
def test_message_api_get_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, app.mode)
message = _create_message(db_session_with_containers, app.id, conversation.id, account.id)
with patch(
"controllers.console.app.message.attach_message_extra_contents",
side_effect=_attach_message_extra_contents,
):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/messages/{message.id}",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["id"] == message.id

View File

@ -0,0 +1,334 @@
"""Controller integration tests for console statistic routes."""
from datetime import timedelta
from decimal import Decimal
from unittest.mock import patch
from uuid import uuid4
from flask.testing import FlaskClient
from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import InvokeFrom
from libs.datetime_utils import naive_utc_now
from models.enums import ConversationFromSource, FeedbackFromSource, FeedbackRating
from models.model import AppMode, Conversation, Message, MessageFeedback
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
create_console_app,
)
def _create_conversation(
db_session: Session,
app_id: str,
account_id: str,
*,
mode: AppMode,
created_at_offset_days: int = 0,
) -> Conversation:
created_at = naive_utc_now() + timedelta(days=created_at_offset_days)
conversation = Conversation(
app_id=app_id,
app_model_config_id=None,
model_provider=None,
model_id="",
override_model_configs=None,
mode=mode,
name="Stats Conversation",
inputs={},
introduction="",
system_instruction="",
system_instruction_tokens=0,
status="normal",
from_source=ConversationFromSource.CONSOLE,
from_account_id=account_id,
created_at=created_at,
updated_at=created_at,
)
db_session.add(conversation)
db_session.commit()
return conversation
def _create_message(
db_session: Session,
app_id: str,
conversation_id: str,
*,
from_account_id: str | None,
from_end_user_id: str | None = None,
message_tokens: int = 1,
answer_tokens: int = 1,
total_price: Decimal = Decimal("0.01"),
provider_response_latency: float = 1.0,
created_at_offset_days: int = 0,
) -> Message:
created_at = naive_utc_now() + timedelta(days=created_at_offset_days)
message = Message(
app_id=app_id,
model_provider=None,
model_id="",
override_model_configs=None,
conversation_id=conversation_id,
inputs={},
query="Hello",
message={"type": "text", "content": "Hello"},
message_tokens=message_tokens,
message_unit_price=Decimal("0.001"),
message_price_unit=Decimal("0.001"),
answer="Hi there",
answer_tokens=answer_tokens,
answer_unit_price=Decimal("0.001"),
answer_price_unit=Decimal("0.001"),
parent_message_id=None,
provider_response_latency=provider_response_latency,
total_price=total_price,
currency="USD",
invoke_from=InvokeFrom.EXPLORE,
from_source=ConversationFromSource.CONSOLE,
from_end_user_id=from_end_user_id,
from_account_id=from_account_id,
created_at=created_at,
updated_at=created_at,
app_mode=AppMode.CHAT,
)
db_session.add(message)
db_session.commit()
return message
def _create_like_feedback(
db_session: Session,
app_id: str,
conversation_id: str,
message_id: str,
account_id: str,
) -> None:
db_session.add(
MessageFeedback(
app_id=app_id,
conversation_id=conversation_id,
message_id=message_id,
rating=FeedbackRating.LIKE,
from_source=FeedbackFromSource.ADMIN,
from_account_id=account_id,
)
)
db_session.commit()
def test_daily_message_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/daily-messages",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["message_count"] == 1
def test_daily_conversation_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/daily-conversations",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["conversation_count"] == 1
def test_daily_terminals_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(
db_session_with_containers,
app.id,
conversation.id,
from_account_id=None,
from_end_user_id=str(uuid4()),
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/daily-end-users",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["terminal_count"] == 1
def test_daily_token_cost_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(
db_session_with_containers,
app.id,
conversation.id,
from_account_id=account.id,
message_tokens=40,
answer_tokens=60,
total_price=Decimal("0.02"),
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/token-costs",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload["data"][0]["token_count"] == 100
assert Decimal(payload["data"][0]["total_price"]) == Decimal("0.02")
def test_average_session_interaction_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/average-session-interactions",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["interactions"] == 2.0
def test_user_satisfaction_rate_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
first = _create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
for _ in range(9):
_create_message(db_session_with_containers, app.id, conversation.id, from_account_id=account.id)
_create_like_feedback(db_session_with_containers, app.id, conversation.id, first.id, account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/user-satisfaction-rate",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["rate"] == 100.0
def test_average_response_time_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.COMPLETION)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(
db_session_with_containers,
app.id,
conversation.id,
from_account_id=account.id,
provider_response_latency=1.234,
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/average-response-time",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["latency"] == 1234.0
def test_tokens_per_second_statistic(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
conversation = _create_conversation(db_session_with_containers, app.id, account.id, mode=app.mode)
_create_message(
db_session_with_containers,
app.id,
conversation.id,
from_account_id=account.id,
answer_tokens=31,
provider_response_latency=2.0,
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/tokens-per-second",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json()["data"][0]["tps"] == 15.5
def test_invalid_time_range(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
with patch("controllers.console.app.statistic.parse_time_range", side_effect=ValueError("Invalid time")):
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/daily-messages?start=invalid&end=invalid",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 400
assert response.get_json()["message"] == "Invalid time"
def test_time_range_params_passed(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
import datetime
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.CHAT)
start = datetime.datetime.now()
end = datetime.datetime.now()
with patch("controllers.console.app.statistic.parse_time_range", return_value=(start, end)) as mock_parse:
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/statistics/daily-messages?start=something&end=something",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
mock_parse.assert_called_once_with("something", "something", "UTC")

View File

@ -0,0 +1,415 @@
"""Authenticated controller integration tests for workflow draft variable APIs."""
import uuid
from flask.testing import FlaskClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from dify_graph.constants import CONVERSATION_VARIABLE_NODE_ID, ENVIRONMENT_VARIABLE_NODE_ID
from dify_graph.variables.segments import StringSegment
from factories.variable_factory import segment_to_variable
from models import Workflow
from models.model import AppMode
from models.workflow import WorkflowDraftVariable
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
create_console_app,
)
def _create_draft_workflow(
db_session: Session,
app_id: str,
tenant_id: str,
account_id: str,
*,
environment_variables: list | None = None,
conversation_variables: list | None = None,
) -> Workflow:
workflow = Workflow.new(
tenant_id=tenant_id,
app_id=app_id,
type="workflow",
version=Workflow.VERSION_DRAFT,
graph='{"nodes": [], "edges": []}',
features="{}",
created_by=account_id,
environment_variables=environment_variables or [],
conversation_variables=conversation_variables or [],
rag_pipeline_variables=[],
)
db_session.add(workflow)
db_session.commit()
return workflow
def _create_node_variable(
db_session: Session,
app_id: str,
user_id: str,
*,
node_id: str = "node_1",
name: str = "test_var",
) -> WorkflowDraftVariable:
variable = WorkflowDraftVariable.new_node_variable(
app_id=app_id,
user_id=user_id,
node_id=node_id,
name=name,
value=StringSegment(value="test_value"),
node_execution_id=str(uuid.uuid4()),
visible=True,
editable=True,
)
db_session.add(variable)
db_session.commit()
return variable
def _create_system_variable(
db_session: Session, app_id: str, user_id: str, name: str = "query"
) -> WorkflowDraftVariable:
variable = WorkflowDraftVariable.new_sys_variable(
app_id=app_id,
user_id=user_id,
name=name,
value=StringSegment(value="system-value"),
node_execution_id=str(uuid.uuid4()),
editable=True,
)
db_session.add(variable)
db_session.commit()
return variable
def _build_environment_variable(name: str, value: str):
return segment_to_variable(
segment=StringSegment(value=value),
selector=[ENVIRONMENT_VARIABLE_NODE_ID, name],
name=name,
description=f"Environment variable {name}",
)
def _build_conversation_variable(name: str, value: str):
return segment_to_variable(
segment=StringSegment(value=value),
selector=[CONVERSATION_VARIABLE_NODE_ID, name],
name=name,
description=f"Conversation variable {name}",
)
def test_workflow_variable_collection_get_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/variables?page=1&limit=20",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"items": [], "total": 0}
def test_workflow_variable_collection_get_not_exist(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 404
payload = response.get_json()
assert payload is not None
assert payload["code"] == "draft_workflow_not_exist"
def test_workflow_variable_collection_delete(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_node_variable(db_session_with_containers, app.id, account.id)
_create_node_variable(db_session_with_containers, app.id, account.id, node_id="node_2", name="other_var")
response = test_client_with_containers.delete(
f"/console/api/apps/{app.id}/workflows/draft/variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 204
remaining = db_session_with_containers.scalars(
select(WorkflowDraftVariable).where(
WorkflowDraftVariable.app_id == app.id,
WorkflowDraftVariable.user_id == account.id,
)
).all()
assert remaining == []
def test_node_variable_collection_get_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
node_variable = _create_node_variable(db_session_with_containers, app.id, account.id, node_id="node_123")
_create_node_variable(db_session_with_containers, app.id, account.id, node_id="node_456", name="other")
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/nodes/node_123/variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert [item["id"] for item in payload["items"]] == [node_variable.id]
def test_node_variable_collection_get_invalid_node_id(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/nodes/sys/variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 400
payload = response.get_json()
assert payload is not None
assert payload["code"] == "invalid_param"
def test_node_variable_collection_delete(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
target = _create_node_variable(db_session_with_containers, app.id, account.id, node_id="node_123")
untouched = _create_node_variable(db_session_with_containers, app.id, account.id, node_id="node_456")
target_id = target.id
untouched_id = untouched.id
response = test_client_with_containers.delete(
f"/console/api/apps/{app.id}/workflows/draft/nodes/node_123/variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 204
assert (
db_session_with_containers.scalar(select(WorkflowDraftVariable).where(WorkflowDraftVariable.id == target_id))
is None
)
assert (
db_session_with_containers.scalar(select(WorkflowDraftVariable).where(WorkflowDraftVariable.id == untouched_id))
is not None
)
def test_variable_api_get_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
variable = _create_node_variable(db_session_with_containers, app.id, account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/variables/{variable.id}",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["id"] == variable.id
assert payload["name"] == "test_var"
def test_variable_api_get_not_found(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/variables/{uuid.uuid4()}",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 404
payload = response.get_json()
assert payload is not None
assert payload["code"] == "not_found"
def test_variable_api_patch_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
variable = _create_node_variable(db_session_with_containers, app.id, account.id)
response = test_client_with_containers.patch(
f"/console/api/apps/{app.id}/workflows/draft/variables/{variable.id}",
headers=authenticate_console_client(test_client_with_containers, account),
json={"name": "renamed_var"},
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["id"] == variable.id
assert payload["name"] == "renamed_var"
refreshed = db_session_with_containers.scalar(
select(WorkflowDraftVariable).where(WorkflowDraftVariable.id == variable.id)
)
assert refreshed is not None
assert refreshed.name == "renamed_var"
def test_variable_api_delete_success(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
variable = _create_node_variable(db_session_with_containers, app.id, account.id)
response = test_client_with_containers.delete(
f"/console/api/apps/{app.id}/workflows/draft/variables/{variable.id}",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 204
assert (
db_session_with_containers.scalar(select(WorkflowDraftVariable).where(WorkflowDraftVariable.id == variable.id))
is None
)
def test_variable_reset_api_put_success_returns_no_content_without_execution(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(db_session_with_containers, app.id, tenant.id, account.id)
variable = _create_node_variable(db_session_with_containers, app.id, account.id)
response = test_client_with_containers.put(
f"/console/api/apps/{app.id}/workflows/draft/variables/{variable.id}/reset",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 204
assert (
db_session_with_containers.scalar(select(WorkflowDraftVariable).where(WorkflowDraftVariable.id == variable.id))
is None
)
def test_conversation_variable_collection_get(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(
db_session_with_containers,
app.id,
tenant.id,
account.id,
conversation_variables=[_build_conversation_variable("session_name", "Alice")],
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/conversation-variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert [item["name"] for item in payload["items"]] == ["session_name"]
created = db_session_with_containers.scalars(
select(WorkflowDraftVariable).where(
WorkflowDraftVariable.app_id == app.id,
WorkflowDraftVariable.user_id == account.id,
WorkflowDraftVariable.node_id == CONVERSATION_VARIABLE_NODE_ID,
)
).all()
assert len(created) == 1
def test_system_variable_collection_get(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
variable = _create_system_variable(db_session_with_containers, app.id, account.id)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/system-variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert [item["id"] for item in payload["items"]] == [variable.id]
def test_environment_variable_collection_get(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
app = create_console_app(db_session_with_containers, tenant.id, account.id, AppMode.WORKFLOW)
_create_draft_workflow(
db_session_with_containers,
app.id,
tenant.id,
account.id,
environment_variables=[_build_environment_variable("api_key", "secret-value")],
)
response = test_client_with_containers.get(
f"/console/api/apps/{app.id}/workflows/draft/environment-variables",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["items"][0]["name"] == "api_key"
assert payload["items"][0]["value"] == "secret-value"

View File

@ -0,0 +1,131 @@
"""Controller integration tests for API key data source auth routes."""
import json
from unittest.mock import patch
from flask.testing import FlaskClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from models.source import DataSourceApiKeyAuthBinding
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
)
def test_get_api_key_auth_data_source(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
binding = DataSourceApiKeyAuthBinding(
tenant_id=tenant.id,
category="api_key",
provider="custom_provider",
credentials=json.dumps({"auth_type": "api_key", "config": {"api_key": "encrypted"}}),
disabled=False,
)
db_session_with_containers.add(binding)
db_session_with_containers.commit()
response = test_client_with_containers.get(
"/console/api/api-key-auth/data-source",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert len(payload["sources"]) == 1
assert payload["sources"][0]["provider"] == "custom_provider"
def test_get_api_key_auth_data_source_empty(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
response = test_client_with_containers.get(
"/console/api/api-key-auth/data-source",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"sources": []}
def test_create_binding_successful(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
with (
patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.validate_api_key_auth_args"),
patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.create_provider_auth"),
):
response = test_client_with_containers.post(
"/console/api/api-key-auth/data-source/binding",
json={"category": "api_key", "provider": "custom", "credentials": {"key": "value"}},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
def test_create_binding_failure(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
with (
patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.validate_api_key_auth_args"),
patch(
"controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.create_provider_auth",
side_effect=ValueError("Invalid structure"),
),
):
response = test_client_with_containers.post(
"/console/api/api-key-auth/data-source/binding",
json={"category": "api_key", "provider": "custom", "credentials": {"key": "value"}},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 500
payload = response.get_json()
assert payload is not None
assert payload["code"] == "auth_failed"
assert payload["message"] == "Invalid structure"
def test_delete_binding_successful(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
binding = DataSourceApiKeyAuthBinding(
tenant_id=tenant.id,
category="api_key",
provider="custom_provider",
credentials=json.dumps({"auth_type": "api_key", "config": {"api_key": "encrypted"}}),
disabled=False,
)
db_session_with_containers.add(binding)
db_session_with_containers.commit()
response = test_client_with_containers.delete(
f"/console/api/api-key-auth/data-source/{binding.id}",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 204
assert (
db_session_with_containers.scalar(
select(DataSourceApiKeyAuthBinding).where(DataSourceApiKeyAuthBinding.id == binding.id)
)
is None
)

View File

@ -0,0 +1,120 @@
"""Controller integration tests for console OAuth data source routes."""
from unittest.mock import MagicMock, patch
from flask.testing import FlaskClient
from sqlalchemy.orm import Session
from models.source import DataSourceOauthBinding
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
)
def test_get_oauth_url_successful(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
provider = MagicMock()
provider.get_authorization_url.return_value = "http://oauth.provider/auth"
with (
patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": provider}),
patch("controllers.console.auth.data_source_oauth.dify_config.NOTION_INTEGRATION_TYPE", None),
):
response = test_client_with_containers.get(
"/console/api/oauth/data-source/notion",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert tenant.id == account.current_tenant_id
assert response.status_code == 200
assert response.get_json() == {"data": "http://oauth.provider/auth"}
provider.get_authorization_url.assert_called_once()
def test_get_oauth_url_invalid_provider(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": MagicMock()}):
response = test_client_with_containers.get(
"/console/api/oauth/data-source/unknown_provider",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 400
assert response.get_json() == {"error": "Invalid provider"}
def test_oauth_callback_successful(test_client_with_containers: FlaskClient) -> None:
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": MagicMock()}):
response = test_client_with_containers.get("/console/api/oauth/data-source/callback/notion?code=mock_code")
assert response.status_code == 302
assert "code=mock_code" in response.location
def test_oauth_callback_missing_code(test_client_with_containers: FlaskClient) -> None:
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": MagicMock()}):
response = test_client_with_containers.get("/console/api/oauth/data-source/callback/notion")
assert response.status_code == 302
assert "error=Access%20denied" in response.location
def test_oauth_callback_invalid_provider(test_client_with_containers: FlaskClient) -> None:
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": MagicMock()}):
response = test_client_with_containers.get("/console/api/oauth/data-source/callback/invalid?code=mock_code")
assert response.status_code == 400
assert response.get_json() == {"error": "Invalid provider"}
def test_get_binding_successful(test_client_with_containers: FlaskClient) -> None:
provider = MagicMock()
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": provider}):
response = test_client_with_containers.get("/console/api/oauth/data-source/binding/notion?code=auth_code_123")
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
provider.get_access_token.assert_called_once_with("auth_code_123")
def test_get_binding_missing_code(test_client_with_containers: FlaskClient) -> None:
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": MagicMock()}):
response = test_client_with_containers.get("/console/api/oauth/data-source/binding/notion?code=")
assert response.status_code == 400
assert response.get_json() == {"error": "Invalid code"}
def test_sync_successful(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, tenant = create_console_account_and_tenant(db_session_with_containers)
binding = DataSourceOauthBinding(
tenant_id=tenant.id,
access_token="test-access-token",
provider="notion",
source_info={"workspace_name": "Workspace", "workspace_icon": None, "workspace_id": tenant.id, "pages": []},
disabled=False,
)
db_session_with_containers.add(binding)
db_session_with_containers.commit()
provider = MagicMock()
with patch("controllers.console.auth.data_source_oauth.get_oauth_providers", return_value={"notion": provider}):
response = test_client_with_containers.get(
f"/console/api/oauth/data-source/notion/{binding.id}/sync",
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"result": "success"}
provider.sync_data_source.assert_called_once_with(binding.id)

View File

@ -0,0 +1,365 @@
"""Controller integration tests for console OAuth server routes."""
from unittest.mock import patch
from flask.testing import FlaskClient
from sqlalchemy.orm import Session
from models.model import OAuthProviderApp
from services.oauth_server import OAUTH_ACCESS_TOKEN_EXPIRES_IN
from tests.test_containers_integration_tests.controllers.console.helpers import (
authenticate_console_client,
create_console_account_and_tenant,
ensure_dify_setup,
)
def _build_oauth_provider_app() -> OAuthProviderApp:
return OAuthProviderApp(
app_icon="icon_url",
client_id="test_client_id",
client_secret="test_secret",
app_label={"en-US": "Test App"},
redirect_uris=["http://localhost/callback"],
scope="read,write",
)
def test_oauth_provider_successful_post(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider",
json={"client_id": "test_client_id", "redirect_uri": "http://localhost/callback"},
)
assert response.status_code == 200
payload = response.get_json()
assert payload is not None
assert payload["app_icon"] == "icon_url"
assert payload["app_label"] == {"en-US": "Test App"}
assert payload["scope"] == "read,write"
def test_oauth_provider_invalid_redirect_uri(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider",
json={"client_id": "test_client_id", "redirect_uri": "http://invalid/callback"},
)
assert response.status_code == 400
payload = response.get_json()
assert payload is not None
assert "redirect_uri is invalid" in payload["message"]
def test_oauth_provider_invalid_client_id(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
response = test_client_with_containers.post(
"/console/api/oauth/provider",
json={"client_id": "test_invalid_client_id", "redirect_uri": "http://localhost/callback"},
)
assert response.status_code == 404
payload = response.get_json()
assert payload is not None
assert "client_id is invalid" in payload["message"]
def test_oauth_authorize_successful(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
with (
patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
),
patch(
"controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_authorization_code",
return_value="auth_code_123",
) as mock_sign,
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/authorize",
json={"client_id": "test_client_id"},
headers=authenticate_console_client(test_client_with_containers, account),
)
assert response.status_code == 200
assert response.get_json() == {"code": "auth_code_123"}
mock_sign.assert_called_once_with("test_client_id", account.id)
def test_oauth_token_authorization_code_grant(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with (
patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
),
patch(
"controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_access_token",
return_value=("access_123", "refresh_123"),
),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "test_secret",
"redirect_uri": "http://localhost/callback",
},
)
assert response.status_code == 200
assert response.get_json() == {
"access_token": "access_123",
"token_type": "Bearer",
"expires_in": OAUTH_ACCESS_TOKEN_EXPIRES_IN,
"refresh_token": "refresh_123",
}
def test_oauth_token_authorization_code_grant_missing_code(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"client_secret": "test_secret",
"redirect_uri": "http://localhost/callback",
},
)
assert response.status_code == 400
assert response.get_json()["message"] == "code is required"
def test_oauth_token_authorization_code_grant_invalid_secret(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "invalid_secret",
"redirect_uri": "http://localhost/callback",
},
)
assert response.status_code == 400
assert response.get_json()["message"] == "client_secret is invalid"
def test_oauth_token_authorization_code_grant_invalid_redirect_uri(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "test_secret",
"redirect_uri": "http://invalid/callback",
},
)
assert response.status_code == 400
assert response.get_json()["message"] == "redirect_uri is invalid"
def test_oauth_token_refresh_token_grant(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with (
patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
),
patch(
"controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_access_token",
return_value=("new_access", "new_refresh"),
),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={"client_id": "test_client_id", "grant_type": "refresh_token", "refresh_token": "refresh_123"},
)
assert response.status_code == 200
assert response.get_json() == {
"access_token": "new_access",
"token_type": "Bearer",
"expires_in": OAUTH_ACCESS_TOKEN_EXPIRES_IN,
"refresh_token": "new_refresh",
}
def test_oauth_token_refresh_token_grant_missing_token(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={"client_id": "test_client_id", "grant_type": "refresh_token"},
)
assert response.status_code == 400
assert response.get_json()["message"] == "refresh_token is required"
def test_oauth_token_invalid_grant_type(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/token",
json={"client_id": "test_client_id", "grant_type": "invalid_grant"},
)
assert response.status_code == 400
assert response.get_json()["message"] == "invalid grant_type"
def test_oauth_account_successful_retrieval(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
account, _tenant = create_console_account_and_tenant(db_session_with_containers)
account.avatar = "avatar_url"
db_session_with_containers.commit()
with (
patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
),
patch(
"controllers.console.auth.oauth_server.OAuthServerService.validate_oauth_access_token",
return_value=account,
),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/account",
json={"client_id": "test_client_id"},
headers={"Authorization": "Bearer valid_access_token"},
)
assert response.status_code == 200
assert response.get_json() == {
"name": "Test User",
"email": account.email,
"avatar": "avatar_url",
"interface_language": "en-US",
"timezone": "UTC",
}
def test_oauth_account_missing_authorization_header(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/account",
json={"client_id": "test_client_id"},
)
assert response.status_code == 401
assert response.get_json() == {"error": "Authorization header is required"}
def test_oauth_account_invalid_authorization_header_format(
db_session_with_containers: Session,
test_client_with_containers: FlaskClient,
) -> None:
ensure_dify_setup(db_session_with_containers)
with patch(
"controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app",
return_value=_build_oauth_provider_app(),
):
response = test_client_with_containers.post(
"/console/api/oauth/provider/account",
json={"client_id": "test_client_id"},
headers={"Authorization": "InvalidFormat"},
)
assert response.status_code == 401
assert response.get_json() == {"error": "Invalid Authorization header format"}

View File

@ -0,0 +1,85 @@
"""Shared helpers for authenticated console controller integration tests."""
import uuid
from flask.testing import FlaskClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from configs import dify_config
from constants import HEADER_NAME_CSRF_TOKEN
from libs.datetime_utils import naive_utc_now
from libs.token import _real_cookie_name, generate_csrf_token
from models import Account, DifySetup, Tenant, TenantAccountJoin
from models.account import AccountStatus, TenantAccountRole
from models.model import App, AppMode
from services.account_service import AccountService
def ensure_dify_setup(db_session: Session) -> None:
"""Create a setup marker once so setup-protected console routes can be exercised."""
if db_session.scalar(select(DifySetup).limit(1)) is not None:
return
db_session.add(DifySetup(version=dify_config.project.version))
db_session.commit()
def create_console_account_and_tenant(db_session: Session) -> tuple[Account, Tenant]:
"""Create an initialized owner account with a current tenant."""
account = Account(
email=f"test-{uuid.uuid4()}@example.com",
name="Test User",
interface_language="en-US",
status=AccountStatus.ACTIVE,
)
account.initialized_at = naive_utc_now()
db_session.add(account)
db_session.commit()
tenant = Tenant(name="Test Tenant", status="normal")
db_session.add(tenant)
db_session.commit()
db_session.add(
TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER,
current=True,
)
)
db_session.commit()
account.set_tenant_id(tenant.id)
account.timezone = "UTC"
db_session.commit()
ensure_dify_setup(db_session)
return account, tenant
def create_console_app(db_session: Session, tenant_id: str, account_id: str, mode: AppMode) -> App:
"""Create a minimal app row that can be loaded by get_app_model."""
app = App(
tenant_id=tenant_id,
name="Test App",
mode=mode,
enable_site=True,
enable_api=True,
created_by=account_id,
)
db_session.add(app)
db_session.commit()
return app
def authenticate_console_client(test_client: FlaskClient, account: Account) -> dict[str, str]:
"""Attach console auth cookies/headers for endpoints guarded by login_required."""
access_token = AccountService.get_account_jwt_token(account)
csrf_token = generate_csrf_token(account.id)
test_client.set_cookie(_real_cookie_name("csrf_token"), csrf_token, domain="localhost")
return {
"Authorization": f"Bearer {access_token}",
HEADER_NAME_CSRF_TOKEN: csrf_token,
}

View File

@ -1,320 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask, request
from werkzeug.exceptions import InternalServerError, NotFound
from werkzeug.local import LocalProxy
from controllers.console.app.error import (
ProviderModelCurrentlyNotSupportError,
ProviderNotInitializeError,
ProviderQuotaExceededError,
)
from controllers.console.app.message import (
ChatMessageListApi,
ChatMessagesQuery,
FeedbackExportQuery,
MessageAnnotationCountApi,
MessageApi,
MessageFeedbackApi,
MessageFeedbackExportApi,
MessageFeedbackPayload,
MessageSuggestedQuestionApi,
)
from controllers.console.explore.error import AppSuggestedQuestionsAfterAnswerDisabledError
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from models import App, AppMode
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
@pytest.fixture
def app():
flask_app = Flask(__name__)
flask_app.config["TESTING"] = True
flask_app.config["RESTX_MASK_HEADER"] = "X-Fields"
return flask_app
@pytest.fixture
def mock_account():
from models.account import Account, AccountStatus
account = MagicMock(spec=Account)
account.id = "user_123"
account.timezone = "UTC"
account.status = AccountStatus.ACTIVE
account.is_admin_or_owner = True
account.current_tenant.current_role = "owner"
account.has_edit_permission = True
return account
@pytest.fixture
def mock_app_model():
app_model = MagicMock(spec=App)
app_model.id = "app_123"
app_model.mode = AppMode.CHAT
app_model.tenant_id = "tenant_123"
return app_model
@pytest.fixture(autouse=True)
def mock_csrf():
with patch("libs.login.check_csrf_token") as mock:
yield mock
import contextlib
@contextlib.contextmanager
def setup_test_context(
test_app, endpoint_class, route_path, method, mock_account, mock_app_model, payload=None, qs=None
):
with (
patch("extensions.ext_database.db") as mock_db,
patch("controllers.console.app.wraps.db", mock_db),
patch("controllers.console.wraps.db", mock_db),
patch("controllers.console.app.message.db", mock_db),
patch("controllers.console.app.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch("controllers.console.app.message.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
):
# Set up a generic query mock that usually returns mock_app_model when getting app
app_query_mock = MagicMock()
app_query_mock.filter.return_value.first.return_value = mock_app_model
app_query_mock.filter.return_value.filter.return_value.first.return_value = mock_app_model
app_query_mock.where.return_value.first.return_value = mock_app_model
app_query_mock.where.return_value.where.return_value.first.return_value = mock_app_model
data_query_mock = MagicMock()
def query_side_effect(*args, **kwargs):
if args and hasattr(args[0], "__name__") and args[0].__name__ == "App":
return app_query_mock
return data_query_mock
mock_db.session.query.side_effect = query_side_effect
mock_db.data_query = data_query_mock
# Let the caller override the stat db query logic
proxy_mock = LocalProxy(lambda: mock_account)
query_string = "&".join([f"{k}={v}" for k, v in (qs or {}).items()])
full_path = f"{route_path}?{query_string}" if qs else route_path
with (
patch("libs.login.current_user", proxy_mock),
patch("flask_login.current_user", proxy_mock),
patch("controllers.console.app.message.attach_message_extra_contents", return_value=None),
):
with test_app.test_request_context(full_path, method=method, json=payload):
request.view_args = {"app_id": "app_123"}
if "suggested-questions" in route_path:
# simplistic extraction for message_id
parts = route_path.split("chat-messages/")
if len(parts) > 1:
request.view_args["message_id"] = parts[1].split("/")[0]
elif "messages/" in route_path and "chat-messages" not in route_path:
parts = route_path.split("messages/")
if len(parts) > 1:
request.view_args["message_id"] = parts[1].split("/")[0]
api_instance = endpoint_class()
# Check if it has a dispatch_request or method
if hasattr(api_instance, method.lower()):
yield api_instance, mock_db, request.view_args
class TestMessageValidators:
def test_chat_messages_query_validators(self):
# Test empty_to_none
assert ChatMessagesQuery.empty_to_none("") is None
assert ChatMessagesQuery.empty_to_none("val") == "val"
# Test validate_uuid
assert ChatMessagesQuery.validate_uuid(None) is None
assert (
ChatMessagesQuery.validate_uuid("123e4567-e89b-12d3-a456-426614174000")
== "123e4567-e89b-12d3-a456-426614174000"
)
def test_message_feedback_validators(self):
assert (
MessageFeedbackPayload.validate_message_id("123e4567-e89b-12d3-a456-426614174000")
== "123e4567-e89b-12d3-a456-426614174000"
)
def test_feedback_export_validators(self):
assert FeedbackExportQuery.parse_bool(None) is None
assert FeedbackExportQuery.parse_bool(True) is True
assert FeedbackExportQuery.parse_bool("1") is True
assert FeedbackExportQuery.parse_bool("0") is False
assert FeedbackExportQuery.parse_bool("off") is False
with pytest.raises(ValueError):
FeedbackExportQuery.parse_bool("invalid")
class TestMessageEndpoints:
def test_chat_message_list_not_found(self, app, mock_account, mock_app_model):
with setup_test_context(
app,
ChatMessageListApi,
"/apps/app_123/chat-messages",
"GET",
mock_account,
mock_app_model,
qs={"conversation_id": "123e4567-e89b-12d3-a456-426614174000"},
) as (api, mock_db, v_args):
mock_db.session.scalar.return_value = None
with pytest.raises(NotFound):
api.get(**v_args)
def test_chat_message_list_success(self, app, mock_account, mock_app_model):
with setup_test_context(
app,
ChatMessageListApi,
"/apps/app_123/chat-messages",
"GET",
mock_account,
mock_app_model,
qs={"conversation_id": "123e4567-e89b-12d3-a456-426614174000", "limit": 1},
) as (api, mock_db, v_args):
mock_conv = MagicMock()
mock_conv.id = "123e4567-e89b-12d3-a456-426614174000"
mock_msg = MagicMock()
mock_msg.id = "msg_123"
mock_msg.feedbacks = []
mock_msg.annotation = None
mock_msg.annotation_hit_history = None
mock_msg.agent_thoughts = []
mock_msg.message_files = []
mock_msg.extra_contents = []
mock_msg.message = {}
mock_msg.message_metadata_dict = {}
# scalar() is called twice: first for conversation lookup, second for has_more check
mock_db.session.scalar.side_effect = [mock_conv, False]
scalars_result = MagicMock()
scalars_result.all.return_value = [mock_msg]
mock_db.session.scalars.return_value = scalars_result
resp = api.get(**v_args)
assert resp["limit"] == 1
assert resp["has_more"] is False
assert len(resp["data"]) == 1
def test_message_feedback_not_found(self, app, mock_account, mock_app_model):
with setup_test_context(
app,
MessageFeedbackApi,
"/apps/app_123/feedbacks",
"POST",
mock_account,
mock_app_model,
payload={"message_id": "123e4567-e89b-12d3-a456-426614174000"},
) as (api, mock_db, v_args):
mock_db.session.scalar.return_value = None
with pytest.raises(NotFound):
api.post(**v_args)
def test_message_feedback_success(self, app, mock_account, mock_app_model):
payload = {"message_id": "123e4567-e89b-12d3-a456-426614174000", "rating": "like"}
with setup_test_context(
app, MessageFeedbackApi, "/apps/app_123/feedbacks", "POST", mock_account, mock_app_model, payload=payload
) as (api, mock_db, v_args):
mock_msg = MagicMock()
mock_msg.admin_feedback = None
mock_db.session.scalar.return_value = mock_msg
resp = api.post(**v_args)
assert resp == {"result": "success"}
def test_message_annotation_count(self, app, mock_account, mock_app_model):
with setup_test_context(
app, MessageAnnotationCountApi, "/apps/app_123/annotations/count", "GET", mock_account, mock_app_model
) as (api, mock_db, v_args):
mock_db.session.scalar.return_value = 5
resp = api.get(**v_args)
assert resp == {"count": 5}
@patch("controllers.console.app.message.MessageService")
def test_message_suggested_questions_success(self, mock_msg_srv, app, mock_account, mock_app_model):
mock_msg_srv.get_suggested_questions_after_answer.return_value = ["q1", "q2"]
with setup_test_context(
app,
MessageSuggestedQuestionApi,
"/apps/app_123/chat-messages/msg_123/suggested-questions",
"GET",
mock_account,
mock_app_model,
) as (api, mock_db, v_args):
resp = api.get(**v_args)
assert resp == {"data": ["q1", "q2"]}
@pytest.mark.parametrize(
("exc", "expected_exc"),
[
(MessageNotExistsError, NotFound),
(ConversationNotExistsError, NotFound),
(ProviderTokenNotInitError, ProviderNotInitializeError),
(QuotaExceededError, ProviderQuotaExceededError),
(ModelCurrentlyNotSupportError, ProviderModelCurrentlyNotSupportError),
(SuggestedQuestionsAfterAnswerDisabledError, AppSuggestedQuestionsAfterAnswerDisabledError),
(Exception, InternalServerError),
],
)
@patch("controllers.console.app.message.MessageService")
def test_message_suggested_questions_errors(
self, mock_msg_srv, exc, expected_exc, app, mock_account, mock_app_model
):
mock_msg_srv.get_suggested_questions_after_answer.side_effect = exc()
with setup_test_context(
app,
MessageSuggestedQuestionApi,
"/apps/app_123/chat-messages/msg_123/suggested-questions",
"GET",
mock_account,
mock_app_model,
) as (api, mock_db, v_args):
with pytest.raises(expected_exc):
api.get(**v_args)
@patch("services.feedback_service.FeedbackService.export_feedbacks")
def test_message_feedback_export_success(self, mock_export, app, mock_account, mock_app_model):
mock_export.return_value = {"exported": True}
with setup_test_context(
app, MessageFeedbackExportApi, "/apps/app_123/feedbacks/export", "GET", mock_account, mock_app_model
) as (api, mock_db, v_args):
resp = api.get(**v_args)
assert resp == {"exported": True}
def test_message_api_get_success(self, app, mock_account, mock_app_model):
with setup_test_context(
app, MessageApi, "/apps/app_123/messages/msg_123", "GET", mock_account, mock_app_model
) as (api, mock_db, v_args):
mock_msg = MagicMock()
mock_msg.id = "msg_123"
mock_msg.feedbacks = []
mock_msg.annotation = None
mock_msg.annotation_hit_history = None
mock_msg.agent_thoughts = []
mock_msg.message_files = []
mock_msg.extra_contents = []
mock_msg.message = {}
mock_msg.message_metadata_dict = {}
mock_db.session.scalar.return_value = mock_msg
resp = api.get(**v_args)
assert resp["id"] == "msg_123"

View File

@ -1,275 +0,0 @@
from decimal import Decimal
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask, request
from werkzeug.local import LocalProxy
from controllers.console.app.statistic import (
AverageResponseTimeStatistic,
AverageSessionInteractionStatistic,
DailyConversationStatistic,
DailyMessageStatistic,
DailyTerminalsStatistic,
DailyTokenCostStatistic,
TokensPerSecondStatistic,
UserSatisfactionRateStatistic,
)
from models import App, AppMode
@pytest.fixture
def app():
flask_app = Flask(__name__)
flask_app.config["TESTING"] = True
return flask_app
@pytest.fixture
def mock_account():
from models.account import Account, AccountStatus
account = MagicMock(spec=Account)
account.id = "user_123"
account.timezone = "UTC"
account.status = AccountStatus.ACTIVE
account.is_admin_or_owner = True
account.current_tenant.current_role = "owner"
account.has_edit_permission = True
return account
@pytest.fixture
def mock_app_model():
app_model = MagicMock(spec=App)
app_model.id = "app_123"
app_model.mode = AppMode.CHAT
app_model.tenant_id = "tenant_123"
return app_model
@pytest.fixture(autouse=True)
def mock_csrf():
with patch("libs.login.check_csrf_token") as mock:
yield mock
def setup_test_context(
test_app, endpoint_class, route_path, mock_account, mock_app_model, mock_rs, mock_parse_ret=(None, None)
):
with (
patch("controllers.console.app.statistic.db") as mock_db_stat,
patch("controllers.console.app.wraps.db") as mock_db_wraps,
patch("controllers.console.wraps.db", mock_db_wraps),
patch(
"controllers.console.app.statistic.current_account_with_tenant", return_value=(mock_account, "tenant_123")
),
patch("controllers.console.app.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
):
mock_conn = MagicMock()
mock_conn.execute.return_value = mock_rs
mock_begin = MagicMock()
mock_begin.__enter__.return_value = mock_conn
mock_db_stat.engine.begin.return_value = mock_begin
mock_query = MagicMock()
mock_query.filter.return_value.first.return_value = mock_app_model
mock_query.filter.return_value.filter.return_value.first.return_value = mock_app_model
mock_query.where.return_value.first.return_value = mock_app_model
mock_query.where.return_value.where.return_value.first.return_value = mock_app_model
mock_db_wraps.session.query.return_value = mock_query
proxy_mock = LocalProxy(lambda: mock_account)
with patch("libs.login.current_user", proxy_mock), patch("flask_login.current_user", proxy_mock):
with test_app.test_request_context(route_path, method="GET"):
request.view_args = {"app_id": "app_123"}
api_instance = endpoint_class()
response = api_instance.get(app_id="app_123")
return response
class TestStatisticEndpoints:
def test_daily_message_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.message_count = 10
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
DailyMessageStatistic,
"/apps/app_123/statistics/daily-messages?start=2023-01-01 00:00&end=2023-01-02 00:00",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["message_count"] == 10
def test_daily_conversation_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.conversation_count = 5
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
DailyConversationStatistic,
"/apps/app_123/statistics/daily-conversations",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["conversation_count"] == 5
def test_daily_terminals_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.terminal_count = 2
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
DailyTerminalsStatistic,
"/apps/app_123/statistics/daily-end-users",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["terminal_count"] == 2
def test_daily_token_cost_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.token_count = 100
mock_row.total_price = Decimal("0.02")
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
DailyTokenCostStatistic,
"/apps/app_123/statistics/token-costs",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["token_count"] == 100
assert response.json["data"][0]["total_price"] == "0.02"
def test_average_session_interaction_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.interactions = Decimal("3.523")
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
AverageSessionInteractionStatistic,
"/apps/app_123/statistics/average-session-interactions",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["interactions"] == 3.52
def test_user_satisfaction_rate_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.message_count = 100
mock_row.feedback_count = 10
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
UserSatisfactionRateStatistic,
"/apps/app_123/statistics/user-satisfaction-rate",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["rate"] == 100.0
def test_average_response_time_statistic(self, app, mock_account, mock_app_model):
mock_app_model.mode = AppMode.COMPLETION
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.latency = 1.234
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
AverageResponseTimeStatistic,
"/apps/app_123/statistics/average-response-time",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["latency"] == 1234.0
def test_tokens_per_second_statistic(self, app, mock_account, mock_app_model):
mock_row = MagicMock()
mock_row.date = "2023-01-01"
mock_row.tokens_per_second = 15.5
mock_row.interactions = Decimal(0)
with patch("controllers.console.app.statistic.parse_time_range", return_value=(None, None)):
response = setup_test_context(
app,
TokensPerSecondStatistic,
"/apps/app_123/statistics/tokens-per-second",
mock_account,
mock_app_model,
[mock_row],
)
assert response.status_code == 200
assert response.json["data"][0]["tps"] == 15.5
@patch("controllers.console.app.statistic.parse_time_range")
def test_invalid_time_range(self, mock_parse, app, mock_account, mock_app_model):
mock_parse.side_effect = ValueError("Invalid time")
from werkzeug.exceptions import BadRequest
with pytest.raises(BadRequest):
setup_test_context(
app,
DailyMessageStatistic,
"/apps/app_123/statistics/daily-messages?start=invalid&end=invalid",
mock_account,
mock_app_model,
[],
)
@patch("controllers.console.app.statistic.parse_time_range")
def test_time_range_params_passed(self, mock_parse, app, mock_account, mock_app_model):
import datetime
start = datetime.datetime.now()
end = datetime.datetime.now()
mock_parse.return_value = (start, end)
response = setup_test_context(
app,
DailyMessageStatistic,
"/apps/app_123/statistics/daily-messages?start=something&end=something",
mock_account,
mock_app_model,
[],
)
assert response.status_code == 200
mock_parse.assert_called_once()

View File

@ -1,313 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask, request
from werkzeug.local import LocalProxy
from controllers.console.app.error import DraftWorkflowNotExist
from controllers.console.app.workflow_draft_variable import (
ConversationVariableCollectionApi,
EnvironmentVariableCollectionApi,
NodeVariableCollectionApi,
SystemVariableCollectionApi,
VariableApi,
VariableResetApi,
WorkflowVariableCollectionApi,
)
from controllers.web.error import InvalidArgumentError, NotFoundError
from models import App, AppMode
from models.enums import DraftVariableType
@pytest.fixture
def app():
flask_app = Flask(__name__)
flask_app.config["TESTING"] = True
flask_app.config["RESTX_MASK_HEADER"] = "X-Fields"
return flask_app
@pytest.fixture
def mock_account():
from models.account import Account, AccountStatus
account = MagicMock(spec=Account)
account.id = "user_123"
account.timezone = "UTC"
account.status = AccountStatus.ACTIVE
account.is_admin_or_owner = True
account.current_tenant.current_role = "owner"
account.has_edit_permission = True
return account
@pytest.fixture
def mock_app_model():
app_model = MagicMock(spec=App)
app_model.id = "app_123"
app_model.mode = AppMode.WORKFLOW
app_model.tenant_id = "tenant_123"
return app_model
@pytest.fixture(autouse=True)
def mock_csrf():
with patch("libs.login.check_csrf_token") as mock:
yield mock
def setup_test_context(test_app, endpoint_class, route_path, method, mock_account, mock_app_model, payload=None):
with (
patch("controllers.console.app.wraps.db") as mock_db_wraps,
patch("controllers.console.wraps.db", mock_db_wraps),
patch("controllers.console.app.workflow_draft_variable.db"),
patch("controllers.console.app.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
):
mock_query = MagicMock()
mock_query.filter.return_value.first.return_value = mock_app_model
mock_query.filter.return_value.filter.return_value.first.return_value = mock_app_model
mock_query.where.return_value.first.return_value = mock_app_model
mock_query.where.return_value.where.return_value.first.return_value = mock_app_model
mock_db_wraps.session.query.return_value = mock_query
proxy_mock = LocalProxy(lambda: mock_account)
with patch("libs.login.current_user", proxy_mock), patch("flask_login.current_user", proxy_mock):
with test_app.test_request_context(route_path, method=method, json=payload):
request.view_args = {"app_id": "app_123"}
# extract node_id or variable_id from path manually since view_args overrides
if "nodes/" in route_path:
request.view_args["node_id"] = route_path.split("nodes/")[1].split("/")[0]
if "variables/" in route_path:
# simplistic extraction
parts = route_path.split("variables/")
if len(parts) > 1 and parts[1] and parts[1] != "reset":
request.view_args["variable_id"] = parts[1].split("/")[0]
api_instance = endpoint_class()
# we just call dispatch_request to avoid manual argument passing
if hasattr(api_instance, method.lower()):
func = getattr(api_instance, method.lower())
return func(**request.view_args)
class TestWorkflowDraftVariableEndpoints:
@staticmethod
def _mock_workflow_variable(variable_type: DraftVariableType = DraftVariableType.NODE) -> MagicMock:
class DummyValueType:
def exposed_type(self):
return DraftVariableType.NODE
mock_var = MagicMock()
mock_var.app_id = "app_123"
mock_var.id = "var_123"
mock_var.name = "test_var"
mock_var.description = ""
mock_var.get_variable_type.return_value = variable_type
mock_var.get_selector.return_value = []
mock_var.value_type = DummyValueType()
mock_var.edited = False
mock_var.visible = True
mock_var.file_id = None
mock_var.variable_file = None
mock_var.is_truncated.return_value = False
mock_var.get_value.return_value.model_copy.return_value.value = "test_value"
return mock_var
@patch("controllers.console.app.workflow_draft_variable.WorkflowService")
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_workflow_variable_collection_get_success(
self, mock_draft_srv, mock_wf_srv, app, mock_account, mock_app_model
):
mock_wf_srv.return_value.is_workflow_exist.return_value = True
from services.workflow_draft_variable_service import WorkflowDraftVariableList
mock_draft_srv.return_value.list_variables_without_values.return_value = WorkflowDraftVariableList(
variables=[], total=0
)
resp = setup_test_context(
app,
WorkflowVariableCollectionApi,
"/apps/app_123/workflows/draft/variables?page=1&limit=20",
"GET",
mock_account,
mock_app_model,
)
assert resp == {"items": [], "total": 0}
@patch("controllers.console.app.workflow_draft_variable.WorkflowService")
def test_workflow_variable_collection_get_not_exist(self, mock_wf_srv, app, mock_account, mock_app_model):
mock_wf_srv.return_value.is_workflow_exist.return_value = False
with pytest.raises(DraftWorkflowNotExist):
setup_test_context(
app,
WorkflowVariableCollectionApi,
"/apps/app_123/workflows/draft/variables",
"GET",
mock_account,
mock_app_model,
)
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_workflow_variable_collection_delete(self, mock_draft_srv, app, mock_account, mock_app_model):
resp = setup_test_context(
app,
WorkflowVariableCollectionApi,
"/apps/app_123/workflows/draft/variables",
"DELETE",
mock_account,
mock_app_model,
)
assert resp.status_code == 204
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_node_variable_collection_get_success(self, mock_draft_srv, app, mock_account, mock_app_model):
from services.workflow_draft_variable_service import WorkflowDraftVariableList
mock_draft_srv.return_value.list_node_variables.return_value = WorkflowDraftVariableList(variables=[])
resp = setup_test_context(
app,
NodeVariableCollectionApi,
"/apps/app_123/workflows/draft/nodes/node_123/variables",
"GET",
mock_account,
mock_app_model,
)
assert resp == {"items": []}
def test_node_variable_collection_get_invalid_node_id(self, app, mock_account, mock_app_model):
with pytest.raises(InvalidArgumentError):
setup_test_context(
app,
NodeVariableCollectionApi,
"/apps/app_123/workflows/draft/nodes/sys/variables",
"GET",
mock_account,
mock_app_model,
)
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_node_variable_collection_delete(self, mock_draft_srv, app, mock_account, mock_app_model):
resp = setup_test_context(
app,
NodeVariableCollectionApi,
"/apps/app_123/workflows/draft/nodes/node_123/variables",
"DELETE",
mock_account,
mock_app_model,
)
assert resp.status_code == 204
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_variable_api_get_success(self, mock_draft_srv, app, mock_account, mock_app_model):
mock_draft_srv.return_value.get_variable.return_value = self._mock_workflow_variable()
resp = setup_test_context(
app, VariableApi, "/apps/app_123/workflows/draft/variables/var_123", "GET", mock_account, mock_app_model
)
assert resp["id"] == "var_123"
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_variable_api_get_not_found(self, mock_draft_srv, app, mock_account, mock_app_model):
mock_draft_srv.return_value.get_variable.return_value = None
with pytest.raises(NotFoundError):
setup_test_context(
app, VariableApi, "/apps/app_123/workflows/draft/variables/var_123", "GET", mock_account, mock_app_model
)
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_variable_api_patch_success(self, mock_draft_srv, app, mock_account, mock_app_model):
mock_draft_srv.return_value.get_variable.return_value = self._mock_workflow_variable()
resp = setup_test_context(
app,
VariableApi,
"/apps/app_123/workflows/draft/variables/var_123",
"PATCH",
mock_account,
mock_app_model,
payload={"name": "new_name"},
)
assert resp["id"] == "var_123"
mock_draft_srv.return_value.update_variable.assert_called_once()
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_variable_api_delete_success(self, mock_draft_srv, app, mock_account, mock_app_model):
mock_draft_srv.return_value.get_variable.return_value = self._mock_workflow_variable()
resp = setup_test_context(
app, VariableApi, "/apps/app_123/workflows/draft/variables/var_123", "DELETE", mock_account, mock_app_model
)
assert resp.status_code == 204
mock_draft_srv.return_value.delete_variable.assert_called_once()
@patch("controllers.console.app.workflow_draft_variable.WorkflowService")
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_variable_reset_api_put_success(self, mock_draft_srv, mock_wf_srv, app, mock_account, mock_app_model):
mock_wf_srv.return_value.get_draft_workflow.return_value = MagicMock()
mock_draft_srv.return_value.get_variable.return_value = self._mock_workflow_variable()
mock_draft_srv.return_value.reset_variable.return_value = None # means no content
resp = setup_test_context(
app,
VariableResetApi,
"/apps/app_123/workflows/draft/variables/var_123/reset",
"PUT",
mock_account,
mock_app_model,
)
assert resp.status_code == 204
@patch("controllers.console.app.workflow_draft_variable.WorkflowService")
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_conversation_variable_collection_get(self, mock_draft_srv, mock_wf_srv, app, mock_account, mock_app_model):
mock_wf_srv.return_value.get_draft_workflow.return_value = MagicMock()
from services.workflow_draft_variable_service import WorkflowDraftVariableList
mock_draft_srv.return_value.list_conversation_variables.return_value = WorkflowDraftVariableList(variables=[])
resp = setup_test_context(
app,
ConversationVariableCollectionApi,
"/apps/app_123/workflows/draft/conversation-variables",
"GET",
mock_account,
mock_app_model,
)
assert resp == {"items": []}
@patch("controllers.console.app.workflow_draft_variable.WorkflowDraftVariableService")
def test_system_variable_collection_get(self, mock_draft_srv, app, mock_account, mock_app_model):
from services.workflow_draft_variable_service import WorkflowDraftVariableList
mock_draft_srv.return_value.list_system_variables.return_value = WorkflowDraftVariableList(variables=[])
resp = setup_test_context(
app,
SystemVariableCollectionApi,
"/apps/app_123/workflows/draft/system-variables",
"GET",
mock_account,
mock_app_model,
)
assert resp == {"items": []}
@patch("controllers.console.app.workflow_draft_variable.WorkflowService")
def test_environment_variable_collection_get(self, mock_wf_srv, app, mock_account, mock_app_model):
mock_wf = MagicMock()
mock_wf.environment_variables = []
mock_wf_srv.return_value.get_draft_workflow.return_value = mock_wf
resp = setup_test_context(
app,
EnvironmentVariableCollectionApi,
"/apps/app_123/workflows/draft/environment-variables",
"GET",
mock_account,
mock_app_model,
)
assert resp == {"items": []}

View File

@ -1,209 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from controllers.console.auth.data_source_bearer_auth import (
ApiKeyAuthDataSource,
ApiKeyAuthDataSourceBinding,
ApiKeyAuthDataSourceBindingDelete,
)
from controllers.console.auth.error import ApiKeyAuthFailedError
class TestApiKeyAuthDataSource:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
app.config["WTF_CSRF_ENABLED"] = False
return app
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.get_provider_auth_list")
def test_get_api_key_auth_data_source(self, mock_get_list, mock_db, mock_csrf, app):
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
mock_binding = MagicMock()
mock_binding.id = "bind_123"
mock_binding.category = "api_key"
mock_binding.provider = "custom_provider"
mock_binding.disabled = False
mock_binding.created_at.timestamp.return_value = 1620000000
mock_binding.updated_at.timestamp.return_value = 1620000001
mock_get_list.return_value = [mock_binding]
with (
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch(
"controllers.console.auth.data_source_bearer_auth.current_account_with_tenant",
return_value=(mock_account, "tenant_123"),
),
):
with app.test_request_context("/console/api/api-key-auth/data-source", method="GET"):
proxy_mock = MagicMock()
proxy_mock._get_current_object.return_value = mock_account
with patch("libs.login.current_user", proxy_mock):
api_instance = ApiKeyAuthDataSource()
response = api_instance.get()
assert "sources" in response
assert len(response["sources"]) == 1
assert response["sources"][0]["provider"] == "custom_provider"
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.get_provider_auth_list")
def test_get_api_key_auth_data_source_empty(self, mock_get_list, mock_db, mock_csrf, app):
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
mock_get_list.return_value = None
with (
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch(
"controllers.console.auth.data_source_bearer_auth.current_account_with_tenant",
return_value=(mock_account, "tenant_123"),
),
):
with app.test_request_context("/console/api/api-key-auth/data-source", method="GET"):
proxy_mock = MagicMock()
proxy_mock._get_current_object.return_value = mock_account
with patch("libs.login.current_user", proxy_mock):
api_instance = ApiKeyAuthDataSource()
response = api_instance.get()
assert "sources" in response
assert len(response["sources"]) == 0
class TestApiKeyAuthDataSourceBinding:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
app.config["WTF_CSRF_ENABLED"] = False
return app
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.create_provider_auth")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.validate_api_key_auth_args")
def test_create_binding_successful(self, mock_validate, mock_create, mock_db, mock_csrf, app):
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
with (
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch(
"controllers.console.auth.data_source_bearer_auth.current_account_with_tenant",
return_value=(mock_account, "tenant_123"),
),
):
with app.test_request_context(
"/console/api/api-key-auth/data-source/binding",
method="POST",
json={"category": "api_key", "provider": "custom", "credentials": {"key": "value"}},
):
proxy_mock = MagicMock()
proxy_mock._get_current_object.return_value = mock_account
with patch("libs.login.current_user", proxy_mock), patch("flask_login.current_user", proxy_mock):
api_instance = ApiKeyAuthDataSourceBinding()
response = api_instance.post()
assert response[0]["result"] == "success"
assert response[1] == 200
mock_validate.assert_called_once()
mock_create.assert_called_once()
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.create_provider_auth")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.validate_api_key_auth_args")
def test_create_binding_failure(self, mock_validate, mock_create, mock_db, mock_csrf, app):
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
mock_create.side_effect = ValueError("Invalid structure")
with (
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch(
"controllers.console.auth.data_source_bearer_auth.current_account_with_tenant",
return_value=(mock_account, "tenant_123"),
),
):
with app.test_request_context(
"/console/api/api-key-auth/data-source/binding",
method="POST",
json={"category": "api_key", "provider": "custom", "credentials": {"key": "value"}},
):
proxy_mock = MagicMock()
proxy_mock._get_current_object.return_value = mock_account
with patch("libs.login.current_user", proxy_mock), patch("flask_login.current_user", proxy_mock):
api_instance = ApiKeyAuthDataSourceBinding()
with pytest.raises(ApiKeyAuthFailedError, match="Invalid structure"):
api_instance.post()
class TestApiKeyAuthDataSourceBindingDelete:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
app.config["WTF_CSRF_ENABLED"] = False
return app
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_bearer_auth.ApiKeyAuthService.delete_provider_auth")
def test_delete_binding_successful(self, mock_delete, mock_db, mock_csrf, app):
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
with (
patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, "tenant_123")),
patch(
"controllers.console.auth.data_source_bearer_auth.current_account_with_tenant",
return_value=(mock_account, "tenant_123"),
),
):
with app.test_request_context("/console/api/api-key-auth/data-source/binding_123", method="DELETE"):
proxy_mock = MagicMock()
proxy_mock._get_current_object.return_value = mock_account
with patch("libs.login.current_user", proxy_mock), patch("flask_login.current_user", proxy_mock):
api_instance = ApiKeyAuthDataSourceBindingDelete()
response = api_instance.delete("binding_123")
assert response[0]["result"] == "success"
assert response[1] == 204
mock_delete.assert_called_once_with("tenant_123", "binding_123")

View File

@ -1,192 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from werkzeug.local import LocalProxy
from controllers.console.auth.data_source_oauth import (
OAuthDataSource,
OAuthDataSourceBinding,
OAuthDataSourceCallback,
OAuthDataSourceSync,
)
class TestOAuthDataSource:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
@patch("flask_login.current_user")
@patch("libs.login.current_user")
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.data_source_oauth.dify_config.NOTION_INTEGRATION_TYPE", None)
def test_get_oauth_url_successful(
self, mock_db, mock_csrf, mock_libs_user, mock_flask_user, mock_get_providers, app
):
mock_oauth_provider = MagicMock()
mock_oauth_provider.get_authorization_url.return_value = "http://oauth.provider/auth"
mock_get_providers.return_value = {"notion": mock_oauth_provider}
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
mock_libs_user.return_value = mock_account
mock_flask_user.return_value = mock_account
# also patch current_account_with_tenant
with patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, MagicMock())):
with app.test_request_context("/console/api/oauth/data-source/notion", method="GET"):
proxy_mock = LocalProxy(lambda: mock_account)
with patch("libs.login.current_user", proxy_mock):
api_instance = OAuthDataSource()
response = api_instance.get("notion")
assert response[0]["data"] == "http://oauth.provider/auth"
assert response[1] == 200
mock_oauth_provider.get_authorization_url.assert_called_once()
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
@patch("flask_login.current_user")
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
def test_get_oauth_url_invalid_provider(self, mock_db, mock_csrf, mock_flask_user, mock_get_providers, app):
mock_get_providers.return_value = {"notion": MagicMock()}
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
with patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, MagicMock())):
with app.test_request_context("/console/api/oauth/data-source/unknown_provider", method="GET"):
proxy_mock = LocalProxy(lambda: mock_account)
with patch("libs.login.current_user", proxy_mock):
api_instance = OAuthDataSource()
response = api_instance.get("unknown_provider")
assert response[0]["error"] == "Invalid provider"
assert response[1] == 400
class TestOAuthDataSourceCallback:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
def test_oauth_callback_successful(self, mock_get_providers, app):
provider_mock = MagicMock()
mock_get_providers.return_value = {"notion": provider_mock}
with app.test_request_context("/console/api/oauth/data-source/notion/callback?code=mock_code", method="GET"):
api_instance = OAuthDataSourceCallback()
response = api_instance.get("notion")
assert response.status_code == 302
assert "code=mock_code" in response.location
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
def test_oauth_callback_missing_code(self, mock_get_providers, app):
provider_mock = MagicMock()
mock_get_providers.return_value = {"notion": provider_mock}
with app.test_request_context("/console/api/oauth/data-source/notion/callback", method="GET"):
api_instance = OAuthDataSourceCallback()
response = api_instance.get("notion")
assert response.status_code == 302
assert "error=Access denied" in response.location
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
def test_oauth_callback_invalid_provider(self, mock_get_providers, app):
mock_get_providers.return_value = {"notion": MagicMock()}
with app.test_request_context("/console/api/oauth/data-source/invalid/callback?code=mock_code", method="GET"):
api_instance = OAuthDataSourceCallback()
response = api_instance.get("invalid")
assert response[0]["error"] == "Invalid provider"
assert response[1] == 400
class TestOAuthDataSourceBinding:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
def test_get_binding_successful(self, mock_get_providers, app):
mock_provider = MagicMock()
mock_provider.get_access_token.return_value = None
mock_get_providers.return_value = {"notion": mock_provider}
with app.test_request_context("/console/api/oauth/data-source/notion/binding?code=auth_code_123", method="GET"):
api_instance = OAuthDataSourceBinding()
response = api_instance.get("notion")
assert response[0]["result"] == "success"
assert response[1] == 200
mock_provider.get_access_token.assert_called_once_with("auth_code_123")
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
def test_get_binding_missing_code(self, mock_get_providers, app):
mock_get_providers.return_value = {"notion": MagicMock()}
with app.test_request_context("/console/api/oauth/data-source/notion/binding?code=", method="GET"):
api_instance = OAuthDataSourceBinding()
response = api_instance.get("notion")
assert response[0]["error"] == "Invalid code"
assert response[1] == 400
class TestOAuthDataSourceSync:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@patch("controllers.console.auth.data_source_oauth.get_oauth_providers")
@patch("libs.login.check_csrf_token")
@patch("controllers.console.wraps.db")
def test_sync_successful(self, mock_db, mock_csrf, mock_get_providers, app):
mock_provider = MagicMock()
mock_provider.sync_data_source.return_value = None
mock_get_providers.return_value = {"notion": mock_provider}
from models.account import Account, AccountStatus
mock_account = MagicMock(spec=Account)
mock_account.id = "user_123"
mock_account.status = AccountStatus.ACTIVE
mock_account.is_admin_or_owner = True
mock_account.current_tenant.current_role = "owner"
with patch("controllers.console.wraps.current_account_with_tenant", return_value=(mock_account, MagicMock())):
with app.test_request_context("/console/api/oauth/data-source/notion/binding_123/sync", method="GET"):
proxy_mock = LocalProxy(lambda: mock_account)
with patch("libs.login.current_user", proxy_mock):
api_instance = OAuthDataSourceSync()
# The route pattern uses <uuid:binding_id>, so we just pass a string for unit testing
response = api_instance.get("notion", "binding_123")
assert response[0]["result"] == "success"
assert response[1] == 200
mock_provider.sync_data_source.assert_called_once_with("binding_123")

View File

@ -1,417 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from flask import Flask
from werkzeug.exceptions import BadRequest, NotFound
from controllers.console.auth.oauth_server import (
OAuthServerAppApi,
OAuthServerUserAccountApi,
OAuthServerUserAuthorizeApi,
OAuthServerUserTokenApi,
)
class TestOAuthServerAppApi:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@pytest.fixture
def mock_oauth_provider_app(self):
from models.model import OAuthProviderApp
oauth_app = MagicMock(spec=OAuthProviderApp)
oauth_app.client_id = "test_client_id"
oauth_app.redirect_uris = ["http://localhost/callback"]
oauth_app.app_icon = "icon_url"
oauth_app.app_label = "Test App"
oauth_app.scope = "read,write"
return oauth_app
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_successful_post(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider",
method="POST",
json={"client_id": "test_client_id", "redirect_uri": "http://localhost/callback"},
):
api_instance = OAuthServerAppApi()
response = api_instance.post()
assert response["app_icon"] == "icon_url"
assert response["app_label"] == "Test App"
assert response["scope"] == "read,write"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_invalid_redirect_uri(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider",
method="POST",
json={"client_id": "test_client_id", "redirect_uri": "http://invalid/callback"},
):
api_instance = OAuthServerAppApi()
with pytest.raises(BadRequest, match="redirect_uri is invalid"):
api_instance.post()
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_invalid_client_id(self, mock_get_app, mock_db, app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = None
with app.test_request_context(
"/oauth/provider",
method="POST",
json={"client_id": "test_invalid_client_id", "redirect_uri": "http://localhost/callback"},
):
api_instance = OAuthServerAppApi()
with pytest.raises(NotFound, match="client_id is invalid"):
api_instance.post()
class TestOAuthServerUserAuthorizeApi:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@pytest.fixture
def mock_oauth_provider_app(self):
oauth_app = MagicMock()
oauth_app.client_id = "test_client_id"
return oauth_app
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
@patch("controllers.console.auth.oauth_server.current_account_with_tenant")
@patch("controllers.console.wraps.current_account_with_tenant")
@patch("controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_authorization_code")
@patch("libs.login.check_csrf_token")
def test_successful_authorize(
self, mock_csrf, mock_sign, mock_wrap_current, mock_current, mock_get_app, mock_db, app, mock_oauth_provider_app
):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
mock_account = MagicMock()
mock_account.id = "user_123"
from models.account import AccountStatus
mock_account.status = AccountStatus.ACTIVE
mock_current.return_value = (mock_account, MagicMock())
mock_wrap_current.return_value = (mock_account, MagicMock())
mock_sign.return_value = "auth_code_123"
with app.test_request_context("/oauth/provider/authorize", method="POST", json={"client_id": "test_client_id"}):
with patch("libs.login.current_user", mock_account):
api_instance = OAuthServerUserAuthorizeApi()
response = api_instance.post()
assert response["code"] == "auth_code_123"
mock_sign.assert_called_once_with("test_client_id", "user_123")
class TestOAuthServerUserTokenApi:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@pytest.fixture
def mock_oauth_provider_app(self):
from models.model import OAuthProviderApp
oauth_app = MagicMock(spec=OAuthProviderApp)
oauth_app.client_id = "test_client_id"
oauth_app.client_secret = "test_secret"
oauth_app.redirect_uris = ["http://localhost/callback"]
return oauth_app
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
@patch("controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_access_token")
def test_authorization_code_grant(self, mock_sign, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
mock_sign.return_value = ("access_123", "refresh_123")
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "test_secret",
"redirect_uri": "http://localhost/callback",
},
):
api_instance = OAuthServerUserTokenApi()
response = api_instance.post()
assert response["access_token"] == "access_123"
assert response["refresh_token"] == "refresh_123"
assert response["token_type"] == "Bearer"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_authorization_code_grant_missing_code(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"client_secret": "test_secret",
"redirect_uri": "http://localhost/callback",
},
):
api_instance = OAuthServerUserTokenApi()
with pytest.raises(BadRequest, match="code is required"):
api_instance.post()
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_authorization_code_grant_invalid_secret(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "invalid_secret",
"redirect_uri": "http://localhost/callback",
},
):
api_instance = OAuthServerUserTokenApi()
with pytest.raises(BadRequest, match="client_secret is invalid"):
api_instance.post()
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_authorization_code_grant_invalid_redirect_uri(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "authorization_code",
"code": "auth_code",
"client_secret": "test_secret",
"redirect_uri": "http://invalid/callback",
},
):
api_instance = OAuthServerUserTokenApi()
with pytest.raises(BadRequest, match="redirect_uri is invalid"):
api_instance.post()
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
@patch("controllers.console.auth.oauth_server.OAuthServerService.sign_oauth_access_token")
def test_refresh_token_grant(self, mock_sign, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
mock_sign.return_value = ("new_access", "new_refresh")
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={"client_id": "test_client_id", "grant_type": "refresh_token", "refresh_token": "refresh_123"},
):
api_instance = OAuthServerUserTokenApi()
response = api_instance.post()
assert response["access_token"] == "new_access"
assert response["refresh_token"] == "new_refresh"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_refresh_token_grant_missing_token(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "refresh_token",
},
):
api_instance = OAuthServerUserTokenApi()
with pytest.raises(BadRequest, match="refresh_token is required"):
api_instance.post()
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_invalid_grant_type(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/token",
method="POST",
json={
"client_id": "test_client_id",
"grant_type": "invalid_grant",
},
):
api_instance = OAuthServerUserTokenApi()
with pytest.raises(BadRequest, match="invalid grant_type"):
api_instance.post()
class TestOAuthServerUserAccountApi:
@pytest.fixture
def app(self):
app = Flask(__name__)
app.config["TESTING"] = True
return app
@pytest.fixture
def mock_oauth_provider_app(self):
from models.model import OAuthProviderApp
oauth_app = MagicMock(spec=OAuthProviderApp)
oauth_app.client_id = "test_client_id"
return oauth_app
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
@patch("controllers.console.auth.oauth_server.OAuthServerService.validate_oauth_access_token")
def test_successful_account_retrieval(self, mock_validate, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
mock_account = MagicMock()
mock_account.name = "Test User"
mock_account.email = "test@example.com"
mock_account.avatar = "avatar_url"
mock_account.interface_language = "en-US"
mock_account.timezone = "UTC"
mock_validate.return_value = mock_account
with app.test_request_context(
"/oauth/provider/account",
method="POST",
json={"client_id": "test_client_id"},
headers={"Authorization": "Bearer valid_access_token"},
):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response["name"] == "Test User"
assert response["email"] == "test@example.com"
assert response["avatar"] == "avatar_url"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_missing_authorization_header(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context("/oauth/provider/account", method="POST", json={"client_id": "test_client_id"}):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response.status_code == 401
assert response.json["error"] == "Authorization header is required"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_invalid_authorization_header_format(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/account",
method="POST",
json={"client_id": "test_client_id"},
headers={"Authorization": "InvalidFormat"},
):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response.status_code == 401
assert response.json["error"] == "Invalid Authorization header format"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_invalid_token_type(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/account",
method="POST",
json={"client_id": "test_client_id"},
headers={"Authorization": "Basic something"},
):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response.status_code == 401
assert response.json["error"] == "token_type is invalid"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
def test_missing_access_token(self, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
with app.test_request_context(
"/oauth/provider/account",
method="POST",
json={"client_id": "test_client_id"},
headers={"Authorization": "Bearer "},
):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response.status_code == 401
assert response.json["error"] == "Invalid Authorization header format"
@patch("controllers.console.wraps.db")
@patch("controllers.console.auth.oauth_server.OAuthServerService.get_oauth_provider_app")
@patch("controllers.console.auth.oauth_server.OAuthServerService.validate_oauth_access_token")
def test_invalid_access_token(self, mock_validate, mock_get_app, mock_db, app, mock_oauth_provider_app):
mock_db.session.query.return_value.first.return_value = MagicMock()
mock_get_app.return_value = mock_oauth_provider_app
mock_validate.return_value = None
with app.test_request_context(
"/oauth/provider/account",
method="POST",
json={"client_id": "test_client_id"},
headers={"Authorization": "Bearer invalid_token"},
):
api_instance = OAuthServerUserAccountApi()
response = api_instance.post()
assert response.status_code == 401
assert response.json["error"] == "access_token or client_id is invalid"