diff --git a/api/core/sandbox/initializer/draft_app_assets_initializer.py b/api/core/sandbox/initializer/draft_app_assets_initializer.py
index d6fa945dd7..c942394186 100644
--- a/api/core/sandbox/initializer/draft_app_assets_initializer.py
+++ b/api/core/sandbox/initializer/draft_app_assets_initializer.py
@@ -47,7 +47,7 @@ class DraftAppAssetsInitializer(AsyncSandboxInitializer):
         items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)]
         script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
         pipeline(vm).add(
-            ["sh", "-lc", script],
+            ["sh", "-c", script],
             error_message="Failed to download draft assets",
         ).execute(timeout=DRAFT_ASSETS_DOWNLOAD_TIMEOUT, raise_on_error=True)
 
diff --git a/api/core/sandbox/services/asset_download_service.py b/api/core/sandbox/services/asset_download_service.py
index 372f767e13..e79e866ea9 100644
--- a/api/core/sandbox/services/asset_download_service.py
+++ b/api/core/sandbox/services/asset_download_service.py
@@ -48,12 +48,10 @@ def _render_download_script(root_path: str, download_commands: str) -> str:
     wait
 
     if [ -s "${{fail_log}}" ]; then
-        echo 'Failed downloads:' >&2
-        cat "${{fail_log}}" >&2
+        mv "${{fail_log}}" "${{download_root}}/DOWNLOAD_FAILURES.txt"
+    else
         rm -f "${{fail_log}}"
-        exit 1
     fi
-    rm -f "${{fail_log}}"
 
     """
     return textwrap.dedent(script).strip()
diff --git a/api/core/virtual_environment/__base/helpers.py b/api/core/virtual_environment/__base/helpers.py
index 92cf2f87b2..0b36485560 100644
--- a/api/core/virtual_environment/__base/helpers.py
+++ b/api/core/virtual_environment/__base/helpers.py
@@ -185,7 +185,7 @@ class CommandPipeline:
             return []
 
         script = self._build_script(fail_fast=raise_on_error)
-        batch_cmd = ["sh", "-lc", script]
+        batch_cmd = ["sh", "-c", script]
 
         if self.connection is not None:
             batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=self.connection)
diff --git a/api/extensions/storage/aws_s3_storage.py b/api/extensions/storage/aws_s3_storage.py
index 95907c1c28..3fdfde2372 100644
--- a/api/extensions/storage/aws_s3_storage.py
+++ b/api/extensions/storage/aws_s3_storage.py
@@ -94,6 +94,16 @@ class AwsS3Storage(BaseStorage):
         )
         return url
 
+    def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
+        return [
+            self.client.generate_presigned_url(
+                ClientMethod="get_object",
+                Params={"Bucket": self.bucket_name, "Key": filename},
+                ExpiresIn=expires_in,
+            )
+            for filename in filenames
+        ]
+
     def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
         url: str = self.client.generate_presigned_url(
             ClientMethod="put_object",
diff --git a/api/extensions/storage/base_storage.py b/api/extensions/storage/base_storage.py
index 59134044bb..a8f81c606b 100644
--- a/api/extensions/storage/base_storage.py
+++ b/api/extensions/storage/base_storage.py
@@ -58,6 +58,12 @@ class BaseStorage(ABC):
         """
         raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
 
+    def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
+        """
+        Generate pre-signed URLs for downloading multiple files.
+        """
+        raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
+
     def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
         """
         Generate a pre-signed URL for uploading a file.
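
For reference, the new `get_download_urls` hook is simply the batch form of `get_download_url`: one presigned GET URL per key, returned in input order. A minimal usage sketch follows (the keys are invented, and `AwsS3Storage` is assumed to pick up its bucket and credentials from `dify_config`, as elsewhere in the codebase):

```python
# Illustrative sketch, not part of the diff. The keys are made up, and
# AwsS3Storage is assumed to configure its boto3 client from dify_config.
from extensions.storage.aws_s3_storage import AwsS3Storage

storage = AwsS3Storage()
keys = ["tenant-1/app-1/assets/readme.md", "tenant-1/app-1/assets/main.py"]

# One presigned GET URL per key, each valid for 10 minutes.
urls = storage.get_download_urls(keys, expires_in=600)
for key, url in zip(keys, urls):
    print(f"{key} -> {url}")
```
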
diff --git a/api/extensions/storage/cached_presign_storage.py b/api/extensions/storage/cached_presign_storage.py
index b15c5cba87..d636b4f117 100644
--- a/api/extensions/storage/cached_presign_storage.py
+++ b/api/extensions/storage/cached_presign_storage.py
@@ -1,15 +1,15 @@
 """Storage wrapper that caches presigned download URLs."""
 
 import logging
-from collections.abc import Generator
 from typing import Any
 
 from extensions.storage.base_storage import BaseStorage
+from extensions.storage.storage_wrapper import StorageWrapper
 
 logger = logging.getLogger(__name__)
 
 
-class CachedPresignStorage(BaseStorage):
+class CachedPresignStorage(StorageWrapper):
     """Storage wrapper that caches presigned download URLs.
 
     Wraps a storage with presign capability and caches the generated URLs
@@ -33,36 +33,14 @@ class CachedPresignStorage(BaseStorage):
         redis_client: Any,
         cache_key_prefix: str = "presign_cache",
     ):
-        super().__init__()
-        self._storage = storage
+        super().__init__(storage)
         self._redis = redis_client
         self._cache_key_prefix = cache_key_prefix
 
-    def save(self, filename: str, data: bytes):
-        self._storage.save(filename, data)
-
-    def load_once(self, filename: str) -> bytes:
-        return self._storage.load_once(filename)
-
-    def load_stream(self, filename: str) -> Generator:
-        return self._storage.load_stream(filename)
-
-    def download(self, filename: str, target_filepath: str):
-        self._storage.download(filename, target_filepath)
-
-    def exists(self, filename: str) -> bool:
-        return self._storage.exists(filename)
-
     def delete(self, filename: str):
-        self._storage.delete(filename)
+        super().delete(filename)
         self.invalidate([filename])
 
-    def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
-        return self._storage.scan(path, files=files, directories=directories)
-
-    def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
-        return self._storage.get_upload_url(filename, expires_in)
-
     def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
         """Get a presigned download URL, using cache when available.
@@ -79,7 +57,7 @@ class CachedPresignStorage(BaseStorage):
         if cached:
             return cached
 
-        url = self._storage.get_download_url(filename, expires_in)
+        url = super().get_download_url(filename, expires_in)
         self._set_cached(cache_key, url, expires_in)
 
         return url
@@ -104,16 +82,32 @@ class CachedPresignStorage(BaseStorage):
         cache_keys = [self._cache_key(f) for f in filenames]
         cached_values = self._get_cached_batch(cache_keys)
 
-        results: list[str] = []
-        for filename, cache_key, cached in zip(filenames, cache_keys, cached_values):
-            if cached:
-                results.append(cached)
-            else:
-                url = self._storage.get_download_url(filename, expires_in)
-                self._set_cached(cache_key, url, expires_in)
-                results.append(url)
+        # Build results list, tracking which indices need fetching
+        results: list[str | None] = list(cached_values)
+        uncached_indices: list[int] = []
+        uncached_filenames: list[str] = []
 
-        return results
+        for i, (filename, cached) in enumerate(zip(filenames, cached_values)):
+            if not cached:
+                uncached_indices.append(i)
+                uncached_filenames.append(filename)
+
+        # Fetch uncached URLs from storage. Note: zero-arg super() is not
+        # usable inside a comprehension before Python 3.12, so loop explicitly.
+        if uncached_filenames:
+            uncached_urls: list[str] = []
+            for f in uncached_filenames:
+                uncached_urls.append(super().get_download_url(f, expires_in))
+
+            # Fill results at correct positions
+            for idx, url in zip(uncached_indices, uncached_urls):
+                results[idx] = url
+
+            # Batch set cache
+            uncached_cache_keys = [cache_keys[i] for i in uncached_indices]
+            self._set_cached_batch(uncached_cache_keys, uncached_urls, expires_in)
+
+        return results  # type: ignore[return-value]
 
     def invalidate(self, filenames: list[str]) -> None:
         """Invalidate cached URLs for given filenames.
@@ -170,3 +161,16 @@ class CachedPresignStorage(BaseStorage):
             self._redis.setex(cache_key, ttl, url)
         except Exception:
             logger.warning("Failed to write presign cache", exc_info=True)
+
+    def _set_cached_batch(self, cache_keys: list[str], urls: list[str], expires_in: int) -> None:
+        """Store multiple URLs in cache with computed TTL using a pipeline."""
+        if not cache_keys:
+            return
+        ttl = self._compute_ttl(expires_in)
+        try:
+            pipe = self._redis.pipeline()
+            for cache_key, url in zip(cache_keys, urls):
+                pipe.setex(cache_key, ttl, url)
+            pipe.execute()
+        except Exception:
+            logger.warning("Failed to write presign cache batch", exc_info=True)
diff --git a/api/extensions/storage/file_presign_storage.py b/api/extensions/storage/file_presign_storage.py
index 67bf300d66..ae7dee3033 100644
--- a/api/extensions/storage/file_presign_storage.py
+++ b/api/extensions/storage/file_presign_storage.py
@@ -1,60 +1,48 @@
+"""Storage wrapper that provides presigned URL support with fallback to signed proxy URLs."""
+
 import base64
 import hashlib
 import hmac
 import os
 import time
 import urllib.parse
-from collections.abc import Generator
 
 from configs import dify_config
-from extensions.storage.base_storage import BaseStorage
+from extensions.storage.storage_wrapper import StorageWrapper
 
 
-class FilePresignStorage(BaseStorage):
+class FilePresignStorage(StorageWrapper):
+    """Storage wrapper that provides presigned URL support.
+
+    If the wrapped storage supports presigned URLs, delegates to it.
+    Otherwise, generates signed proxy URLs for download.
+ """ + SIGNATURE_PREFIX = "storage-download" - def __init__(self, storage: BaseStorage): - super().__init__() - self._storage = storage - - def save(self, filename: str, data: bytes): - self._storage.save(filename, data) - - def load_once(self, filename: str) -> bytes: - return self._storage.load_once(filename) - - def load_stream(self, filename: str) -> Generator: - return self._storage.load_stream(filename) - - def download(self, filename: str, target_filepath: str): - self._storage.download(filename, target_filepath) - - def exists(self, filename: str) -> bool: - return self._storage.exists(filename) - - def delete(self, filename: str): - self._storage.delete(filename) - - def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]: - return self._storage.scan(path, files=files, directories=directories) - def get_download_url(self, filename: str, expires_in: int = 3600) -> str: try: - return self._storage.get_download_url(filename, expires_in) + return super().get_download_url(filename, expires_in) except NotImplementedError: - return self._generate_signed_proxy_url(filename) + return self._generate_signed_proxy_url(filename, expires_in) def get_upload_url(self, filename: str, expires_in: int = 3600) -> str: try: - return self._storage.get_upload_url(filename, expires_in) + return super().get_upload_url(filename, expires_in) except NotImplementedError: return self._generate_signed_upload_url(filename) + def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]: + try: + return super().get_download_urls(filenames, expires_in) + except NotImplementedError: + return [self._generate_signed_proxy_url(filename, expires_in) for filename in filenames] + def _generate_signed_upload_url(self, filename: str) -> str: # TODO: Implement this raise NotImplementedError("This storage backend doesn't support pre-signed URLs") - def _generate_signed_proxy_url(self, filename: str) -> str: + def _generate_signed_proxy_url(self, filename: str, expires_in: int = 3600) -> str: base_url = dify_config.FILES_URL encoded_filename = urllib.parse.quote(filename, safe="") url = f"{base_url}/files/storage/{encoded_filename}/download" diff --git a/api/extensions/storage/silent_storage.py b/api/extensions/storage/silent_storage.py new file mode 100644 index 0000000000..e8a9007aa5 --- /dev/null +++ b/api/extensions/storage/silent_storage.py @@ -0,0 +1,60 @@ +"""Storage wrapper that returns empty values instead of raising on get operations.""" + +import logging +from collections.abc import Generator +from typing import Any + +from extensions.storage.storage_wrapper import StorageWrapper + +logger = logging.getLogger(__name__) + + +class SilentStorage(StorageWrapper): + """Storage wrapper that silently returns empty values when get operations fail. + + Wraps any storage and catches exceptions on read operations (load_once, load_stream, + download, exists), returning empty/default values instead of raising. 
+
+    Example:
+        silent_storage = SilentStorage(
+            storage=CachedPresignStorage(...),
+        )
+        content = silent_storage.load_once("path/to/file.txt")  # Returns b"" if not found
+    """
+
+    def load_once(self, filename: str) -> bytes:
+        """Load file content, returning empty bytes if not found."""
+        try:
+            return super().load_once(filename)
+        except FileNotFoundError:
+            logger.debug("File not found: %s", filename)
+            return b""
+
+    def load_once_or_none(self, filename: str) -> bytes | None:
+        """Load file content, returning None if not found."""
+        try:
+            return super().load_once(filename)
+        except FileNotFoundError:
+            logger.debug("File not found: %s", filename)
+            return None
+
+    def load_stream(self, filename: str) -> Generator[bytes, None, None]:
+        """Load file as stream, yielding nothing if not found."""
+        try:
+            yield from super().load_stream(filename)
+        except FileNotFoundError:
+            logger.debug("File not found: %s", filename)
+
+    def download(self, filename: str, target_filepath: str) -> bool:
+        """Download file to target, returning False if not found."""
+        try:
+            super().download(filename, target_filepath)
+            return True
+        except FileNotFoundError:
+            logger.debug("File not found or download failed: %s", filename)
+            return False
+
+    def __getattr__(self, name: str) -> Any:
+        """Delegate any other attributes to the wrapped storage."""
+        return getattr(self._storage, name)
diff --git a/api/extensions/storage/storage_wrapper.py b/api/extensions/storage/storage_wrapper.py
new file mode 100644
index 0000000000..d3ed8ea317
--- /dev/null
+++ b/api/extensions/storage/storage_wrapper.py
@@ -0,0 +1,54 @@
+"""Base class for storage wrappers that delegate to an inner storage."""
+
+from collections.abc import Generator
+
+from extensions.storage.base_storage import BaseStorage
+
+
+class StorageWrapper(BaseStorage):
+    """Base class for storage wrappers using the decorator pattern.
+
+    Forwards all BaseStorage methods to the wrapped storage by default.
+    Subclasses can override specific methods to customize behavior.
+
+    Example:
+        class MyCustomStorage(StorageWrapper):
+            def save(self, filename: str, data: bytes):
+                # Custom logic before save
+                super().save(filename, data)
+                # Custom logic after save
+    """
+
+    def __init__(self, storage: BaseStorage):
+        super().__init__()
+        self._storage = storage
+
+    def save(self, filename: str, data: bytes):
+        self._storage.save(filename, data)
+
+    def load_once(self, filename: str) -> bytes:
+        return self._storage.load_once(filename)
+
+    def load_stream(self, filename: str) -> Generator:
+        return self._storage.load_stream(filename)
+
+    def download(self, filename: str, target_filepath: str):
+        self._storage.download(filename, target_filepath)
+
+    def exists(self, filename: str) -> bool:
+        return self._storage.exists(filename)
+
+    def delete(self, filename: str):
+        self._storage.delete(filename)
+
+    def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
+        return self._storage.scan(path, files=files, directories=directories)
+
+    def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
+        return self._storage.get_download_url(filename, expires_in)
+
+    def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
+        return self._storage.get_download_urls(filenames, expires_in)
+
+    def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
+        return self._storage.get_upload_url(filename, expires_in)
diff --git a/api/services/app_asset_service.py b/api/services/app_asset_service.py
index 93548535cb..e16b8082bb 100644
--- a/api/services/app_asset_service.py
+++ b/api/services/app_asset_service.py
@@ -16,12 +16,15 @@ from core.app_assets.builder import AssetBuildPipeline, BuildContext
 from core.app_assets.builder.file_builder import FileBuilder
 from core.app_assets.builder.skill_builder import SkillBuilder
 from core.app_assets.converters import tree_to_asset_items
+from core.app_assets.entities.assets import AssetItem
 from core.app_assets.packager import AssetZipPackager
 from core.app_assets.paths import AssetPaths
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
+from extensions.storage.base_storage import BaseStorage
 from extensions.storage.cached_presign_storage import CachedPresignStorage
 from extensions.storage.file_presign_storage import FilePresignStorage
+from extensions.storage.silent_storage import SilentStorage
 from models.app_asset import AppAssets
 from models.model import App
@@ -45,21 +48,17 @@ class AppAssetService:
         return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS)
 
     @staticmethod
-    def assets_storage() -> CachedPresignStorage:
+    def assets_storage() -> BaseStorage:
         from extensions.ext_storage import storage
 
-        return CachedPresignStorage(
-            storage=FilePresignStorage(storage.storage_runner),
-            redis_client=redis_client,
-            cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
+        return SilentStorage(
+            CachedPresignStorage(
+                storage=FilePresignStorage(storage.storage_runner),
+                redis_client=redis_client,
+                cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
+            )
         )
 
-    @staticmethod
-    def _draft_storage_key_for_node(tenant_id: str, app_id: str, assets_id: str, node: AppAssetNode) -> str:
-        if node.extension == "md":
-            return AssetPaths.build_resolved_file(tenant_id, app_id, assets_id, node.id)
-        return AssetPaths.draft_file(tenant_id, app_id, node.id)
-
     @staticmethod
     def get_or_create_assets(session: Session, app_model: App, account_id: str) -> AppAssets:
         assets = (
@@ -347,7 +346,7 @@ class AppAssetService:
         tree = assets.asset_tree
         ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
 
-        built_assets = AssetBuildPipeline(
+        built_assets: list[AssetItem] = AssetBuildPipeline(
             [SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
         ).build_all(tree, ctx)
 
diff --git a/api/services/app_bundle_service.py b/api/services/app_bundle_service.py
index c0956ccc9e..f8740f0c8f 100644
--- a/api/services/app_bundle_service.py
+++ b/api/services/app_bundle_service.py
@@ -158,7 +158,7 @@ class AppBundleService:
             return None
 
         items = tree_to_asset_items(tree, app_model.tenant_id, app_model.id)
-        packager = AssetZipPackager(storage)
+        packager = AssetZipPackager(AppAssetService.assets_storage())
         return packager.package(items)
 
     @staticmethod
diff --git a/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py b/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py
index 280f8925ab..d06b801209 100644
--- a/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py
+++ b/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py
@@ -63,7 +63,9 @@ class TestCachedPresignStorage:
 
         assert result == ["https://cached1.com", "https://new.com", "https://cached2.com"]
         mock_storage.get_download_url.assert_called_once_with("file2.txt", 3600)
-        mock_redis.setex.assert_called_once()
+        # Verify the pipeline was used for the batch cache write
+        mock_redis.pipeline.assert_called_once()
+        mock_redis.pipeline.return_value.execute.assert_called_once()
 
     def test_get_download_urls_empty_list(self, cached_storage, mock_storage, mock_redis):
         """Test batch URL retrieval with empty list."""
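
Taken together, `AppAssetService.assets_storage()` now assembles a three-layer decorator stack: `SilentStorage` turns missing-file reads into empty values, `CachedPresignStorage` caches presigned URLs in Redis, and `FilePresignStorage` falls back to signed proxy URLs when the wrapped backend cannot presign. A rough sketch of the composition and its observable behavior (the cache prefix and file keys below are made up):

```python
# Sketch of the stack built by AppAssetService.assets_storage(); only the
# cache_key_prefix value and the file keys are invented here.
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from extensions.storage.cached_presign_storage import CachedPresignStorage
from extensions.storage.file_presign_storage import FilePresignStorage
from extensions.storage.silent_storage import SilentStorage

assets_storage = SilentStorage(
    CachedPresignStorage(
        storage=FilePresignStorage(storage.storage_runner),
        redis_client=redis_client,
        cache_key_prefix="app_asset:presign",
    )
)

# Presign flow: Redis cache hit -> backend presign -> signed proxy URL fallback.
url = assets_storage.get_download_url("tenant-1/app-1/assets/readme.md")

# Read flow: a missing file now yields b"" instead of raising FileNotFoundError.
data = assets_storage.load_once("tenant-1/app-1/assets/missing.md")
```
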