diff --git a/api/controllers/console/app/app_import.py b/api/controllers/console/app/app_import.py index 362291d779..092b346975 100644 --- a/api/controllers/console/app/app_import.py +++ b/api/controllers/console/app/app_import.py @@ -51,7 +51,7 @@ class AppImportPayload(BaseModel): app_id: str | None = Field(None) -class AppImportBundlePayload(BaseModel): +class AppImportBundleConfirmPayload(BaseModel): name: str | None = None description: str | None = None icon_type: str | None = None @@ -149,15 +149,38 @@ class AppImportCheckDependenciesApi(Resource): return result.model_dump(mode="json"), 200 -@console_ns.route("/apps/imports-bundle") -class AppImportBundleApi(Resource): +@console_ns.route("/apps/imports-bundle/prepare") +class AppImportBundlePrepareApi(Resource): + """Step 1: Get upload URL for bundle import.""" + + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + def post(self): + from services.app_bundle_service import AppBundleService + + current_user, current_tenant_id = current_account_with_tenant() + + result = AppBundleService.prepare_import( + tenant_id=current_tenant_id, + account_id=current_user.id, + ) + + return {"import_id": result.import_id, "upload_url": result.upload_url}, 200 + + +@console_ns.route("/apps/imports-bundle/<string:import_id>/confirm") +class AppImportBundleConfirmApi(Resource): + """Step 2: Confirm bundle import after upload.""" + + @setup_required + @login_required + @account_initialization_required + @marshal_with(app_import_model) + @cloud_edition_billing_resource_check("apps") + @edit_permission_required - def post(self): + def post(self, import_id: str): from flask import request from core.app.entities.app_bundle_entities import BundleFormatError @@ -165,22 +188,12 @@ class AppImportBundleApi(Resource): current_user, _ = current_account_with_tenant() - if "file" not in request.files: - return {"error": "No file provided"}, 400 - - file = request.files["file"] - if not file.filename or not 
file.filename.endswith(".zip"): - return {"error": "Invalid file format, expected .zip"}, 400 - - zip_bytes = file.read() - - form_data = request.form.to_dict() - args = AppImportBundlePayload.model_validate(form_data) + args = AppImportBundleConfirmPayload.model_validate(request.get_json() or {}) try: - result = AppBundleService.import_bundle( + result = AppBundleService.confirm_import( + import_id=import_id, account=current_user, - zip_bytes=zip_bytes, name=args.name, description=args.description, icon_type=args.icon_type, diff --git a/api/core/app/entities/app_bundle_entities.py b/api/core/app/entities/app_bundle_entities.py index 4ed7807346..8566fd2bb1 100644 --- a/api/core/app/entities/app_bundle_entities.py +++ b/api/core/app/entities/app_bundle_entities.py @@ -1,12 +1,17 @@ from __future__ import annotations import re +from datetime import UTC, datetime -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field + +from core.app.entities.app_asset_entities import AppAssetFileTree # Constants BUNDLE_DSL_FILENAME_PATTERN = re.compile(r"^[^/]+\.ya?ml$") BUNDLE_MAX_SIZE = 50 * 1024 * 1024 # 50MB +MANIFEST_FILENAME = "manifest.json" +MANIFEST_SCHEMA_VERSION = "1.0" # Exceptions @@ -22,21 +27,70 @@ class ZipSecurityError(Exception): pass -# Entities +# Manifest DTOs +class ManifestFileEntry(BaseModel): + """Maps node_id to file path in the bundle.""" + + model_config = ConfigDict(extra="forbid") + + node_id: str + path: str + + +class ManifestIntegrity(BaseModel): + """Basic integrity check fields.""" + + model_config = ConfigDict(extra="forbid") + + file_count: int + + +class ManifestAppAssets(BaseModel): + """App assets section containing the full tree.""" + + model_config = ConfigDict(extra="forbid") + + tree: AppAssetFileTree + + +class BundleManifest(BaseModel): + """ + Bundle manifest for app asset import/export. + + Schema version 1.0: + - dsl_filename: DSL file name in bundle root (e.g. 
"my_app.yml") + - tree: Full AppAssetFileTree (files + folders) for 100% restoration including node IDs + - files: Explicit node_id -> path mapping for file nodes only + - integrity: Basic file_count validation + """ + + model_config = ConfigDict(extra="forbid") + + schema_version: str = Field(default=MANIFEST_SCHEMA_VERSION) + generated_at: datetime = Field(default_factory=lambda: datetime.now(tz=UTC)) + dsl_filename: str = Field(description="DSL file name in bundle root") + app_assets: ManifestAppAssets + files: list[ManifestFileEntry] + integrity: ManifestIntegrity + + @property + def assets_prefix(self) -> str: + """Assets directory name (DSL filename without extension).""" + return self.dsl_filename.rsplit(".", 1)[0] + + @classmethod + def from_tree(cls, tree: AppAssetFileTree, dsl_filename: str) -> BundleManifest: + """Build manifest from an AppAssetFileTree.""" + files = [ManifestFileEntry(node_id=n.id, path=tree.get_path(n.id)) for n in tree.walk_files()] + return cls( + dsl_filename=dsl_filename, + app_assets=ManifestAppAssets(tree=tree), + files=files, + integrity=ManifestIntegrity(file_count=len(files)), + ) + + +# Export result class BundleExportResult(BaseModel): download_url: str = Field(description="Temporary download URL for the ZIP") filename: str = Field(description="Suggested filename for the ZIP") - - -class SourceFileEntry(BaseModel): - path: str = Field(description="File path within the ZIP") - node_id: str = Field(description="Node ID in the asset tree") - - -class ExtractedFile(BaseModel): - path: str = Field(description="Relative path of the extracted file") - content: bytes = Field(description="File content as bytes") - - -class ExtractedFolder(BaseModel): - path: str = Field(description="Relative path of the extracted folder") diff --git a/api/core/app_assets/storage.py b/api/core/app_assets/storage.py index ae0f137898..50e9d5dac0 100644 --- a/api/core/app_assets/storage.py +++ b/api/core/app_assets/storage.py @@ -152,6 +152,20 @@ class 
_BundleExportZipAssetPath(SignedAssetPath): return [self.asset_type, self.tenant_id, self.app_id, self.resource_id] +@dataclass(frozen=True) +class BundleImportZipPath: + """Path for temporary import zip files. Not signed, uses direct presign URLs only.""" + + tenant_id: str + import_id: str + + def __post_init__(self) -> None: + _require_uuid(self.tenant_id, "tenant_id") + + def get_storage_key(self) -> str: + return f"{_ASSET_BASE}/{self.tenant_id}/imports/{self.import_id}.zip" + + class AssetPath: @staticmethod def draft(tenant_id: str, app_id: str, node_id: str) -> SignedAssetPath: @@ -177,6 +191,10 @@ class AssetPath: def bundle_export_zip(tenant_id: str, app_id: str, export_id: str) -> SignedAssetPath: return _BundleExportZipAssetPath(tenant_id=tenant_id, app_id=app_id, resource_id=export_id) + @staticmethod + def bundle_import_zip(tenant_id: str, import_id: str) -> BundleImportZipPath: + return BundleImportZipPath(tenant_id=tenant_id, import_id=import_id) + @staticmethod def from_components( asset_type: str, @@ -386,6 +404,23 @@ class AppAssetStorage: return self._generate_signed_proxy_upload_url(asset_path, expires_in) + def get_import_upload_url(self, path: BundleImportZipPath, expires_in: int = 3600) -> str: + """Get upload URL for import zip (direct presign, no proxy fallback).""" + return self._storage.get_upload_url(path.get_storage_key(), expires_in) + + def get_import_download_url(self, path: BundleImportZipPath, expires_in: int = 3600) -> str: + """Get download URL for import zip (direct presign, no proxy fallback).""" + return self._storage.get_download_url(path.get_storage_key(), expires_in) + + def delete_import_zip(self, path: BundleImportZipPath) -> None: + """Delete import zip file. 
Errors are logged but not raised.""" + try: + self._storage.delete(path.get_storage_key()) + except Exception: + import logging + + logging.getLogger(__name__).debug("Failed to delete import zip: %s", path.get_storage_key()) + def _generate_signed_proxy_download_url(self, asset_path: SignedAssetPath, expires_in: int) -> str: expires_in = min(expires_in, dify_config.FILES_ACCESS_TIMEOUT) expires_at = int(time.time()) + max(expires_in, 1) diff --git a/api/core/app_bundle/__init__.py b/api/core/app_bundle/__init__.py index 5c1c22f206..7fb33b2b6d 100644 --- a/api/core/app_bundle/__init__.py +++ b/api/core/app_bundle/__init__.py @@ -1,5 +1 @@ -from .source_zip_extractor import SourceZipExtractor - -__all__ = [ - "SourceZipExtractor", -] +# App bundle utilities - manifest-driven import/export handled by AppBundleService diff --git a/api/core/app_bundle/source_zip_extractor.py b/api/core/app_bundle/source_zip_extractor.py deleted file mode 100644 index 7d489015b5..0000000000 --- a/api/core/app_bundle/source_zip_extractor.py +++ /dev/null @@ -1,98 +0,0 @@ -from __future__ import annotations - -import io -import zipfile -from typing import TYPE_CHECKING -from uuid import uuid4 - -from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode -from core.app.entities.app_bundle_entities import ExtractedFile, ExtractedFolder, ZipSecurityError -from core.app_assets.storage import AssetPath - -if TYPE_CHECKING: - from core.app_assets.storage import AppAssetStorage - - -class SourceZipExtractor: - def __init__(self, storage: AppAssetStorage) -> None: - self._storage = storage - - def extract_entries( - self, zip_bytes: bytes, *, expected_prefix: str - ) -> tuple[list[ExtractedFolder], list[ExtractedFile]]: - folders: list[ExtractedFolder] = [] - files: list[ExtractedFile] = [] - - with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf: - for info in zf.infolist(): - name = info.filename - self._validate_path(name) - - if not name.startswith(expected_prefix): - 
continue - - relative_path = name[len(expected_prefix) :].lstrip("/") - if not relative_path: - continue - - if info.is_dir(): - folders.append(ExtractedFolder(path=relative_path.rstrip("/"))) - else: - content = zf.read(info) - files.append(ExtractedFile(path=relative_path, content=content)) - - return folders, files - - def build_tree_and_save( - self, - folders: list[ExtractedFolder], - files: list[ExtractedFile], - tenant_id: str, - app_id: str, - ) -> AppAssetFileTree: - tree = AppAssetFileTree() - path_to_node_id: dict[str, str] = {} - - all_folder_paths = {f.path for f in folders} - for file in files: - self._ensure_parent_folders(file.path, all_folder_paths) - - sorted_folders = sorted(all_folder_paths, key=lambda p: p.count("/")) - for folder_path in sorted_folders: - node_id = str(uuid4()) - name = folder_path.rsplit("/", 1)[-1] - parent_path = folder_path.rsplit("/", 1)[0] if "/" in folder_path else None - parent_id = path_to_node_id.get(parent_path) if parent_path else None - - node = AppAssetNode.create_folder(node_id, name, parent_id) - tree.add(node) - path_to_node_id[folder_path] = node_id - - sorted_files = sorted(files, key=lambda f: f.path) - for file in sorted_files: - node_id = str(uuid4()) - name = file.path.rsplit("/", 1)[-1] - parent_path = file.path.rsplit("/", 1)[0] if "/" in file.path else None - parent_id = path_to_node_id.get(parent_path) if parent_path else None - - node = AppAssetNode.create_file(node_id, name, parent_id, len(file.content)) - tree.add(node) - - asset_path = AssetPath.draft(tenant_id, app_id, node_id) - self._storage.save(asset_path, file.content) - - return tree - - def _validate_path(self, path: str) -> None: - if ".." 
in path: - raise ZipSecurityError(f"Path traversal detected: {path}") - if path.startswith("/"): - raise ZipSecurityError(f"Absolute path detected: {path}") - if "\\" in path: - raise ZipSecurityError(f"Backslash in path: {path}") - - def _ensure_parent_folders(self, file_path: str, folder_set: set[str]) -> None: - parts = file_path.split("/")[:-1] - for i in range(1, len(parts) + 1): - parent = "/".join(parts[:i]) - folder_set.add(parent) diff --git a/api/core/skill/skill_manager.py b/api/core/skill/skill_manager.py index 29be138501..e5d3880ac2 100644 --- a/api/core/skill/skill_manager.py +++ b/api/core/skill/skill_manager.py @@ -2,21 +2,40 @@ import logging from core.app_assets.storage import AssetPath from core.skill.entities.skill_bundle import SkillBundle +from extensions.ext_redis import redis_client from services.app_asset_service import AppAssetService logger = logging.getLogger(__name__) class SkillManager: + _CACHE_KEY_PREFIX = "skill_bundle" + _CACHE_TTL_SECONDS = 60 * 60 * 24 + + @staticmethod + def get_cache_key( + tenant_id: str, + app_id: str, + assets_id: str, + ) -> str: + return f"{SkillManager._CACHE_KEY_PREFIX}:{tenant_id}:{app_id}:{assets_id}" + @staticmethod def load_bundle( tenant_id: str, app_id: str, assets_id: str, ) -> SkillBundle: + cache_key = SkillManager.get_cache_key(tenant_id, app_id, assets_id) + data = redis_client.get(cache_key) + if data: + return SkillBundle.model_validate_json(data) + asset_path = AssetPath.skill_bundle(tenant_id, app_id, assets_id) data = AppAssetService.get_storage().load(asset_path) - return SkillBundle.model_validate_json(data) + bundle = SkillBundle.model_validate_json(data) + redis_client.setex(cache_key, SkillManager._CACHE_TTL_SECONDS, bundle.model_dump_json(indent=2).encode("utf-8")) + return bundle @staticmethod def save_bundle( @@ -30,3 +49,5 @@ class SkillManager: asset_path, bundle.model_dump_json(indent=2).encode("utf-8"), ) + cache_key = SkillManager.get_cache_key(tenant_id, app_id, assets_id) + 
redis_client.delete(cache_key) diff --git a/api/core/virtual_environment/channel/queue_transport.py b/api/core/virtual_environment/channel/queue_transport.py index 7cf524316a..fa1114ca91 100644 --- a/api/core/virtual_environment/channel/queue_transport.py +++ b/api/core/virtual_environment/channel/queue_transport.py @@ -1,3 +1,5 @@ +from queue import Empty, Queue + from core.virtual_environment.channel.exec import TransportEOFError from core.virtual_environment.channel.transport import TransportReadCloser @@ -27,8 +29,6 @@ class QueueTransportReadCloser(TransportReadCloser): A write handler that writes data to a queue. """ - from queue import Queue - def __init__(self, queue: Queue[bytes | None]) -> None: self.queue = queue @@ -70,7 +70,6 @@ class QueueTransportReadCloser(TransportReadCloser): NEVER USE IT IN A MULTI-THREADED CONTEXT WITHOUT PROPER SYNCHRONIZATION. """ - from queue import Empty if n <= 0: return b"" diff --git a/api/core/virtual_environment/providers/docker_daemon_sandbox.py b/api/core/virtual_environment/providers/docker_daemon_sandbox.py index 2824856d09..52007591c0 100644 --- a/api/core/virtual_environment/providers/docker_daemon_sandbox.py +++ b/api/core/virtual_environment/providers/docker_daemon_sandbox.py @@ -148,7 +148,8 @@ class DockerDemuxer: to periodically check for errors and closed state instead of blocking forever. 
""" if self._error: - raise TransportEOFError(f"Demuxer error: {self._error}") from self._error + error = cast(BaseException, self._error) + raise TransportEOFError(f"Demuxer error: {error}") from error while True: try: @@ -163,7 +164,8 @@ class DockerDemuxer: if self._closed: raise TransportEOFError("Demuxer closed") if self._error: - raise TransportEOFError(f"Demuxer error: {self._error}") from self._error + error = cast(BaseException, self._error) + raise TransportEOFError(f"Demuxer error: {error}") from error # No error, continue waiting def close(self) -> None: @@ -292,6 +294,8 @@ class DockerDaemonEnvironment(VirtualEnvironment): @classmethod def validate(cls, options: Mapping[str, Any]) -> None: # Import Docker SDK lazily so it is loaded after gevent monkey-patching. + import docker.errors + import docker docker_sock = options.get(cls.OptionsKey.DOCKER_SOCK, cls._DEFAULT_DOCKER_SOCK) @@ -364,6 +368,7 @@ class DockerDaemonEnvironment(VirtualEnvironment): NOTE: I guess nobody will use more than 5 different docker sockets in practice.... """ import docker + return docker.DockerClient(base_url=docker_sock) @classmethod @@ -373,6 +378,7 @@ class DockerDaemonEnvironment(VirtualEnvironment): Get the Docker low-level API client. """ import docker + return docker.APIClient(base_url=docker_sock) def get_docker_sock(self) -> str: @@ -431,6 +437,12 @@ class DockerDaemonEnvironment(VirtualEnvironment): return self._container_path(path) def upload_file(self, path: str, content: BytesIO) -> None: + """Upload a file to the container. + + Files and intermediate directories are created with world-writable permissions + (0o777 for directories, 0o666 for files) to avoid permission issues when the container + runs as a non-root user but Docker's put_archive creates files as root. 
+ """ container = self._get_container() normalized = PurePosixPath(path) @@ -442,6 +454,7 @@ class DockerDaemonEnvironment(VirtualEnvironment): with tarfile.open(fileobj=tar_stream, mode="w") as tar: tar_info = tarfile.TarInfo(name=file_name) tar_info.size = len(payload) + tar_info.mode = 0o666 tar.addfile(tar_info, BytesIO(payload)) tar_stream.seek(0) container.put_archive(parent_dir, tar_stream.read()) # pyright: ignore[reportUnknownMemberType] # @@ -454,8 +467,18 @@ class DockerDaemonEnvironment(VirtualEnvironment): payload = content.getvalue() tar_stream = BytesIO() with tarfile.open(fileobj=tar_stream, mode="w") as tar: + # Add intermediate directories with proper permissions + for i in range(len(relative_path.parts) - 1): + dir_path = PurePosixPath(*relative_path.parts[: i + 1]) + dir_info = tarfile.TarInfo(name=dir_path.as_posix() + "/") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o777 + tar.addfile(dir_info) + + # Add the file tar_info = tarfile.TarInfo(name=relative_path.as_posix()) tar_info.size = len(payload) + tar_info.mode = 0o666 tar.addfile(tar_info, BytesIO(payload)) tar_stream.seek(0) container.put_archive(self._working_dir, tar_stream.read()) # pyright: ignore[reportUnknownMemberType] # @@ -479,7 +502,7 @@ class DockerDaemonEnvironment(VirtualEnvironment): return BytesIO(extracted.read()) def list_files(self, directory_path: str, limit: int) -> Sequence[FileState]: - import docker + import docker.errors container = self._get_container() container_path = self._container_path(directory_path) @@ -525,7 +548,7 @@ class DockerDaemonEnvironment(VirtualEnvironment): pass def release_environment(self) -> None: - import docker + import docker.errors try: container = self._get_container() diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index d6f69b4346..f780f5a54f 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -1918,6 +1918,7 @@ class LLMNode(Node[LLMNodeData]): ) 
-> Generator[NodeEventBase, None, LLMGenerationData]: result: LLMGenerationData | None = None + # FIXME(Mairuis): Async processing for bash session. with SandboxBashSession(sandbox=sandbox, node_id=self.id, tools=tool_dependencies) as session: prompt_files = self._extract_prompt_files(variable_pool) model_features = self._get_model_features(model_instance) diff --git a/api/core/zip_sandbox/__init__.py b/api/core/zip_sandbox/__init__.py index 746bb99a79..71d36e6ee9 100644 --- a/api/core/zip_sandbox/__init__.py +++ b/api/core/zip_sandbox/__init__.py @@ -1,7 +1,8 @@ -from .zip_sandbox import SandboxDownloadItem, SandboxFile, ZipSandbox +from .zip_sandbox import SandboxDownloadItem, SandboxFile, SandboxUploadItem, ZipSandbox __all__ = [ "SandboxDownloadItem", "SandboxFile", + "SandboxUploadItem", "ZipSandbox", ] diff --git a/api/core/zip_sandbox/zip_sandbox.py b/api/core/zip_sandbox/zip_sandbox.py index bd71096d47..d58814781a 100644 --- a/api/core/zip_sandbox/zip_sandbox.py +++ b/api/core/zip_sandbox/zip_sandbox.py @@ -27,10 +27,20 @@ from .strategy import ZipStrategy @dataclass(frozen=True) class SandboxDownloadItem: + """Item for downloading: URL -> sandbox path.""" + url: str path: str +@dataclass(frozen=True) +class SandboxUploadItem: + """Item for uploading: sandbox path -> URL.""" + + path: str + url: str + + @dataclass(frozen=True) class SandboxFile: """A handle to a file in the sandbox.""" @@ -210,25 +220,6 @@ class ZipSandbox: # ========== Download operations ========== - def download(self, urls: list[str], *, dest_dir: str = ".") -> list[str]: - if not urls: - return [] - - dest_dir = self._normalize_path(dest_dir) - paths = [self._dest_path_for_url(dest_dir, u) for u in urls] - - p = pipeline(self.vm) - p.add(["mkdir", "-p", dest_dir], error_message="Failed to create download directory") - for url, out_path in zip(urls, paths, strict=True): - p.add(["curl", "-fsSL", url, "-o", out_path], error_message="Failed to download file") - - try: - 
p.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True) - except Exception as exc: - raise RuntimeError(str(exc)) from exc - - return paths - def download_items(self, items: list[SandboxDownloadItem], *, dest_dir: str = ".") -> list[str]: if not items: return [] @@ -286,6 +277,32 @@ class ZipSandbox: except CommandExecutionError as exc: raise RuntimeError(str(exc)) from exc + def upload_items(self, items: list[SandboxUploadItem], *, src_dir: str = ".") -> None: + """Upload multiple files from sandbox to target URLs. + + Args: + items: List of SandboxUploadItem(path, url) + src_dir: Base directory containing the files + """ + if not items: + return + + src_dir = self._normalize_path(src_dir) + p = pipeline(self.vm) + + for item in items: + rel = self._normalize_path(item.path) + src_path = posixpath.join(src_dir, rel) if src_dir not in ("", ".") else rel + p.add( + ["curl", "-fsSL", "-X", "PUT", "-T", src_path, item.url], + error_message=f"Failed to upload {item.path}", + ) + + try: + p.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True) + except Exception as exc: + raise RuntimeError(str(exc)) from exc + # ========== Archive operations ========== def zip(self, src: str = ".", *, include_base: bool = True) -> SandboxFile: diff --git a/api/services/app_asset_service.py b/api/services/app_asset_service.py index c8aeca8605..8382d88e85 100644 --- a/api/services/app_asset_service.py +++ b/api/services/app_asset_service.py @@ -54,6 +54,22 @@ class AppAssetService: def _lock(app_id: str): return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS) + @staticmethod + def get_assets_by_version(tenant_id: str, app_id: str, workflow_id: str | None = None) -> AppAssets: + """Get asset tree by workflow_id (published) or draft if workflow_id is None.""" + with Session(db.engine) as session: + version = workflow_id or AppAssets.VERSION_DRAFT + assets = ( + session.query(AppAssets) + .filter( + AppAssets.tenant_id 
== tenant_id, + AppAssets.app_id == app_id, + AppAssets.version == version, + ) + .first() + ) + return assets or AppAssets(tenant_id=tenant_id, app_id=app_id, version=version) + @staticmethod def get_draft_assets(tenant_id: str, app_id: str) -> list[AssetItem]: with Session(db.engine) as session: diff --git a/api/services/app_bundle_service.py b/api/services/app_bundle_service.py index 1a1102a66d..4fd8f1371f 100644 --- a/api/services/app_bundle_service.py +++ b/api/services/app_bundle_service.py @@ -1,26 +1,47 @@ +"""Service for exporting and importing App Bundles (DSL + assets). + +Bundle structure: + bundle.zip/ + {app_name}.yml # DSL file + manifest.json # Asset manifest (required for import) + {app_name}/ # Asset files + folder/file.txt + ... + +Import flow (sandbox-based): + 1. prepare_import: Frontend gets upload URL, stores import_id in Redis + 2. Frontend uploads zip to storage + 3. confirm_import: Sandbox downloads zip, extracts, uploads assets via presigned URLs + +Manifest format (schema_version 1.0): + - app_assets.tree: Full AppAssetFileTree for 100% ID restoration + - files: node_id -> path mapping for file nodes + - integrity.file_count: Basic validation +""" + from __future__ import annotations -import io +import json import logging import re -import zipfile +from dataclasses import dataclass from uuid import uuid4 -import yaml +from pydantic import ValidationError from sqlalchemy.orm import Session from core.app.entities.app_bundle_entities import ( - BUNDLE_DSL_FILENAME_PATTERN, - BUNDLE_MAX_SIZE, + MANIFEST_FILENAME, BundleExportResult, BundleFormatError, - ZipSecurityError, + BundleManifest, ) -from core.app_assets.storage import AssetPath -from core.app_bundle import SourceZipExtractor -from core.zip_sandbox import SandboxDownloadItem, ZipSandbox +from core.app_assets.storage import AppAssetStorage, AssetPath, BundleImportZipPath +from core.zip_sandbox import SandboxDownloadItem, SandboxUploadItem, ZipSandbox from extensions.ext_database 
import db -from models import Account, App +from extensions.ext_redis import redis_client +from models.account import Account +from models.model import App from .app_asset_package_service import AppAssetPackageService from .app_asset_service import AppAssetService @@ -28,6 +49,15 @@ from .app_dsl_service import AppDslService, Import logger = logging.getLogger(__name__) +_IMPORT_REDIS_PREFIX = "app_bundle:import:" +_IMPORT_TTL_SECONDS = 3600 # 1 hour + + +@dataclass +class ImportPrepareResult: + import_id: str + upload_url: str + class AppBundleService: @staticmethod @@ -38,14 +68,10 @@ class AppBundleService: marked_name: str = "", marked_comment: str = "", ): - """ - Publish App Bundle (workflow + assets). - Coordinates WorkflowService and AppAssetService publishing in a single transaction. - """ + """Publish App Bundle (workflow + assets) in a single transaction.""" from models.workflow import Workflow from services.workflow_service import WorkflowService - # 1. Publish workflow workflow: Workflow = WorkflowService().publish_workflow( session=session, app_model=app_model, @@ -53,17 +79,16 @@ class AppBundleService: marked_name=marked_name, marked_comment=marked_comment, ) - - # 2. Publish assets (bound to workflow_id) AppAssetPackageService.publish( session=session, app_model=app_model, account_id=account.id, workflow_id=workflow.id, ) - return workflow + # ========== Export ========== + @staticmethod def export_bundle( *, @@ -73,14 +98,14 @@ class AppBundleService: workflow_id: str | None = None, expires_in: int = 10 * 60, ) -> BundleExportResult: - """Export bundle and return a temporary download URL. - - Uses sandbox VM to build the ZIP, avoiding memory pressure in API process. 
- """ + """Export bundle with manifest.json and return a temporary download URL.""" tenant_id = app_model.tenant_id app_id = app_model.id safe_name = AppBundleService._sanitize_filename(app_model.name) - filename = f"{safe_name}.zip" + + dsl_filename = f"{safe_name}.yml" + app_assets = AppAssetService.get_assets_by_version(tenant_id, app_id, workflow_id) + manifest = BundleManifest.from_tree(app_assets.asset_tree, dsl_filename) export_id = uuid4().hex export_path = AssetPath.bundle_export_zip(tenant_id, app_id, export_id) @@ -95,147 +120,170 @@ class AppBundleService: with ZipSandbox(tenant_id=tenant_id, user_id=account_id, app_id="app-bundle-export") as zs: zs.write_file(f"bundle_root/{safe_name}.yml", dsl_content.encode("utf-8")) + zs.write_file(f"bundle_root/{MANIFEST_FILENAME}", manifest.model_dump_json(indent=2).encode("utf-8")) - # Published assets: use stored source zip and unzip into /... if workflow_id is not None: source_zip_path = AssetPath.source_zip(tenant_id, app_id, workflow_id) source_url = asset_storage.get_download_url(source_zip_path, expires_in) zs.download_archive(source_url, path="tmp/source_assets.zip") zs.unzip(archive_path="tmp/source_assets.zip", dest_dir=f"bundle_root/{safe_name}") else: - # Draft assets: download individual files and place under /... 
asset_items = AppAssetService.get_draft_assets(tenant_id, app_id) - asset_urls = asset_storage.get_download_urls( - [AssetPath.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in - ) - zs.download_items( - [ - SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}") - for a, url in zip(asset_items, asset_urls, strict=True) - ], - dest_dir="bundle_root", - ) + if asset_items: + asset_urls = asset_storage.get_download_urls( + [AssetPath.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in + ) + zs.download_items( + [ + SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}") + for a, url in zip(asset_items, asset_urls, strict=True) + ], + dest_dir="bundle_root", + ) archive = zs.zip(src="bundle_root", include_base=False) zs.upload(archive, upload_url) download_url = asset_storage.get_download_url(export_path, expires_in) - return BundleExportResult(download_url=download_url, filename=filename) + return BundleExportResult(download_url=download_url, filename=f"{safe_name}.zip") + + # ========== Import ========== @staticmethod - def import_bundle( + def prepare_import(tenant_id: str, account_id: str) -> ImportPrepareResult: + """Prepare import: generate import_id and upload URL.""" + import_id = uuid4().hex + import_path = AssetPath.bundle_import_zip(tenant_id, import_id) + asset_storage = AppAssetService.get_storage() + upload_url = asset_storage.get_import_upload_url(import_path, _IMPORT_TTL_SECONDS) + + redis_client.setex( + f"{_IMPORT_REDIS_PREFIX}{import_id}", + _IMPORT_TTL_SECONDS, + json.dumps({"tenant_id": tenant_id, "account_id": account_id}), + ) + + return ImportPrepareResult(import_id=import_id, upload_url=upload_url) + + @staticmethod + def confirm_import( + import_id: str, account: Account, - zip_bytes: bytes, + *, name: str | None = None, description: str | None = None, icon_type: str | None = None, icon: str | None = None, icon_background: str | None = None, ) -> Import: - if len(zip_bytes) > BUNDLE_MAX_SIZE: - 
raise BundleFormatError(f"Bundle size exceeds limit: {BUNDLE_MAX_SIZE} bytes")
+        """Confirm import: download zip in sandbox, extract, and upload assets."""
+        redis_key = f"{_IMPORT_REDIS_PREFIX}{import_id}"
+        redis_data = redis_client.get(redis_key)
+        if not redis_data:
+            raise BundleFormatError("Import session expired or not found")
 
-        dsl_content, assets_prefix = AppBundleService._extract_dsl_from_bundle(zip_bytes)
+        import_meta = json.loads(redis_data)
+        tenant_id: str = import_meta["tenant_id"]
 
-        with Session(db.engine) as session:
-            dsl_service = AppDslService(session)
-            import_result = dsl_service.import_app(
+        if tenant_id != account.current_tenant_id:
+            raise BundleFormatError("Import session tenant mismatch")
+
+        import_path = AssetPath.bundle_import_zip(tenant_id, import_id)
+        asset_storage = AppAssetService.get_storage()
+
+        try:
+            result = AppBundleService.import_bundle(
+                tenant_id=tenant_id,
                 account=account,
-                import_mode="yaml-content",
-                yaml_content=dsl_content,
+                import_path=import_path,
+                asset_storage=asset_storage,
                 name=name,
                 description=description,
                 icon_type=icon_type,
                 icon=icon,
                 icon_background=icon_background,
-                app_id=None,
             )
-            session.commit()
+        finally:
+            redis_client.delete(redis_key)
+            asset_storage.delete_import_zip(import_path)
 
-        if import_result.app_id and assets_prefix:
-            AppBundleService._import_assets_from_bundle(
-                zip_bytes=zip_bytes,
-                assets_prefix=assets_prefix,
-                app_id=import_result.app_id,
-                account_id=account.id,
-            )
+        return result
+
+    @staticmethod
+    def import_bundle(
+        *,
+        tenant_id: str,
+        account: Account,
+        import_path: BundleImportZipPath,
+        asset_storage: AppAssetStorage,
+        name: str | None,
+        description: str | None,
+        icon_type: str | None,
+        icon: str | None,
+        icon_background: str | None,
+    ) -> Import:
+        """Execute import in sandbox."""
+        download_url = asset_storage.get_import_download_url(import_path, _IMPORT_TTL_SECONDS)
+
+        with ZipSandbox(tenant_id=tenant_id, user_id=account.id, app_id="app-bundle-import") as zs:
+            zs.download_archive(download_url, path="import.zip")
+            zs.unzip(archive_path="import.zip", dest_dir="bundle")
+
+            manifest_bytes = zs.read_file(f"bundle/{MANIFEST_FILENAME}")
+            try:
+                manifest = BundleManifest.model_validate_json(manifest_bytes)
+            except ValidationError as e:
+                raise BundleFormatError(f"Invalid manifest.json: {e}") from e
+
+            dsl_content = zs.read_file(f"bundle/{manifest.dsl_filename}").decode("utf-8")
+
+            with Session(db.engine) as session:
+                dsl_service = AppDslService(session)
+                import_result = dsl_service.import_app(
+                    account=account,
+                    import_mode="yaml-content",
+                    yaml_content=dsl_content,
+                    name=name,
+                    description=description,
+                    icon_type=icon_type,
+                    icon=icon,
+                    icon_background=icon_background,
+                    app_id=None,
+                )
+                session.commit()
+
+            if not import_result.app_id:
+                return import_result
+
+            app_id = import_result.app_id
+            tree = manifest.app_assets.tree
+
+            upload_items: list[SandboxUploadItem] = []
+            for file_entry in manifest.files:
+                asset_path = AssetPath.draft(tenant_id, app_id, file_entry.node_id)
+                file_upload_url = asset_storage.get_upload_url(asset_path, _IMPORT_TTL_SECONDS)
+                src_path = f"{manifest.assets_prefix}/{file_entry.path}"
+                upload_items.append(SandboxUploadItem(path=src_path, url=file_upload_url))
+
+            if upload_items:
+                zs.upload_items(upload_items, src_dir="bundle")
+
+            # Tree sizes are already set from manifest; no need to update
+            app_model = db.session.query(App).filter(App.id == app_id).first()
+            if app_model:
+                AppAssetService.set_draft_assets(
+                    app_model=app_model,
+                    account_id=account.id,
+                    new_tree=tree,
+                )
 
         return import_result
 
-    @staticmethod
-    def _extract_dsl_from_bundle(zip_bytes: bytes) -> tuple[str, str | None]:
-        dsl_content: str | None = None
-        dsl_filename: str | None = None
-
-        with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
-            for info in zf.infolist():
-                if info.is_dir():
-                    continue
-                if BUNDLE_DSL_FILENAME_PATTERN.match(info.filename):
-                    if dsl_content is not None:
-                        raise BundleFormatError("Multiple DSL files found in bundle")
-                    dsl_content = zf.read(info).decode("utf-8")
-                    dsl_filename = info.filename
-
-        if dsl_content is None or dsl_filename is None:
-            raise BundleFormatError("No DSL file (*.yml or *.yaml) found in bundle root")
-
-        yaml.safe_load(dsl_content)
-
-        assets_prefix = dsl_filename.rsplit(".", 1)[0]
-        has_assets = AppBundleService._check_assets_prefix_exists(zip_bytes, assets_prefix)
-
-        return dsl_content, assets_prefix if has_assets else None
-
-    @staticmethod
-    def _check_assets_prefix_exists(zip_bytes: bytes, prefix: str) -> bool:
-        with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
-            for info in zf.infolist():
-                if info.filename.startswith(f"{prefix}/"):
-                    return True
-        return False
-
-    @staticmethod
-    def _import_assets_from_bundle(
-        zip_bytes: bytes,
-        assets_prefix: str,
-        app_id: str,
-        account_id: str,
-    ) -> None:
-        app_model = db.session.query(App).filter(App.id == app_id).first()
-        if not app_model:
-            logger.warning("App not found for asset import: %s", app_id)
-            return
-
-        asset_storage = AppAssetService.get_storage()
-        extractor = SourceZipExtractor(asset_storage)
-        try:
-            folders, files = extractor.extract_entries(
-                zip_bytes,
-                expected_prefix=f"{assets_prefix}/",
-            )
-        except ZipSecurityError as e:
-            logger.warning("Zip security error during asset import: %s", e)
-            return
-
-        if not folders and not files:
-            return
-
-        new_tree = extractor.build_tree_and_save(
-            folders=folders,
-            files=files,
-            tenant_id=app_model.tenant_id,
-            app_id=app_model.id,
-        )
-
-        AppAssetService.set_draft_assets(
-            app_model=app_model,
-            account_id=account_id,
-            new_tree=new_tree,
-        )
+    # ========== Helpers ==========
 
     @staticmethod
     def _sanitize_filename(name: str) -> str:
+        """Sanitize app name for use as filename."""
         safe = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
         safe = safe.strip(". 
")
         return safe[:100] if safe else "app"
diff --git a/web/service/apps.ts b/web/service/apps.ts
index 3ac5395a11..88df129cc3 100644
--- a/web/service/apps.ts
+++ b/web/service/apps.ts
@@ -157,6 +157,41 @@ export const importDSLConfirm = ({ import_id }: { import_id: string }): Promise<ImportDSLResponse> => {
   return post<ImportDSLResponse>(`apps/imports/${import_id}/confirm`, { body: {} })
 }
 
+export type ImportBundlePrepareResponse = {
+  import_id: string
+  upload_url: string
+}
+
+export const prepareImportBundle = (): Promise<ImportBundlePrepareResponse> => {
+  return post<ImportBundlePrepareResponse>('apps/imports-bundle/prepare', { body: {} })
+}
+
+export const confirmImportBundle = ({
+  import_id,
+  name,
+  description,
+  icon_type,
+  icon,
+  icon_background,
+}: {
+  import_id: string
+  name?: string
+  description?: string
+  icon_type?: string
+  icon?: string
+  icon_background?: string
+}): Promise<ImportDSLResponse> => {
+  return post<ImportDSLResponse>(`apps/imports-bundle/${import_id}/confirm`, {
+    body: {
+      name,
+      description,
+      icon_type,
+      icon,
+      icon_background,
+    },
+  })
+}
+
 export const importAppBundle = async ({
   file,
   name,
@@ -172,37 +207,27 @@
   icon?: string
   icon_background?: string
 }): Promise<ImportDSLResponse> => {
-  const { API_PREFIX, CSRF_COOKIE_NAME, CSRF_HEADER_NAME } = await import('@/config')
-  const Cookies = (await import('js-cookie')).default
+  // Step 1: Prepare import and get upload URL
+  const { import_id, upload_url } = await prepareImportBundle()
 
-  const formData = new FormData()
-  formData.append('file', file)
-  if (name)
-    formData.append('name', name)
-  if (description)
-    formData.append('description', description)
-  if (icon_type)
-    formData.append('icon_type', icon_type)
-  if (icon)
-    formData.append('icon', icon)
-  if (icon_background)
-    formData.append('icon_background', icon_background)
-
-  const response = await fetch(`${API_PREFIX}/apps/imports-bundle`, {
-    method: 'POST',
-    credentials: 'include',
-    headers: {
-      [CSRF_HEADER_NAME]: Cookies.get(CSRF_COOKIE_NAME()) || '',
-    },
-    body: formData,
+  // Step 2: Upload file to presigned URL
+  const uploadResponse = await fetch(upload_url, {
+    method: 'PUT',
+    body: file,
   })
-  if (!response.ok) {
-    const errorData = await response.json()
-    throw new Error(errorData.error || 'Import bundle failed')
-  }
+  if (!uploadResponse.ok)
+    throw new Error('Failed to upload bundle file')
 
-  return response.json()
+  // Step 3: Confirm import
+  return confirmImportBundle({
+    import_id,
+    name,
+    description,
+    icon_type,
+    icon,
+    icon_background,
+  })
 }
 
 export const switchApp = ({ appID, name, icon_type, icon, icon_background }: { appID: string, name: string, icon_type: AppIconType, icon: string, icon_background?: string | null }): Promise<{ new_app_id: string }> => {