refactor(storage): unify storage cache layer and presign interface

- Updated storage wrappers to build on a new StorageWrapper base class that delegates all BaseStorage methods to the wrapped storage.
- Introduced SilentStorage to handle read operations gracefully by returning empty values instead of raising exceptions.
- Enhanced CachedPresignStorage to support batch caching of download URLs, improving performance.
- Refactored FilePresignStorage to support both presigned URLs and signed proxy URLs for downloads.
- Updated AppAssetService to use the new storage stack, ensuring consistent asset management (see the composition sketch below).
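
For illustration, a minimal sketch of how the new wrappers compose, mirroring AppAssetService.assets_storage in the diff below (storage.storage_runner and redis_client are the names used there; cache_key_prefix here is just the constructor default, and the file names are placeholders):

assets_storage = SilentStorage(
    CachedPresignStorage(
        storage=FilePresignStorage(storage.storage_runner),
        redis_client=redis_client,
        cache_key_prefix="presign_cache",
    )
)
content = assets_storage.load_once("missing.txt")           # b"" instead of FileNotFoundError
urls = assets_storage.get_download_urls(["a.md", "b.md"])   # presigned URLs, cached in Redis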
Harry 2026-01-23 17:01:10 +08:00
parent 3165f3adbe
commit 248fa38c34
12 changed files with 209 additions and 88 deletions

View File

@@ -47,7 +47,7 @@ class DraftAppAssetsInitializer(AsyncSandboxInitializer):
items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)]
script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
pipeline(vm).add(
["sh", "-lc", script],
["sh", "-c", script],
error_message="Failed to download draft assets",
).execute(timeout=DRAFT_ASSETS_DOWNLOAD_TIMEOUT, raise_on_error=True)

View File

@@ -48,12 +48,10 @@ def _render_download_script(root_path: str, download_commands: str) -> str:
wait
if [ -s "${{fail_log}}" ]; then
echo 'Failed downloads:' >&2
cat "${{fail_log}}" >&2
mv "${{fail_log}}" "${{download_root}}/DOWNLOAD_FAILURES.txt"
else
rm -f "${{fail_log}}"
exit 1
fi
rm -f "${{fail_log}}"
"""
return textwrap.dedent(script).strip()

View File

@@ -185,7 +185,7 @@ class CommandPipeline:
return []
script = self._build_script(fail_fast=raise_on_error)
batch_cmd = ["sh", "-lc", script]
batch_cmd = ["sh", "-c", script]
if self.connection is not None:
batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=self.connection)

View File

@@ -94,6 +94,16 @@ class AwsS3Storage(BaseStorage):
)
return url
def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
return [
self.client.generate_presigned_url(
ClientMethod="get_object",
Params={"Bucket": self.bucket_name, "Key": filename},
ExpiresIn=expires_in,
)
for filename in filenames
]
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
url: str = self.client.generate_presigned_url(
ClientMethod="put_object",

View File

@@ -58,6 +58,12 @@ class BaseStorage(ABC):
"""
raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
"""
Generate pre-signed URLs for downloading multiple files.
"""
raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
"""
Generate a pre-signed URL for uploading a file.

View File

@@ -1,15 +1,15 @@
"""Storage wrapper that caches presigned download URLs."""
import logging
from collections.abc import Generator
from typing import Any
from extensions.storage.base_storage import BaseStorage
from extensions.storage.storage_wrapper import StorageWrapper
logger = logging.getLogger(__name__)
class CachedPresignStorage(BaseStorage):
class CachedPresignStorage(StorageWrapper):
"""Storage wrapper that caches presigned download URLs.
Wraps a storage with presign capability and caches the generated URLs
@@ -33,36 +33,14 @@ class CachedPresignStorage(BaseStorage):
redis_client: Any,
cache_key_prefix: str = "presign_cache",
):
super().__init__()
self._storage = storage
super().__init__(storage)
self._redis = redis_client
self._cache_key_prefix = cache_key_prefix
def save(self, filename: str, data: bytes):
self._storage.save(filename, data)
def load_once(self, filename: str) -> bytes:
return self._storage.load_once(filename)
def load_stream(self, filename: str) -> Generator:
return self._storage.load_stream(filename)
def download(self, filename: str, target_filepath: str):
self._storage.download(filename, target_filepath)
def exists(self, filename: str) -> bool:
return self._storage.exists(filename)
def delete(self, filename: str):
self._storage.delete(filename)
super().delete(filename)
self.invalidate([filename])
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
return self._storage.scan(path, files=files, directories=directories)
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
return self._storage.get_upload_url(filename, expires_in)
def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
"""Get a presigned download URL, using cache when available.
@@ -79,7 +57,7 @@ class CachedPresignStorage(BaseStorage):
if cached:
return cached
url = self._storage.get_download_url(filename, expires_in)
url = super().get_download_url(filename, expires_in)
self._set_cached(cache_key, url, expires_in)
return url
@@ -104,16 +82,29 @@ class CachedPresignStorage(BaseStorage):
cache_keys = [self._cache_key(f) for f in filenames]
cached_values = self._get_cached_batch(cache_keys)
results: list[str] = []
for filename, cache_key, cached in zip(filenames, cache_keys, cached_values):
if cached:
results.append(cached)
else:
url = self._storage.get_download_url(filename, expires_in)
self._set_cached(cache_key, url, expires_in)
results.append(url)
# Build results list, tracking which indices need fetching
results: list[str | None] = list(cached_values)
uncached_indices: list[int] = []
uncached_filenames: list[str] = []
return results
for i, (filename, cached) in enumerate(zip(filenames, cached_values)):
if not cached:
uncached_indices.append(i)
uncached_filenames.append(filename)
# Presign the cache misses via the wrapped storage
if uncached_filenames:
uncached_urls = [self._storage.get_download_url(f, expires_in) for f in uncached_filenames]
# Fill results at correct positions
for idx, url in zip(uncached_indices, uncached_urls):
results[idx] = url
# Batch set cache
uncached_cache_keys = [cache_keys[i] for i in uncached_indices]
self._set_cached_batch(uncached_cache_keys, uncached_urls, expires_in)
return results # type: ignore[return-value]
def invalidate(self, filenames: list[str]) -> None:
"""Invalidate cached URLs for given filenames.
@@ -170,3 +161,16 @@ class CachedPresignStorage(BaseStorage):
self._redis.setex(cache_key, ttl, url)
except Exception:
logger.warning("Failed to write presign cache", exc_info=True)
def _set_cached_batch(self, cache_keys: list[str], urls: list[str], expires_in: int) -> None:
"""Store multiple URLs in cache with computed TTL using pipeline."""
if not cache_keys:
return
ttl = self._compute_ttl(expires_in)
try:
pipe = self._redis.pipeline()
for cache_key, url in zip(cache_keys, urls):
pipe.setex(cache_key, ttl, url)
pipe.execute()
except Exception:
logger.warning("Failed to write presign cache batch", exc_info=True)

View File

@@ -1,60 +1,48 @@
"""Storage wrapper that provides presigned URL support with fallback to signed proxy URLs."""
import base64
import hashlib
import hmac
import os
import time
import urllib.parse
from collections.abc import Generator
from configs import dify_config
from extensions.storage.base_storage import BaseStorage
from extensions.storage.storage_wrapper import StorageWrapper
class FilePresignStorage(BaseStorage):
class FilePresignStorage(StorageWrapper):
"""Storage wrapper that provides presigned URL support.
If the wrapped storage supports presigned URLs, delegates to it.
Otherwise, generates signed proxy URLs for download.
"""
SIGNATURE_PREFIX = "storage-download"
def __init__(self, storage: BaseStorage):
super().__init__()
self._storage = storage
def save(self, filename: str, data: bytes):
self._storage.save(filename, data)
def load_once(self, filename: str) -> bytes:
return self._storage.load_once(filename)
def load_stream(self, filename: str) -> Generator:
return self._storage.load_stream(filename)
def download(self, filename: str, target_filepath: str):
self._storage.download(filename, target_filepath)
def exists(self, filename: str) -> bool:
return self._storage.exists(filename)
def delete(self, filename: str):
self._storage.delete(filename)
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
return self._storage.scan(path, files=files, directories=directories)
def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
try:
return self._storage.get_download_url(filename, expires_in)
return super().get_download_url(filename, expires_in)
except NotImplementedError:
return self._generate_signed_proxy_url(filename)
return self._generate_signed_proxy_url(filename, expires_in)
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
try:
return self._storage.get_upload_url(filename, expires_in)
return super().get_upload_url(filename, expires_in)
except NotImplementedError:
return self._generate_signed_upload_url(filename)
def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
try:
return super().get_download_urls(filenames, expires_in)
except NotImplementedError:
return [self._generate_signed_proxy_url(filename, expires_in) for filename in filenames]
def _generate_signed_upload_url(self, filename: str) -> str:
# TODO: Implement this
raise NotImplementedError("This storage backend doesn't support pre-signed URLs")
def _generate_signed_proxy_url(self, filename: str) -> str:
def _generate_signed_proxy_url(self, filename: str, expires_in: int = 3600) -> str:
base_url = dify_config.FILES_URL
encoded_filename = urllib.parse.quote(filename, safe="")
url = f"{base_url}/files/storage/{encoded_filename}/download"

View File

@@ -0,0 +1,60 @@
"""Storage wrapper that returns empty values instead of raising on get operations."""
import logging
from collections.abc import Generator
from typing import Any
from extensions.storage.storage_wrapper import StorageWrapper
logger = logging.getLogger(__name__)
class SilentStorage(StorageWrapper):
"""Storage wrapper that silently returns empty values when get operations fail.
Wraps any storage and catches FileNotFoundError on read operations (load_once, load_stream,
download), returning empty/default values instead of raising.
Example:
silent_storage = SilentStorage(
storage=CachedPresignStorage(...),
)
content = silent_storage.load_once("path/to/file.txt") # Returns b"" if not found
"""
def load_once(self, filename: str) -> bytes:
"""Load file content, returning empty bytes if not found."""
try:
return super().load_once(filename)
except FileNotFoundError:
logger.debug("File not found: %s", filename)
return b"File Not Found"
def load_once_or_none(self, filename: str) -> bytes | None:
"""Load file content, returning None if not found."""
try:
return super().load_once(filename)
except FileNotFoundError:
logger.debug("File not found: %s", filename)
return b"File Not Found"
def load_stream(self, filename: str) -> Generator[bytes, None, None]:
"""Load file as stream, yielding nothing if not found."""
try:
yield from super().load_stream(filename)
except FileNotFoundError:
logger.debug("File not found: %s", filename)
yield b"File Not Found"
def download(self, filename: str, target_filepath: str) -> bool:
"""Download file to target, returning False if not found."""
try:
super().download(filename, target_filepath)
return True
except FileNotFoundError:
logger.debug("File not found or download failed: %s", filename)
return False
def __getattr__(self, name: str) -> Any:
"""Delegate any other attributes to the wrapped storage."""
return getattr(self._storage, name)

View File

@@ -0,0 +1,54 @@
"""Base class for storage wrappers that delegate to an inner storage."""
from collections.abc import Generator
from extensions.storage.base_storage import BaseStorage
class StorageWrapper(BaseStorage):
"""Base class for storage wrappers using the decorator pattern.
Forwards all BaseStorage methods to the wrapped storage by default.
Subclasses can override specific methods to customize behavior.
Example:
class MyCustomStorage(StorageWrapper):
def save(self, filename: str, data: bytes):
# Custom logic before save
super().save(filename, data)
# Custom logic after save
"""
def __init__(self, storage: BaseStorage):
super().__init__()
self._storage = storage
def save(self, filename: str, data: bytes):
self._storage.save(filename, data)
def load_once(self, filename: str) -> bytes:
return self._storage.load_once(filename)
def load_stream(self, filename: str) -> Generator:
return self._storage.load_stream(filename)
def download(self, filename: str, target_filepath: str):
self._storage.download(filename, target_filepath)
def exists(self, filename: str) -> bool:
return self._storage.exists(filename)
def delete(self, filename: str):
self._storage.delete(filename)
def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
return self._storage.scan(path, files=files, directories=directories)
def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
return self._storage.get_download_url(filename, expires_in)
def get_download_urls(self, filenames: list[str], expires_in: int = 3600) -> list[str]:
return self._storage.get_download_urls(filenames, expires_in)
def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
return self._storage.get_upload_url(filename, expires_in)

View File

@@ -16,12 +16,15 @@ from core.app_assets.builder import AssetBuildPipeline, BuildContext
from core.app_assets.builder.file_builder import FileBuilder
from core.app_assets.builder.skill_builder import SkillBuilder
from core.app_assets.converters import tree_to_asset_items
from core.app_assets.entities.assets import AssetItem
from core.app_assets.packager import AssetZipPackager
from core.app_assets.paths import AssetPaths
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.storage.base_storage import BaseStorage
from extensions.storage.cached_presign_storage import CachedPresignStorage
from extensions.storage.file_presign_storage import FilePresignStorage
from extensions.storage.silent_storage import SilentStorage
from models.app_asset import AppAssets
from models.model import App
@@ -45,21 +48,17 @@ class AppAssetService:
return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS)
@staticmethod
def assets_storage() -> CachedPresignStorage:
def assets_storage() -> BaseStorage:
from extensions.ext_storage import storage
return CachedPresignStorage(
storage=FilePresignStorage(storage.storage_runner),
redis_client=redis_client,
cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
return SilentStorage(
CachedPresignStorage(
storage=FilePresignStorage(storage.storage_runner),
redis_client=redis_client,
cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
)
)
@staticmethod
def _draft_storage_key_for_node(tenant_id: str, app_id: str, assets_id: str, node: AppAssetNode) -> str:
if node.extension == "md":
return AssetPaths.build_resolved_file(tenant_id, app_id, assets_id, node.id)
return AssetPaths.draft_file(tenant_id, app_id, node.id)
@staticmethod
def get_or_create_assets(session: Session, app_model: App, account_id: str) -> AppAssets:
assets = (
@@ -347,7 +346,7 @@ class AppAssetService:
tree = assets.asset_tree
ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
built_assets = AssetBuildPipeline(
built_assets: list[AssetItem] = AssetBuildPipeline(
[SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
).build_all(tree, ctx)

View File

@@ -158,7 +158,7 @@ class AppBundleService:
return None
items = tree_to_asset_items(tree, app_model.tenant_id, app_model.id)
packager = AssetZipPackager(storage)
packager = AssetZipPackager(AppAssetService.assets_storage())
return packager.package(items)
@staticmethod

View File

@@ -63,7 +63,9 @@ class TestCachedPresignStorage:
assert result == ["https://cached1.com", "https://new.com", "https://cached2.com"]
mock_storage.get_download_url.assert_called_once_with("file2.txt", 3600)
mock_redis.setex.assert_called_once()
# Verify pipeline was used for batch cache write
mock_redis.pipeline.assert_called_once()
mock_redis.pipeline().execute.assert_called_once()
def test_get_download_urls_empty_list(self, cached_storage, mock_storage, mock_redis):
"""Test batch URL retrieval with empty list."""