feat(bundle): manifest-driven import with sandbox upload

- Add BundleManifest with dsl_filename for 100% tree ID restoration
- Implement two-step import flow: prepare (get upload URL) + confirm
- Use sandbox for zip extraction and file upload via presigned URLs
- Store import session in Redis with 1h TTL
- Add SandboxUploadItem for symmetric download/upload API
- Remove legacy source_zip_extractor, inline logic in service
- Update frontend to use new prepare/confirm API flow
This commit is contained in:
Harry 2026-01-29 19:02:29 +08:00
parent 919d7ef5cd
commit f198540357
14 changed files with 468 additions and 317 deletions

View File

@ -51,7 +51,7 @@ class AppImportPayload(BaseModel):
app_id: str | None = Field(None)
class AppImportBundlePayload(BaseModel):
class AppImportBundleConfirmPayload(BaseModel):
name: str | None = None
description: str | None = None
icon_type: str | None = None
@ -149,15 +149,38 @@ class AppImportCheckDependenciesApi(Resource):
return result.model_dump(mode="json"), 200
@console_ns.route("/apps/imports-bundle")
class AppImportBundleApi(Resource):
@console_ns.route("/apps/imports-bundle/prepare")
class AppImportBundlePrepareApi(Resource):
"""Step 1: Get upload URL for bundle import."""
@setup_required
@login_required
@account_initialization_required
@edit_permission_required
def post(self):
from services.app_bundle_service import AppBundleService
current_user, current_tenant_id = current_account_with_tenant()
result = AppBundleService.prepare_import(
tenant_id=current_tenant_id,
account_id=current_user.id,
)
return {"import_id": result.import_id, "upload_url": result.upload_url}, 200
@console_ns.route("/apps/imports-bundle/<string:import_id>/confirm")
class AppImportBundleConfirmApi(Resource):
"""Step 2: Confirm bundle import after upload."""
@setup_required
@login_required
@account_initialization_required
@marshal_with(app_import_model)
@cloud_edition_billing_resource_check("apps")
@edit_permission_required
def post(self):
def post(self, import_id: str):
from flask import request
from core.app.entities.app_bundle_entities import BundleFormatError
@ -165,22 +188,12 @@ class AppImportBundleApi(Resource):
current_user, _ = current_account_with_tenant()
if "file" not in request.files:
return {"error": "No file provided"}, 400
file = request.files["file"]
if not file.filename or not file.filename.endswith(".zip"):
return {"error": "Invalid file format, expected .zip"}, 400
zip_bytes = file.read()
form_data = request.form.to_dict()
args = AppImportBundlePayload.model_validate(form_data)
args = AppImportBundleConfirmPayload.model_validate(request.get_json() or {})
try:
result = AppBundleService.import_bundle(
result = AppBundleService.confirm_import(
import_id=import_id,
account=current_user,
zip_bytes=zip_bytes,
name=args.name,
description=args.description,
icon_type=args.icon_type,

View File

@ -1,12 +1,17 @@
from __future__ import annotations
import re
from datetime import UTC, datetime
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field
from core.app.entities.app_asset_entities import AppAssetFileTree
# Constants
BUNDLE_DSL_FILENAME_PATTERN = re.compile(r"^[^/]+\.ya?ml$")
BUNDLE_MAX_SIZE = 50 * 1024 * 1024 # 50MB
MANIFEST_FILENAME = "manifest.json"
MANIFEST_SCHEMA_VERSION = "1.0"
# Exceptions
@ -22,21 +27,70 @@ class ZipSecurityError(Exception):
pass
# Entities
# Manifest DTOs
class ManifestFileEntry(BaseModel):
    """Maps node_id to file path in the bundle."""

    # Reject unknown keys so a malformed or tampered manifest fails fast.
    model_config = ConfigDict(extra="forbid")

    node_id: str  # ID of the file node inside the manifest's AppAssetFileTree
    path: str  # file path relative to the bundle's assets directory
class ManifestIntegrity(BaseModel):
    """Basic integrity check fields."""

    model_config = ConfigDict(extra="forbid")

    file_count: int  # number of file entries; set from len(files) at export time
class ManifestAppAssets(BaseModel):
    """App assets section containing the full tree."""

    model_config = ConfigDict(extra="forbid")

    # Full file/folder tree embedded in the manifest so an import can restore
    # the original node IDs exactly.
    tree: AppAssetFileTree
class BundleManifest(BaseModel):
    """
    Bundle manifest for app asset import/export.

    Schema version 1.0:
    - dsl_filename: DSL file name in bundle root (e.g. "my_app.yml")
    - tree: Full AppAssetFileTree (files + folders) for 100% restoration including node IDs
    - files: Explicit node_id -> path mapping for file nodes only
    - integrity: Basic file_count validation
    """

    model_config = ConfigDict(extra="forbid")

    schema_version: str = Field(default=MANIFEST_SCHEMA_VERSION)
    generated_at: datetime = Field(default_factory=lambda: datetime.now(tz=UTC))
    dsl_filename: str = Field(description="DSL file name in bundle root")
    app_assets: ManifestAppAssets
    files: list[ManifestFileEntry]
    integrity: ManifestIntegrity

    @property
    def assets_prefix(self) -> str:
        """Assets directory name (DSL filename without extension)."""
        stem, separator, _extension = self.dsl_filename.rpartition(".")
        # No dot at all: fall back to the whole name.
        return stem if separator else self.dsl_filename

    @classmethod
    def from_tree(cls, tree: AppAssetFileTree, dsl_filename: str) -> BundleManifest:
        """Build manifest from an AppAssetFileTree."""
        entries: list[ManifestFileEntry] = []
        for file_node in tree.walk_files():
            entries.append(ManifestFileEntry(node_id=file_node.id, path=tree.get_path(file_node.id)))
        return cls(
            dsl_filename=dsl_filename,
            app_assets=ManifestAppAssets(tree=tree),
            files=entries,
            integrity=ManifestIntegrity(file_count=len(entries)),
        )
# Export result
class BundleExportResult(BaseModel):
    """Outcome of a bundle export: where to fetch the ZIP and what to call it."""

    download_url: str = Field(description="Temporary download URL for the ZIP")
    filename: str = Field(description="Suggested filename for the ZIP")
class SourceFileEntry(BaseModel):
    """Associates a file path inside the ZIP with its asset-tree node."""

    path: str = Field(description="File path within the ZIP")
    node_id: str = Field(description="Node ID in the asset tree")
class ExtractedFile(BaseModel):
    """A file entry pulled out of an uploaded bundle ZIP."""

    path: str = Field(description="Relative path of the extracted file")
    content: bytes = Field(description="File content as bytes")
class ExtractedFolder(BaseModel):
    """A folder entry pulled out of an uploaded bundle ZIP."""

    path: str = Field(description="Relative path of the extracted folder")

View File

@ -152,6 +152,20 @@ class _BundleExportZipAssetPath(SignedAssetPath):
return [self.asset_type, self.tenant_id, self.app_id, self.resource_id]
@dataclass(frozen=True)
class BundleImportZipPath:
    """Path for temporary import zip files. Not signed, uses direct presign URLs only."""

    tenant_id: str
    import_id: str

    def __post_init__(self) -> None:
        _require_uuid(self.tenant_id, "tenant_id")
        # Defense in depth: import_id arrives via the confirm endpoint URL and
        # is interpolated into the storage key below. Restrict it to the
        # uuid4().hex format produced by prepare_import so a crafted value
        # (e.g. "../other") can never redirect the key.
        if len(self.import_id) != 32 or not set(self.import_id) <= set("0123456789abcdef"):
            raise ValueError("import_id must be a 32-character lowercase hex string")

    def get_storage_key(self) -> str:
        # Key layout: <base>/<tenant_id>/imports/<import_id>.zip
        return f"{_ASSET_BASE}/{self.tenant_id}/imports/{self.import_id}.zip"
class AssetPath:
@staticmethod
def draft(tenant_id: str, app_id: str, node_id: str) -> SignedAssetPath:
@ -177,6 +191,10 @@ class AssetPath:
def bundle_export_zip(tenant_id: str, app_id: str, export_id: str) -> SignedAssetPath:
return _BundleExportZipAssetPath(tenant_id=tenant_id, app_id=app_id, resource_id=export_id)
@staticmethod
def bundle_import_zip(tenant_id: str, import_id: str) -> BundleImportZipPath:
return BundleImportZipPath(tenant_id=tenant_id, import_id=import_id)
@staticmethod
def from_components(
asset_type: str,
@ -386,6 +404,23 @@ class AppAssetStorage:
return self._generate_signed_proxy_upload_url(asset_path, expires_in)
def get_import_upload_url(self, path: BundleImportZipPath, expires_in: int = 3600) -> str:
    """Get upload URL for import zip (direct presign, no proxy fallback)."""
    storage_key = path.get_storage_key()
    return self._storage.get_upload_url(storage_key, expires_in)
def get_import_download_url(self, path: BundleImportZipPath, expires_in: int = 3600) -> str:
    """Get download URL for import zip (direct presign, no proxy fallback)."""
    storage_key = path.get_storage_key()
    return self._storage.get_download_url(storage_key, expires_in)
def delete_import_zip(self, path: BundleImportZipPath) -> None:
    """Delete import zip file. Errors are logged but not raised.

    Best-effort cleanup: a leftover temporary zip is harmless, so storage
    errors must never propagate to the caller (cleanup runs in a finally
    block of the import flow).
    """
    # Import at function top rather than inside the except handler so the
    # import cost/side effect does not depend on the error path.
    import logging

    try:
        self._storage.delete(path.get_storage_key())
    except Exception:
        # Include the traceback so failed cleanups are diagnosable.
        logging.getLogger(__name__).debug(
            "Failed to delete import zip: %s", path.get_storage_key(), exc_info=True
        )
def _generate_signed_proxy_download_url(self, asset_path: SignedAssetPath, expires_in: int) -> str:
expires_in = min(expires_in, dify_config.FILES_ACCESS_TIMEOUT)
expires_at = int(time.time()) + max(expires_in, 1)

View File

@ -1,5 +1 @@
from .source_zip_extractor import SourceZipExtractor
__all__ = [
"SourceZipExtractor",
]
# App bundle utilities - manifest-driven import/export handled by AppBundleService

View File

@ -1,98 +0,0 @@
from __future__ import annotations
import io
import zipfile
from typing import TYPE_CHECKING
from uuid import uuid4
from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
from core.app.entities.app_bundle_entities import ExtractedFile, ExtractedFolder, ZipSecurityError
from core.app_assets.storage import AssetPath
if TYPE_CHECKING:
from core.app_assets.storage import AppAssetStorage
class SourceZipExtractor:
    """Extracts asset entries from a bundle zip and persists them as a draft tree.

    NOTE(review): node IDs are regenerated with uuid4 on every import, so the
    original tree IDs are NOT preserved by this extractor.
    """

    def __init__(self, storage: AppAssetStorage) -> None:
        # Storage backend used by build_tree_and_save to persist file contents.
        self._storage = storage

    def extract_entries(
        self, zip_bytes: bytes, *, expected_prefix: str
    ) -> tuple[list[ExtractedFolder], list[ExtractedFile]]:
        """Extract folder and file entries located under ``expected_prefix``.

        Every entry name is security-validated first (even entries outside the
        prefix), so a malicious path anywhere in the archive raises
        ZipSecurityError. Entries outside the prefix, and the prefix directory
        entry itself, are silently skipped.
        """
        folders: list[ExtractedFolder] = []
        files: list[ExtractedFile] = []
        with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
            for info in zf.infolist():
                name = info.filename
                self._validate_path(name)
                if not name.startswith(expected_prefix):
                    continue
                # Path relative to the prefix; empty means the prefix entry itself.
                relative_path = name[len(expected_prefix) :].lstrip("/")
                if not relative_path:
                    continue
                if info.is_dir():
                    folders.append(ExtractedFolder(path=relative_path.rstrip("/")))
                else:
                    content = zf.read(info)
                    files.append(ExtractedFile(path=relative_path, content=content))
        return folders, files

    def build_tree_and_save(
        self,
        folders: list[ExtractedFolder],
        files: list[ExtractedFile],
        tenant_id: str,
        app_id: str,
    ) -> AppAssetFileTree:
        """Build an AppAssetFileTree from extracted entries and save file contents.

        Folder nodes are created shallowest-first so every node's parent exists
        before its children are added; each file's bytes are written to draft
        asset storage as its node is created.
        """
        tree = AppAssetFileTree()
        path_to_node_id: dict[str, str] = {}
        all_folder_paths = {f.path for f in folders}
        # Ensure implicit parent directories exist even when the zip contains
        # no explicit directory entries for them.
        for file in files:
            self._ensure_parent_folders(file.path, all_folder_paths)
        # Sort by depth ("/" count) so parents are inserted before children.
        sorted_folders = sorted(all_folder_paths, key=lambda p: p.count("/"))
        for folder_path in sorted_folders:
            node_id = str(uuid4())
            name = folder_path.rsplit("/", 1)[-1]
            parent_path = folder_path.rsplit("/", 1)[0] if "/" in folder_path else None
            parent_id = path_to_node_id.get(parent_path) if parent_path else None
            node = AppAssetNode.create_folder(node_id, name, parent_id)
            tree.add(node)
            path_to_node_id[folder_path] = node_id
        # Deterministic file ordering for reproducible tree construction.
        sorted_files = sorted(files, key=lambda f: f.path)
        for file in sorted_files:
            node_id = str(uuid4())
            name = file.path.rsplit("/", 1)[-1]
            parent_path = file.path.rsplit("/", 1)[0] if "/" in file.path else None
            parent_id = path_to_node_id.get(parent_path) if parent_path else None
            node = AppAssetNode.create_file(node_id, name, parent_id, len(file.content))
            tree.add(node)
            asset_path = AssetPath.draft(tenant_id, app_id, node_id)
            self._storage.save(asset_path, file.content)
        return tree

    def _validate_path(self, path: str) -> None:
        """Reject zip entry names that could escape the extraction root.

        Raises:
            ZipSecurityError: on traversal (".."), absolute paths, or backslashes.
        """
        if ".." in path:
            raise ZipSecurityError(f"Path traversal detected: {path}")
        if path.startswith("/"):
            raise ZipSecurityError(f"Absolute path detected: {path}")
        if "\\" in path:
            raise ZipSecurityError(f"Backslash in path: {path}")

    def _ensure_parent_folders(self, file_path: str, folder_set: set[str]) -> None:
        """Add every ancestor directory of ``file_path`` to ``folder_set`` in place."""
        parts = file_path.split("/")[:-1]
        for i in range(1, len(parts) + 1):
            parent = "/".join(parts[:i])
            folder_set.add(parent)

View File

@ -2,21 +2,40 @@ import logging
from core.app_assets.storage import AssetPath
from core.skill.entities.skill_bundle import SkillBundle
from extensions.ext_redis import redis_client
from services.app_asset_service import AppAssetService
logger = logging.getLogger(__name__)
class SkillManager:
_CACHE_KEY_PREFIX = "skill_bundle"
_CACHE_TTL_SECONDS = 60 * 60 * 24
@staticmethod
def get_cache_key(
tenant_id: str,
app_id: str,
assets_id: str,
) -> str:
return f"{SkillManager._CACHE_KEY_PREFIX}:{tenant_id}:{app_id}:{assets_id}"
@staticmethod
def load_bundle(
tenant_id: str,
app_id: str,
assets_id: str,
) -> SkillBundle:
cache_key = SkillManager.get_cache_key(tenant_id, app_id, assets_id)
data = redis_client.get(cache_key)
if data:
return SkillBundle.model_validate_json(data)
asset_path = AssetPath.skill_bundle(tenant_id, app_id, assets_id)
data = AppAssetService.get_storage().load(asset_path)
return SkillBundle.model_validate_json(data)
bundle = SkillBundle.model_validate_json(data)
redis_client.setex(cache_key, SkillManager._CACHE_TTL_SECONDS, bundle.model_dump_json(indent=2).encode("utf-8"))
return bundle
@staticmethod
def save_bundle(
@ -30,3 +49,5 @@ class SkillManager:
asset_path,
bundle.model_dump_json(indent=2).encode("utf-8"),
)
cache_key = SkillManager.get_cache_key(tenant_id, app_id, assets_id)
redis_client.delete(cache_key)

View File

@ -1,3 +1,5 @@
from queue import Empty, Queue
from core.virtual_environment.channel.exec import TransportEOFError
from core.virtual_environment.channel.transport import TransportReadCloser
@ -27,8 +29,6 @@ class QueueTransportReadCloser(TransportReadCloser):
A write handler that writes data to a queue.
"""
from queue import Queue
def __init__(self, queue: Queue[bytes | None]) -> None:
self.queue = queue
@ -70,7 +70,6 @@ class QueueTransportReadCloser(TransportReadCloser):
NEVER USE IT IN A MULTI-THREADED CONTEXT WITHOUT PROPER SYNCHRONIZATION.
"""
from queue import Empty
if n <= 0:
return b""

View File

@ -148,7 +148,8 @@ class DockerDemuxer:
to periodically check for errors and closed state instead of blocking forever.
"""
if self._error:
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
error = cast(BaseException, self._error)
raise TransportEOFError(f"Demuxer error: {error}") from error
while True:
try:
@ -163,7 +164,8 @@ class DockerDemuxer:
if self._closed:
raise TransportEOFError("Demuxer closed")
if self._error:
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
error = cast(BaseException, self._error)
raise TransportEOFError(f"Demuxer error: {error}") from error
# No error, continue waiting
def close(self) -> None:
@ -292,6 +294,8 @@ class DockerDaemonEnvironment(VirtualEnvironment):
@classmethod
def validate(cls, options: Mapping[str, Any]) -> None:
# Import Docker SDK lazily so it is loaded after gevent monkey-patching.
import docker.errors
import docker
docker_sock = options.get(cls.OptionsKey.DOCKER_SOCK, cls._DEFAULT_DOCKER_SOCK)
@ -364,6 +368,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
NOTE: I guess nobody will use more than 5 different docker sockets in practice....
"""
import docker
return docker.DockerClient(base_url=docker_sock)
@classmethod
@ -373,6 +378,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
Get the Docker low-level API client.
"""
import docker
return docker.APIClient(base_url=docker_sock)
def get_docker_sock(self) -> str:
@ -431,6 +437,12 @@ class DockerDaemonEnvironment(VirtualEnvironment):
return self._container_path(path)
def upload_file(self, path: str, content: BytesIO) -> None:
"""Upload a file to the container.
Files and intermediate directories are created with world-writable permissions
(0o777 for directories, 0o666 for files) to avoid permission issues when the container
runs as a non-root user but Docker's put_archive creates files as root.
"""
container = self._get_container()
normalized = PurePosixPath(path)
@ -442,6 +454,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
with tarfile.open(fileobj=tar_stream, mode="w") as tar:
tar_info = tarfile.TarInfo(name=file_name)
tar_info.size = len(payload)
tar_info.mode = 0o666
tar.addfile(tar_info, BytesIO(payload))
tar_stream.seek(0)
container.put_archive(parent_dir, tar_stream.read()) # pyright: ignore[reportUnknownMemberType] #
@ -454,8 +467,18 @@ class DockerDaemonEnvironment(VirtualEnvironment):
payload = content.getvalue()
tar_stream = BytesIO()
with tarfile.open(fileobj=tar_stream, mode="w") as tar:
# Add intermediate directories with proper permissions
for i in range(len(relative_path.parts) - 1):
dir_path = PurePosixPath(*relative_path.parts[: i + 1])
dir_info = tarfile.TarInfo(name=dir_path.as_posix() + "/")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o777
tar.addfile(dir_info)
# Add the file
tar_info = tarfile.TarInfo(name=relative_path.as_posix())
tar_info.size = len(payload)
tar_info.mode = 0o666
tar.addfile(tar_info, BytesIO(payload))
tar_stream.seek(0)
container.put_archive(self._working_dir, tar_stream.read()) # pyright: ignore[reportUnknownMemberType] #
@ -479,7 +502,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
return BytesIO(extracted.read())
def list_files(self, directory_path: str, limit: int) -> Sequence[FileState]:
import docker
import docker.errors
container = self._get_container()
container_path = self._container_path(directory_path)
@ -525,7 +548,7 @@ class DockerDaemonEnvironment(VirtualEnvironment):
pass
def release_environment(self) -> None:
import docker
import docker.errors
try:
container = self._get_container()

View File

@ -1918,6 +1918,7 @@ class LLMNode(Node[LLMNodeData]):
) -> Generator[NodeEventBase, None, LLMGenerationData]:
result: LLMGenerationData | None = None
# FIXME(Mairuis): Async processing for bash session.
with SandboxBashSession(sandbox=sandbox, node_id=self.id, tools=tool_dependencies) as session:
prompt_files = self._extract_prompt_files(variable_pool)
model_features = self._get_model_features(model_instance)

View File

@ -1,7 +1,8 @@
from .zip_sandbox import SandboxDownloadItem, SandboxFile, ZipSandbox
from .zip_sandbox import SandboxDownloadItem, SandboxFile, SandboxUploadItem, ZipSandbox
__all__ = [
"SandboxDownloadItem",
"SandboxFile",
"SandboxUploadItem",
"ZipSandbox",
]

View File

@ -27,10 +27,20 @@ from .strategy import ZipStrategy
@dataclass(frozen=True)
class SandboxDownloadItem:
    """Item for downloading: URL -> sandbox path."""

    url: str  # source URL to fetch (e.g. a presigned download URL)
    path: str  # destination path inside the sandbox where the file is written
@dataclass(frozen=True)
class SandboxUploadItem:
    """Item for uploading: sandbox path -> URL."""

    path: str  # source path inside the sandbox (joined under src_dir by upload_items)
    url: str  # target URL the file is PUT to (e.g. a presigned upload URL)
@dataclass(frozen=True)
class SandboxFile:
"""A handle to a file in the sandbox."""
@ -210,25 +220,6 @@ class ZipSandbox:
# ========== Download operations ==========
def download(self, urls: list[str], *, dest_dir: str = ".") -> list[str]:
    """Fetch each URL into the sandbox and return the resulting file paths."""
    if not urls:
        return []
    base_dir = self._normalize_path(dest_dir)
    pipe = pipeline(self.vm)
    # Create the target directory once, then queue one curl per URL.
    pipe.add(["mkdir", "-p", base_dir], error_message="Failed to create download directory")
    result_paths: list[str] = []
    for url in urls:
        target = self._dest_path_for_url(base_dir, url)
        result_paths.append(target)
        pipe.add(["curl", "-fsSL", url, "-o", target], error_message="Failed to download file")
    try:
        pipe.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True)
    except Exception as exc:
        raise RuntimeError(str(exc)) from exc
    return result_paths
def download_items(self, items: list[SandboxDownloadItem], *, dest_dir: str = ".") -> list[str]:
if not items:
return []
@ -286,6 +277,32 @@ class ZipSandbox:
except CommandExecutionError as exc:
raise RuntimeError(str(exc)) from exc
def upload_items(self, items: list[SandboxUploadItem], *, src_dir: str = ".") -> None:
    """Upload multiple files from sandbox to target URLs.

    Args:
        items: List of SandboxUploadItem(path, url)
        src_dir: Base directory containing the files
    """
    if not items:
        return
    base_dir = self._normalize_path(src_dir)
    # "" and "." mean "current directory": no prefix join needed.
    prefix_with_base = base_dir not in ("", ".")
    pipe = pipeline(self.vm)
    for item in items:
        relative = self._normalize_path(item.path)
        source = posixpath.join(base_dir, relative) if prefix_with_base else relative
        pipe.add(
            ["curl", "-fsSL", "-X", "PUT", "-T", source, item.url],
            error_message=f"Failed to upload {item.path}",
        )
    try:
        pipe.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True)
    except Exception as exc:
        raise RuntimeError(str(exc)) from exc
# ========== Archive operations ==========
def zip(self, src: str = ".", *, include_base: bool = True) -> SandboxFile:

View File

@ -54,6 +54,22 @@ class AppAssetService:
def _lock(app_id: str):
return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS)
@staticmethod
def get_assets_by_version(tenant_id: str, app_id: str, workflow_id: str | None = None) -> AppAssets:
    """Get asset tree by workflow_id (published) or draft if workflow_id is None."""
    target_version = workflow_id or AppAssets.VERSION_DRAFT
    with Session(db.engine) as session:
        record = (
            session.query(AppAssets)
            .filter_by(tenant_id=tenant_id, app_id=app_id, version=target_version)
            .first()
        )
    if record is not None:
        return record
    # No stored row: hand back an unsaved placeholder for the requested version.
    return AppAssets(tenant_id=tenant_id, app_id=app_id, version=target_version)
@staticmethod
def get_draft_assets(tenant_id: str, app_id: str) -> list[AssetItem]:
with Session(db.engine) as session:

View File

@ -1,26 +1,47 @@
"""Service for exporting and importing App Bundles (DSL + assets).
Bundle structure:
bundle.zip/
{app_name}.yml # DSL file
manifest.json # Asset manifest (required for import)
{app_name}/ # Asset files
folder/file.txt
...
Import flow (sandbox-based):
1. prepare_import: Frontend gets upload URL, stores import_id in Redis
2. Frontend uploads zip to storage
3. confirm_import: Sandbox downloads zip, extracts, uploads assets via presigned URLs
Manifest format (schema_version 1.0):
- app_assets.tree: Full AppAssetFileTree for 100% ID restoration
- files: node_id -> path mapping for file nodes
- integrity.file_count: Basic validation
"""
from __future__ import annotations
import io
import json
import logging
import re
import zipfile
from dataclasses import dataclass
from uuid import uuid4
import yaml
from pydantic import ValidationError
from sqlalchemy.orm import Session
from core.app.entities.app_bundle_entities import (
BUNDLE_DSL_FILENAME_PATTERN,
BUNDLE_MAX_SIZE,
MANIFEST_FILENAME,
BundleExportResult,
BundleFormatError,
ZipSecurityError,
BundleManifest,
)
from core.app_assets.storage import AssetPath
from core.app_bundle import SourceZipExtractor
from core.zip_sandbox import SandboxDownloadItem, ZipSandbox
from core.app_assets.storage import AppAssetStorage, AssetPath, BundleImportZipPath
from core.zip_sandbox import SandboxDownloadItem, SandboxUploadItem, ZipSandbox
from extensions.ext_database import db
from models import Account, App
from extensions.ext_redis import redis_client
from models.account import Account
from models.model import App
from .app_asset_package_service import AppAssetPackageService
from .app_asset_service import AppAssetService
@ -28,6 +49,15 @@ from .app_dsl_service import AppDslService, Import
logger = logging.getLogger(__name__)
_IMPORT_REDIS_PREFIX = "app_bundle:import:"
_IMPORT_TTL_SECONDS = 3600 # 1 hour
@dataclass
class ImportPrepareResult:
import_id: str
upload_url: str
class AppBundleService:
@staticmethod
@ -38,14 +68,10 @@ class AppBundleService:
marked_name: str = "",
marked_comment: str = "",
):
"""
Publish App Bundle (workflow + assets).
Coordinates WorkflowService and AppAssetService publishing in a single transaction.
"""
"""Publish App Bundle (workflow + assets) in a single transaction."""
from models.workflow import Workflow
from services.workflow_service import WorkflowService
# 1. Publish workflow
workflow: Workflow = WorkflowService().publish_workflow(
session=session,
app_model=app_model,
@ -53,17 +79,16 @@ class AppBundleService:
marked_name=marked_name,
marked_comment=marked_comment,
)
# 2. Publish assets (bound to workflow_id)
AppAssetPackageService.publish(
session=session,
app_model=app_model,
account_id=account.id,
workflow_id=workflow.id,
)
return workflow
# ========== Export ==========
@staticmethod
def export_bundle(
*,
@ -73,14 +98,14 @@ class AppBundleService:
workflow_id: str | None = None,
expires_in: int = 10 * 60,
) -> BundleExportResult:
"""Export bundle and return a temporary download URL.
Uses sandbox VM to build the ZIP, avoiding memory pressure in API process.
"""
"""Export bundle with manifest.json and return a temporary download URL."""
tenant_id = app_model.tenant_id
app_id = app_model.id
safe_name = AppBundleService._sanitize_filename(app_model.name)
filename = f"{safe_name}.zip"
dsl_filename = f"{safe_name}.yml"
app_assets = AppAssetService.get_assets_by_version(tenant_id, app_id, workflow_id)
manifest = BundleManifest.from_tree(app_assets.asset_tree, dsl_filename)
export_id = uuid4().hex
export_path = AssetPath.bundle_export_zip(tenant_id, app_id, export_id)
@ -95,147 +120,170 @@ class AppBundleService:
with ZipSandbox(tenant_id=tenant_id, user_id=account_id, app_id="app-bundle-export") as zs:
zs.write_file(f"bundle_root/{safe_name}.yml", dsl_content.encode("utf-8"))
zs.write_file(f"bundle_root/{MANIFEST_FILENAME}", manifest.model_dump_json(indent=2).encode("utf-8"))
# Published assets: use stored source zip and unzip into <safe_name>/...
if workflow_id is not None:
source_zip_path = AssetPath.source_zip(tenant_id, app_id, workflow_id)
source_url = asset_storage.get_download_url(source_zip_path, expires_in)
zs.download_archive(source_url, path="tmp/source_assets.zip")
zs.unzip(archive_path="tmp/source_assets.zip", dest_dir=f"bundle_root/{safe_name}")
else:
# Draft assets: download individual files and place under <safe_name>/...
asset_items = AppAssetService.get_draft_assets(tenant_id, app_id)
asset_urls = asset_storage.get_download_urls(
[AssetPath.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in
)
zs.download_items(
[
SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}")
for a, url in zip(asset_items, asset_urls, strict=True)
],
dest_dir="bundle_root",
)
if asset_items:
asset_urls = asset_storage.get_download_urls(
[AssetPath.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in
)
zs.download_items(
[
SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}")
for a, url in zip(asset_items, asset_urls, strict=True)
],
dest_dir="bundle_root",
)
archive = zs.zip(src="bundle_root", include_base=False)
zs.upload(archive, upload_url)
download_url = asset_storage.get_download_url(export_path, expires_in)
return BundleExportResult(download_url=download_url, filename=filename)
return BundleExportResult(download_url=download_url, filename=f"{safe_name}.zip")
# ========== Import ==========
@staticmethod
def import_bundle(
def prepare_import(tenant_id: str, account_id: str) -> ImportPrepareResult:
"""Prepare import: generate import_id and upload URL."""
import_id = uuid4().hex
import_path = AssetPath.bundle_import_zip(tenant_id, import_id)
asset_storage = AppAssetService.get_storage()
upload_url = asset_storage.get_import_upload_url(import_path, _IMPORT_TTL_SECONDS)
redis_client.setex(
f"{_IMPORT_REDIS_PREFIX}{import_id}",
_IMPORT_TTL_SECONDS,
json.dumps({"tenant_id": tenant_id, "account_id": account_id}),
)
return ImportPrepareResult(import_id=import_id, upload_url=upload_url)
@staticmethod
def confirm_import(
import_id: str,
account: Account,
zip_bytes: bytes,
*,
name: str | None = None,
description: str | None = None,
icon_type: str | None = None,
icon: str | None = None,
icon_background: str | None = None,
) -> Import:
if len(zip_bytes) > BUNDLE_MAX_SIZE:
raise BundleFormatError(f"Bundle size exceeds limit: {BUNDLE_MAX_SIZE} bytes")
"""Confirm import: download zip in sandbox, extract, and upload assets."""
redis_key = f"{_IMPORT_REDIS_PREFIX}{import_id}"
redis_data = redis_client.get(redis_key)
if not redis_data:
raise BundleFormatError("Import session expired or not found")
dsl_content, assets_prefix = AppBundleService._extract_dsl_from_bundle(zip_bytes)
import_meta = json.loads(redis_data)
tenant_id: str = import_meta["tenant_id"]
with Session(db.engine) as session:
dsl_service = AppDslService(session)
import_result = dsl_service.import_app(
if tenant_id != account.current_tenant_id:
raise BundleFormatError("Import session tenant mismatch")
import_path = AssetPath.bundle_import_zip(tenant_id, import_id)
asset_storage = AppAssetService.get_storage()
try:
result = AppBundleService.import_bundle(
tenant_id=tenant_id,
account=account,
import_mode="yaml-content",
yaml_content=dsl_content,
import_path=import_path,
asset_storage=asset_storage,
name=name,
description=description,
icon_type=icon_type,
icon=icon,
icon_background=icon_background,
app_id=None,
)
session.commit()
finally:
redis_client.delete(redis_key)
asset_storage.delete_import_zip(import_path)
if import_result.app_id and assets_prefix:
AppBundleService._import_assets_from_bundle(
zip_bytes=zip_bytes,
assets_prefix=assets_prefix,
app_id=import_result.app_id,
account_id=account.id,
)
return result
@staticmethod
def import_bundle(
*,
tenant_id: str,
account: Account,
import_path: BundleImportZipPath,
asset_storage: AppAssetStorage,
name: str | None,
description: str | None,
icon_type: str | None,
icon: str | None,
icon_background: str | None,
) -> Import:
"""Execute import in sandbox."""
download_url = asset_storage.get_import_download_url(import_path, _IMPORT_TTL_SECONDS)
with ZipSandbox(tenant_id=tenant_id, user_id=account.id, app_id="app-bundle-import") as zs:
zs.download_archive(download_url, path="import.zip")
zs.unzip(archive_path="import.zip", dest_dir="bundle")
manifest_bytes = zs.read_file(f"bundle/{MANIFEST_FILENAME}")
try:
manifest = BundleManifest.model_validate_json(manifest_bytes)
except ValidationError as e:
raise BundleFormatError(f"Invalid manifest.json: {e}") from e
dsl_content = zs.read_file(f"bundle/{manifest.dsl_filename}").decode("utf-8")
with Session(db.engine) as session:
dsl_service = AppDslService(session)
import_result = dsl_service.import_app(
account=account,
import_mode="yaml-content",
yaml_content=dsl_content,
name=name,
description=description,
icon_type=icon_type,
icon=icon,
icon_background=icon_background,
app_id=None,
)
session.commit()
if not import_result.app_id:
return import_result
app_id = import_result.app_id
tree = manifest.app_assets.tree
upload_items: list[SandboxUploadItem] = []
for file_entry in manifest.files:
asset_path = AssetPath.draft(tenant_id, app_id, file_entry.node_id)
file_upload_url = asset_storage.get_upload_url(asset_path, _IMPORT_TTL_SECONDS)
src_path = f"{manifest.assets_prefix}/{file_entry.path}"
upload_items.append(SandboxUploadItem(path=src_path, url=file_upload_url))
if upload_items:
zs.upload_items(upload_items, src_dir="bundle")
# Tree sizes are already set from manifest; no need to update
app_model = db.session.query(App).filter(App.id == app_id).first()
if app_model:
AppAssetService.set_draft_assets(
app_model=app_model,
account_id=account.id,
new_tree=tree,
)
return import_result
@staticmethod
def _extract_dsl_from_bundle(zip_bytes: bytes) -> tuple[str, str | None]:
dsl_content: str | None = None
dsl_filename: str | None = None
with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
for info in zf.infolist():
if info.is_dir():
continue
if BUNDLE_DSL_FILENAME_PATTERN.match(info.filename):
if dsl_content is not None:
raise BundleFormatError("Multiple DSL files found in bundle")
dsl_content = zf.read(info).decode("utf-8")
dsl_filename = info.filename
if dsl_content is None or dsl_filename is None:
raise BundleFormatError("No DSL file (*.yml or *.yaml) found in bundle root")
yaml.safe_load(dsl_content)
assets_prefix = dsl_filename.rsplit(".", 1)[0]
has_assets = AppBundleService._check_assets_prefix_exists(zip_bytes, assets_prefix)
return dsl_content, assets_prefix if has_assets else None
@staticmethod
def _check_assets_prefix_exists(zip_bytes: bytes, prefix: str) -> bool:
with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as zf:
for info in zf.infolist():
if info.filename.startswith(f"{prefix}/"):
return True
return False
@staticmethod
def _import_assets_from_bundle(
zip_bytes: bytes,
assets_prefix: str,
app_id: str,
account_id: str,
) -> None:
app_model = db.session.query(App).filter(App.id == app_id).first()
if not app_model:
logger.warning("App not found for asset import: %s", app_id)
return
asset_storage = AppAssetService.get_storage()
extractor = SourceZipExtractor(asset_storage)
try:
folders, files = extractor.extract_entries(
zip_bytes,
expected_prefix=f"{assets_prefix}/",
)
except ZipSecurityError as e:
logger.warning("Zip security error during asset import: %s", e)
return
if not folders and not files:
return
new_tree = extractor.build_tree_and_save(
folders=folders,
files=files,
tenant_id=app_model.tenant_id,
app_id=app_model.id,
)
AppAssetService.set_draft_assets(
app_model=app_model,
account_id=account_id,
new_tree=new_tree,
)
# ========== Helpers ==========
@staticmethod
def _sanitize_filename(name: str) -> str:
"""Sanitize app name for use as filename."""
safe = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
safe = safe.strip(". ")
return safe[:100] if safe else "app"

View File

@ -157,6 +157,41 @@ export const importDSLConfirm = ({ import_id }: { import_id: string }): Promise<
return post<DSLImportResponse>(`apps/imports/${import_id}/confirm`, { body: {} })
}
// Response of the bundle-import "prepare" step: an opaque session id plus a
// presigned URL the bundle zip should be PUT to.
export type ImportBundlePrepareResponse = {
  import_id: string
  upload_url: string
}

// Step 1 of the bundle import flow: create an import session on the backend
// and obtain the direct upload URL for the zip.
export const prepareImportBundle = (): Promise<ImportBundlePrepareResponse> => {
  return post<ImportBundlePrepareResponse>('apps/imports-bundle/prepare', { body: {} })
}
// Step 2 of the bundle import flow: tell the backend the zip has been uploaded
// so it can extract the bundle and create the app. Optional metadata overrides
// (name/description/icon) are forwarded in the JSON body.
export const confirmImportBundle = ({
  import_id,
  name,
  description,
  icon_type,
  icon,
  icon_background,
}: {
  import_id: string
  name?: string
  description?: string
  icon_type?: string
  icon?: string
  icon_background?: string
}): Promise<DSLImportResponse> => {
  const body = { name, description, icon_type, icon, icon_background }
  return post<DSLImportResponse>(`apps/imports-bundle/${import_id}/confirm`, { body })
}
export const importAppBundle = async ({
file,
name,
@ -172,37 +207,27 @@ export const importAppBundle = async ({
icon?: string
icon_background?: string
}): Promise<DSLImportResponse> => {
const { API_PREFIX, CSRF_COOKIE_NAME, CSRF_HEADER_NAME } = await import('@/config')
const Cookies = (await import('js-cookie')).default
// Step 1: Prepare import and get upload URL
const { import_id, upload_url } = await prepareImportBundle()
const formData = new FormData()
formData.append('file', file)
if (name)
formData.append('name', name)
if (description)
formData.append('description', description)
if (icon_type)
formData.append('icon_type', icon_type)
if (icon)
formData.append('icon', icon)
if (icon_background)
formData.append('icon_background', icon_background)
const response = await fetch(`${API_PREFIX}/apps/imports-bundle`, {
method: 'POST',
credentials: 'include',
headers: {
[CSRF_HEADER_NAME]: Cookies.get(CSRF_COOKIE_NAME()) || '',
},
body: formData,
// Step 2: Upload file to presigned URL
const uploadResponse = await fetch(upload_url, {
method: 'PUT',
body: file,
})
if (!response.ok) {
const errorData = await response.json()
throw new Error(errorData.error || 'Import bundle failed')
}
if (!uploadResponse.ok)
throw new Error('Failed to upload bundle file')
return response.json()
// Step 3: Confirm import
return confirmImportBundle({
import_id,
name,
description,
icon_type,
icon,
icon_background,
})
}
export const switchApp = ({ appID, name, icon_type, icon, icon_background }: { appID: string, name: string, icon_type: AppIconType, icon: string, icon_background?: string | null }): Promise<{ new_app_id: string }> => {