This commit is contained in:
wangxiaolei 2026-03-24 06:23:22 +00:00 committed by GitHub
commit f4c9cb4e5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 211 additions and 42 deletions

View File

@ -1,3 +1,5 @@
import logging
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
@ -17,12 +19,14 @@ from fields.app_fields import (
)
from libs.login import current_account_with_tenant, login_required
from models.model import App
from services.app_dsl_service import AppDslService, ImportStatus
from services.app_dsl_service import AppDslService, Import, ImportStatus
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from .. import console_ns
logger = logging.getLogger(__name__)
# Register models for flask_restx to avoid dict type issues in Swagger
# Register base model first
leaked_dependency_model = console_ns.model("LeakedDependency", leaked_dependency_fields)
@ -70,28 +74,29 @@ class AppImportApi(Resource):
current_user, _ = current_account_with_tenant()
args = AppImportPayload.model_validate(console_ns.payload)
# Create service with session
with Session(db.engine) as session:
import_service = AppDslService(session)
# Import app
account = current_user
result = import_service.import_app(
account=account,
import_mode=args.mode,
yaml_content=args.yaml_content,
yaml_url=args.yaml_url,
name=args.name,
description=args.description,
icon_type=args.icon_type,
icon=args.icon,
icon_background=args.icon_background,
app_id=args.app_id,
)
session.commit()
try:
# Create service with session and transaction
with Session(bind=db.engine, expire_on_commit=False) as session, session.begin():
import_service = AppDslService(session)
result = import_service.import_app(
account=current_user,
import_mode=args.mode,
yaml_content=args.yaml_content,
yaml_url=args.yaml_url,
name=args.name,
description=args.description,
icon_type=args.icon_type,
icon=args.icon,
icon_background=args.icon_background,
app_id=args.app_id,
)
except Exception as e:
logger.exception("Failed to import app")
result = Import(id="", status=ImportStatus.FAILED, error=str(e))
if result.app_id and FeatureService.get_system_features().webapp_auth.enabled:
# update web app setting as private
EnterpriseService.WebAppAuth.update_app_access_mode(result.app_id, "private")
# Return appropriate status code based on result
status = result.status
if status == ImportStatus.FAILED:
return result.model_dump(mode="json"), 400
@ -111,13 +116,14 @@ class AppImportConfirmApi(Resource):
# Check user role first
current_user, _ = current_account_with_tenant()
# Create service with session
with Session(db.engine) as session:
import_service = AppDslService(session)
# Confirm import
account = current_user
result = import_service.confirm_import(import_id=import_id, account=account)
session.commit()
try:
# Create service with session and transaction
with Session(bind=db.engine, expire_on_commit=False) as session, session.begin():
import_service = AppDslService(session)
result = import_service.confirm_import(import_id=import_id, account=current_user)
except Exception as e:
logger.exception("Failed to confirm import")
result = Import(id=import_id, status=ImportStatus.FAILED, error=str(e))
# Return appropriate status code based on result
if result.status == ImportStatus.FAILED:

View File

@ -319,14 +319,9 @@ class AppDslService:
status=ImportStatus.FAILED,
error=f"Invalid YAML format: {str(e)}",
)
except Exception as e:
except Exception:
logger.exception("Failed to import app")
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=str(e),
)
raise
def confirm_import(self, *, import_id: str, account: Account) -> Import:
"""
@ -381,13 +376,9 @@ class AppDslService:
imported_dsl_version=data.get("version", "0.1.0"),
)
except Exception as e:
except Exception:
logger.exception("Error confirming import")
return Import(
id=import_id,
status=ImportStatus.FAILED,
error=str(e),
)
raise
def check_dependencies(
self,
@ -471,7 +462,7 @@ class AppDslService:
app.updated_by = account.id
self._session.add(app)
self._session.commit()
self._session.flush()
app_was_created.send(app, account=account)
# save dependencies

View File

@ -4,7 +4,9 @@ from unittest.mock import MagicMock, patch
import pytest
import yaml
from faker import Faker
from sqlalchemy.orm import Session
from extensions.ext_database import db
from models.model import App, AppModelConfig
from services.account_service import AccountService, TenantService
from services.app_dsl_service import AppDslService, ImportMode, ImportStatus
@ -432,3 +434,173 @@ class TestAppDslService:
# Verify dependencies service was called
mock_external_service_dependencies["dependencies_service"].get_leaked_dependencies.assert_called_once()
def test_import_app_workflow_sync_failure_rollback(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test that app is not created when workflow sync fails during import.
This verifies transaction rollback behavior.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow YAML content
yaml_content = self._create_simple_yaml_content(app_name="Workflow Test", app_mode="workflow")
yaml_data = yaml.safe_load(yaml_content)
yaml_data["workflow"] = {
"environment_variables": [],
"conversation_variables": [],
"graph": {"nodes": [], "edges": []},
}
yaml_content = yaml.dump(yaml_data, allow_unicode=True)
# Mock workflow sync to raise exception (simulating failure)
mock_external_service_dependencies["workflow_service"].return_value.sync_draft_workflow.side_effect = Exception(
"Workflow sync failed"
)
# Run import in a transaction to test rollback behavior
def _import_with_transaction() -> None:
with Session(bind=db.engine, expire_on_commit=False) as transaction_session, transaction_session.begin():
dsl_service = AppDslService(transaction_session)
dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_CONTENT,
yaml_content=yaml_content,
)
with pytest.raises(Exception, match="Workflow sync failed"):
_import_with_transaction()
# After rollback, verify no app was created in database using a fresh session
with Session(bind=db.engine) as verify_session:
apps_count = (
verify_session.query(App)
.where(App.tenant_id == account.current_tenant_id)
.where(App.name == "Workflow Test")
.count()
)
assert apps_count == 0 # No new app should be in database
# Verify workflow sync was called
mock_external_service_dependencies["workflow_service"].return_value.sync_draft_workflow.assert_called_once()
def test_import_app_variable_deletion_failure_rollback(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test that app is not created when variable deletion fails during import.
This verifies transaction rollback behavior.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create workflow YAML content
yaml_content = self._create_simple_yaml_content(app_name="Variable Test", app_mode="workflow")
yaml_data = yaml.safe_load(yaml_content)
yaml_data["workflow"] = {
"environment_variables": [],
"conversation_variables": [],
"graph": {"nodes": [], "edges": []},
}
yaml_content = yaml.dump(yaml_data, allow_unicode=True)
# Mock variable deletion to raise exception (simulating failure)
draft_variable_service = mock_external_service_dependencies["draft_variable_service"]
draft_variable_service.return_value.delete_workflow_variables.side_effect = Exception(
"Variable deletion failed"
)
# Run import in a transaction to test rollback behavior
def _import_with_transaction() -> None:
with Session(bind=db.engine, expire_on_commit=False) as transaction_session, transaction_session.begin():
dsl_service = AppDslService(transaction_session)
dsl_service.import_app(
account=account,
import_mode=ImportMode.YAML_CONTENT,
yaml_content=yaml_content,
)
with pytest.raises(Exception, match="Variable deletion failed"):
_import_with_transaction()
# After rollback, verify no app was created in database using a fresh session
with Session(bind=db.engine) as verify_session:
apps_count = (
verify_session.query(App)
.where(App.tenant_id == account.current_tenant_id)
.where(App.name == "Variable Test")
.count()
)
assert apps_count == 0 # No new app should be in database
# Verify variable deletion was called
mock_external_service_dependencies[
"draft_variable_service"
].return_value.delete_workflow_variables.assert_called_once()
def test_confirm_import_workflow_sync_failure_rollback(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test that app is not modified when workflow sync fails during confirm_import.
This verifies transaction rollback behavior for confirm_import.
"""
fake = Faker()
app, account = self._create_test_app_and_account(db_session_with_containers, mock_external_service_dependencies)
# Create pending import data in Redis
yaml_content = self._create_simple_yaml_content(app_name="Confirm Test", app_mode="workflow")
yaml_data = yaml.safe_load(yaml_content)
yaml_data["workflow"] = {
"environment_variables": [],
"conversation_variables": [],
"graph": {"nodes": [], "edges": []},
}
yaml_content = yaml.dump(yaml_data, allow_unicode=True)
import_id = "test-import-id"
pending_data_json = json.dumps(
{
"import_mode": "yaml-content",
"yaml_content": yaml_content,
"name": "Confirm Test",
"description": None,
"icon_type": None,
"icon": None,
"icon_background": None,
"app_id": app.id, # Overwrite existing app
}
)
mock_external_service_dependencies["redis_client"].get.return_value = pending_data_json
# Store original app name for verification
original_app_name = app.name
original_app_description = app.description
# Mock workflow sync to raise exception (simulating failure)
mock_external_service_dependencies["workflow_service"].return_value.sync_draft_workflow.side_effect = Exception(
"Workflow sync failed during confirm"
)
# Run confirm_import in a transaction to test rollback behavior
def _confirm_with_transaction() -> None:
with Session(bind=db.engine, expire_on_commit=False) as transaction_session, transaction_session.begin():
dsl_service = AppDslService(transaction_session)
dsl_service.confirm_import(import_id=import_id, account=account)
with pytest.raises(Exception, match="Workflow sync failed during confirm"):
_confirm_with_transaction()
# Verify app was not modified (transaction was rolled back)
db_session_with_containers.refresh(app)
assert app.name == original_app_name
assert app.description == original_app_description
# Verify workflow sync was called
mock_external_service_dependencies["workflow_service"].return_value.sync_draft_workflow.assert_called_once()
# Verify Redis delete was NOT called (transaction rolled back)
mock_external_service_dependencies["redis_client"].delete.assert_not_called()