Merge branch 'main' into chore/bump-pyrefly-0.57.0

Asuka Minato 2026-03-24 11:03:25 +09:00 committed by GitHub
commit 8e0858726a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
714 changed files with 43368 additions and 13514 deletions

View File

@ -94,11 +94,6 @@ jobs:
find . -name "*.py" -type f -exec sed -i.bak -E 's/"([^"]+)" \| None/Optional["\1"]/g; s/'"'"'([^'"'"']+)'"'"' \| None/Optional['"'"'\1'"'"']/g' {} \;
find . -name "*.py.bak" -type f -delete
# mdformat breaks YAML front matter in markdown files. Add --exclude for directories containing YAML front matter.
- name: mdformat
run: |
uvx --python 3.13 mdformat . --exclude ".agents/skills/**"
- name: Setup web environment
if: steps.web-changes.outputs.any_changed == 'true'
uses: ./.github/actions/setup-web
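An aside on the sed one-liner above: the nested shell quoting obscures two simple rewrites. A minimal Python equivalent (a sketch, not part of the workflow):

```python
# Minimal Python equivalent of the two sed substitutions above (a sketch,
# not part of the workflow): rewrite `X | None` annotations that use quoted
# forward references into typing.Optional form.
import re

def to_optional(source: str) -> str:
    source = re.sub(r'"([^"]+)" \| None', r'Optional["\1"]', source)   # "Foo" | None
    source = re.sub(r"'([^']+)' \| None", r"Optional['\1']", source)   # 'Foo' | None
    return source

print(to_optional('dataset: "Dataset" | None = None'))
# dataset: Optional["Dataset"] = None
```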

View File

@ -120,7 +120,7 @@ jobs:
- name: Run Claude Code for Translation Sync
if: steps.detect_changes.outputs.CHANGED_FILES != ''
uses: anthropics/claude-code-action@df37d2f0760a4b5683a6e617c9325bc1a36443f6 # v1.0.75
uses: anthropics/claude-code-action@6062f3709600659be5e47fcddf2cf76993c235c2 # v1.0.76
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
github_token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -97,3 +97,8 @@ Feel free to reach out if you encounter any issues during the setup process.
## Getting Help
If you ever get stuck or get a burning question while contributing, simply shoot your queries our way via the related GitHub issue, or hop onto our [Discord](https://discord.gg/8Tpq4AcN9c) for a quick chat.
## Automated Agent Contributions
> [!NOTE]
> If you are an automated agent, add 🤖🤖🤖 to the end of the PR title to opt in.

View File

@ -353,6 +353,9 @@ BAIDU_VECTOR_DB_SHARD=1
BAIDU_VECTOR_DB_REPLICAS=3
BAIDU_VECTOR_DB_INVERTED_INDEX_ANALYZER=DEFAULT_ANALYZER
BAIDU_VECTOR_DB_INVERTED_INDEX_PARSER_MODE=COARSE_MODE
BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT=500
BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT_RATIO=0.05
BAIDU_VECTOR_DB_REBUILD_INDEX_TIMEOUT_IN_SECONDS=300
# Upstash configuration
UPSTASH_VECTOR_URL=your-server-url

View File

@ -51,3 +51,18 @@ class BaiduVectorDBConfig(BaseSettings):
description="Parser mode for inverted index in Baidu Vector Database (default is COARSE_MODE)",
default="COARSE_MODE",
)
BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT: int = Field(
description="Auto build row count increment threshold (default is 500)",
default=500,
)
BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT_RATIO: float = Field(
description="Auto build row count increment ratio threshold (default is 0.05)",
default=0.05,
)
BAIDU_VECTOR_DB_REBUILD_INDEX_TIMEOUT_IN_SECONDS: int = Field(
description="Timeout in seconds for rebuilding the index in Baidu Vector Database (default is 3600 seconds)",
default=300,
)
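A hedged reading of how the two auto-build thresholds interact (they feed pymochow's `AutoBuildRowCountIncrement` policy later in this diff; the authoritative trigger semantics are Baidu's, so treat the arithmetic as illustrative):

```python
# Hedged reading of the auto-build policy: the index is rebuilt once row
# growth since the last build exceeds either the absolute increment or the
# relative ratio. Illustrative only; exact semantics live in pymochow.
rows_at_last_build = 20_000
increment = 500    # BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT
ratio = 0.05       # BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT_RATIO

def should_rebuild(current_rows: int) -> bool:
    grown = current_rows - rows_at_last_build
    return grown >= increment or grown >= rows_at_last_build * ratio

print(should_rebuild(20_400))  # False: 400 < 500 and 400 < 1000
print(should_rebuild(20_600))  # True: 600 >= 500
```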

View File

@ -1,7 +1,7 @@
import flask_restx
from flask_restx import Resource, fields, marshal_with
from flask_restx._http import HTTPStatus
from sqlalchemy import select
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden
@ -9,6 +9,7 @@ from extensions.ext_database import db
from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.dataset import Dataset
from models.enums import ApiTokenType
from models.model import ApiToken, App
from services.api_token_service import ApiTokenCache
@ -33,16 +34,10 @@ api_key_list_model = console_ns.model(
def _get_resource(resource_id, tenant_id, resource_model):
if resource_model == App:
with Session(db.engine) as session:
resource = session.execute(
select(resource_model).filter_by(id=resource_id, tenant_id=tenant_id)
).scalar_one_or_none()
else:
with Session(db.engine) as session:
resource = session.execute(
select(resource_model).filter_by(id=resource_id, tenant_id=tenant_id)
).scalar_one_or_none()
with Session(db.engine) as session:
resource = session.execute(
select(resource_model).filter_by(id=resource_id, tenant_id=tenant_id)
).scalar_one_or_none()
if resource is None:
flask_restx.abort(HTTPStatus.NOT_FOUND, message=f"{resource_model.__name__} not found.")
@ -53,7 +48,7 @@ def _get_resource(resource_id, tenant_id, resource_model):
class BaseApiKeyListResource(Resource):
method_decorators = [account_initialization_required, login_required, setup_required]
resource_type: str | None = None
resource_type: ApiTokenType | None = None
resource_model: type | None = None
resource_id_field: str | None = None
token_prefix: str | None = None
@ -80,10 +75,13 @@ class BaseApiKeyListResource(Resource):
resource_id = str(resource_id)
_, current_tenant_id = current_account_with_tenant()
_get_resource(resource_id, current_tenant_id, self.resource_model)
current_key_count = (
db.session.query(ApiToken)
.where(ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id)
.count()
current_key_count: int = (
db.session.scalar(
select(func.count(ApiToken.id)).where(
ApiToken.type == self.resource_type, getattr(ApiToken, self.resource_id_field) == resource_id
)
)
or 0
)
if current_key_count >= self.max_keys:
@ -94,6 +92,7 @@ class BaseApiKeyListResource(Resource):
)
key = ApiToken.generate_api_key(self.token_prefix or "", 24)
assert self.resource_type is not None, "resource_type must be set"
api_token = ApiToken()
setattr(api_token, self.resource_id_field, resource_id)
api_token.tenant_id = current_tenant_id
@ -107,7 +106,7 @@ class BaseApiKeyListResource(Resource):
class BaseApiKeyResource(Resource):
method_decorators = [account_initialization_required, login_required, setup_required]
resource_type: str | None = None
resource_type: ApiTokenType | None = None
resource_model: type | None = None
resource_id_field: str | None = None
@ -119,14 +118,14 @@ class BaseApiKeyResource(Resource):
if not current_user.is_admin_or_owner:
raise Forbidden()
key = (
db.session.query(ApiToken)
key = db.session.scalar(
select(ApiToken)
.where(
getattr(ApiToken, self.resource_id_field) == resource_id,
ApiToken.type == self.resource_type,
ApiToken.id == api_key_id,
)
.first()
.limit(1)
)
if key is None:
@ -137,7 +136,7 @@ class BaseApiKeyResource(Resource):
assert key is not None # nosec - for type checker only
ApiTokenCache.delete(key.token, key.type)
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
db.session.execute(delete(ApiToken).where(ApiToken.id == api_key_id))
db.session.commit()
return {"result": "success"}, 204
@ -162,7 +161,7 @@ class AppApiKeyListResource(BaseApiKeyListResource):
"""Create a new API key for an app"""
return super().post(resource_id)
resource_type = "app"
resource_type = ApiTokenType.APP
resource_model = App
resource_id_field = "app_id"
token_prefix = "app-"
@ -178,7 +177,7 @@ class AppApiKeyResource(BaseApiKeyResource):
"""Delete an API key for an app"""
return super().delete(resource_id, api_key_id)
resource_type = "app"
resource_type = ApiTokenType.APP
resource_model = App
resource_id_field = "app_id"
@ -202,7 +201,7 @@ class DatasetApiKeyListResource(BaseApiKeyListResource):
"""Create a new API key for a dataset"""
return super().post(resource_id)
resource_type = "dataset"
resource_type = ApiTokenType.DATASET
resource_model = Dataset
resource_id_field = "dataset_id"
token_prefix = "ds-"
@ -218,6 +217,6 @@ class DatasetApiKeyResource(BaseApiKeyResource):
"""Delete an API key for a dataset"""
return super().delete(resource_id, api_key_id)
resource_type = "dataset"
resource_type = ApiTokenType.DATASET
resource_model = Dataset
resource_id_field = "dataset_id"
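This file is representative of the query-style migration applied across the commit: the legacy 1.x `Query` API is swapped for SQLAlchemy 2.0-style `select()`/`scalar()`/`execute()`. A minimal side-by-side sketch (model names as above; session handling elided):

```python
# Legacy 1.x Query API, as it looked before this commit:
key = db.session.query(ApiToken).where(ApiToken.id == api_key_id).first()
n = db.session.query(ApiToken).count()
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()

# SQLAlchemy 2.0 style used after this commit:
from sqlalchemy import delete, func, select

key = db.session.scalar(select(ApiToken).where(ApiToken.id == api_key_id).limit(1))
n = db.session.scalar(select(func.count(ApiToken.id))) or 0
db.session.execute(delete(ApiToken).where(ApiToken.id == api_key_id))
```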

View File

@ -5,7 +5,7 @@ from flask import abort, request
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import func, or_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import selectinload
from werkzeug.exceptions import NotFound
from controllers.console import console_ns
@ -376,8 +376,12 @@ class CompletionConversationApi(Resource):
# FIXME: address the type ignores in this file
if args.annotation_status == "annotated":
query = query.options(joinedload(Conversation.message_annotations)).join( # type: ignore
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id
query = (
query.options(selectinload(Conversation.message_annotations)) # type: ignore[arg-type]
.join( # type: ignore
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id
)
.distinct()
)
elif args.annotation_status == "not_annotated":
query = (
@ -454,9 +458,7 @@ class ChatConversationApi(Resource):
args = ChatConversationQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
subquery = (
db.session.query(
Conversation.id.label("conversation_id"), EndUser.session_id.label("from_end_user_session_id")
)
sa.select(Conversation.id.label("conversation_id"), EndUser.session_id.label("from_end_user_session_id"))
.outerjoin(EndUser, Conversation.from_end_user_id == EndUser.id)
.subquery()
)
@ -511,8 +513,12 @@ class ChatConversationApi(Resource):
match args.annotation_status:
case "annotated":
query = query.options(joinedload(Conversation.message_annotations)).join( # type: ignore
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id
query = (
query.options(selectinload(Conversation.message_annotations)) # type: ignore[arg-type]
.join( # type: ignore
MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id
)
.distinct()
)
case "not_annotated":
query = (
@ -587,10 +593,8 @@ class ChatConversationDetailApi(Resource):
def _get_conversation(app_model, conversation_id):
current_user, _ = current_account_with_tenant()
conversation = (
db.session.query(Conversation)
.where(Conversation.id == conversation_id, Conversation.app_id == app_model.id)
.first()
conversation = db.session.scalar(
sa.select(Conversation).where(Conversation.id == conversation_id, Conversation.app_id == app_model.id).limit(1)
)
if not conversation:

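A note on the `joinedload` → `selectinload` swap in the two annotation branches above (sketched 2.0-style; the diff itself operates on a legacy `Query` object):

```python
# The explicit inner JOIN used for filtering multiplies Conversation rows
# (one per annotation), hence the new .distinct(); selectinload then loads
# the collection in a second SELECT ... WHERE conversation_id IN (...)
# instead of widening the main query the way joinedload did.
from sqlalchemy import select
from sqlalchemy.orm import selectinload

stmt = (
    select(Conversation)
    .options(selectinload(Conversation.message_annotations))
    .join(MessageAnnotation, MessageAnnotation.conversation_id == Conversation.id)
    .distinct()
)
```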
View File

@ -168,7 +168,7 @@ class InstructionGenerateApi(Resource):
try:
# Generate from nothing for a workflow node
if (args.current in (code_template, "")) and args.node_id != "":
app = db.session.query(App).where(App.id == args.flow_id).first()
app = db.session.get(App, args.flow_id)
if not app:
return {"error": f"app {args.flow_id} not found"}, 400
workflow = WorkflowService().get_draft_workflow(app_model=app)
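The `Session.get()` form used here recurs throughout the commit wherever the filter was a bare primary-key match. A short sketch of why it is preferred:

```python
# Session.get() is the 2.0 primary-key lookup. Unlike a filtered SELECT it
# consults the session's identity map first, so an App already loaded in
# this session comes back without a round trip; it returns None when no
# row exists.
app = db.session.get(App, args.flow_id)
```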

View File

@ -2,6 +2,7 @@ import json
from flask_restx import Resource, marshal_with
from pydantic import BaseModel, Field
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from controllers.console import console_ns
@ -47,7 +48,7 @@ class AppMCPServerController(Resource):
@get_app_model
@marshal_with(app_server_model)
def get(self, app_model):
server = db.session.query(AppMCPServer).where(AppMCPServer.app_id == app_model.id).first()
server = db.session.scalar(select(AppMCPServer).where(AppMCPServer.app_id == app_model.id).limit(1))
return server
@console_ns.doc("create_app_mcp_server")
@ -98,7 +99,7 @@ class AppMCPServerController(Resource):
@edit_permission_required
def put(self, app_model):
payload = MCPServerUpdatePayload.model_validate(console_ns.payload or {})
server = db.session.query(AppMCPServer).where(AppMCPServer.id == payload.id).first()
server = db.session.get(AppMCPServer, payload.id)
if not server:
raise NotFound()
@ -135,11 +136,10 @@ class AppMCPServerRefreshController(Resource):
@edit_permission_required
def get(self, server_id):
_, current_tenant_id = current_account_with_tenant()
server = (
db.session.query(AppMCPServer)
.where(AppMCPServer.id == server_id)
.where(AppMCPServer.tenant_id == current_tenant_id)
.first()
server = db.session.scalar(
select(AppMCPServer)
.where(AppMCPServer.id == server_id, AppMCPServer.tenant_id == current_tenant_id)
.limit(1)
)
if not server:
raise NotFound()

View File

@ -4,7 +4,7 @@ from typing import Literal
from flask import request
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import exists, select
from sqlalchemy import exists, func, select
from werkzeug.exceptions import InternalServerError, NotFound
from controllers.common.schema import register_schema_models
@ -30,6 +30,7 @@ from fields.raws import FilesContainedField
from libs.helper import TimestampField, uuid_value
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.login import current_account_with_tenant, login_required
from models.enums import FeedbackFromSource, FeedbackRating
from models.model import AppMode, Conversation, Message, MessageAnnotation, MessageFeedback
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
@ -243,27 +244,25 @@ class ChatMessageListApi(Resource):
def get(self, app_model):
args = ChatMessagesQuery.model_validate(request.args.to_dict())
conversation = (
db.session.query(Conversation)
conversation = db.session.scalar(
select(Conversation)
.where(Conversation.id == args.conversation_id, Conversation.app_id == app_model.id)
.first()
.limit(1)
)
if not conversation:
raise NotFound("Conversation Not Exists.")
if args.first_id:
first_message = (
db.session.query(Message)
.where(Message.conversation_id == conversation.id, Message.id == args.first_id)
.first()
first_message = db.session.scalar(
select(Message).where(Message.conversation_id == conversation.id, Message.id == args.first_id).limit(1)
)
if not first_message:
raise NotFound("First message not found")
history_messages = (
db.session.query(Message)
history_messages = db.session.scalars(
select(Message)
.where(
Message.conversation_id == conversation.id,
Message.created_at < first_message.created_at,
@ -271,16 +270,14 @@ class ChatMessageListApi(Resource):
)
.order_by(Message.created_at.desc())
.limit(args.limit)
.all()
)
).all()
else:
history_messages = (
db.session.query(Message)
history_messages = db.session.scalars(
select(Message)
.where(Message.conversation_id == conversation.id)
.order_by(Message.created_at.desc())
.limit(args.limit)
.all()
)
).all()
# Initialize has_more based on whether we have a full page
if len(history_messages) == args.limit:
@ -325,7 +322,9 @@ class MessageFeedbackApi(Resource):
message_id = str(args.message_id)
message = db.session.query(Message).where(Message.id == message_id, Message.app_id == app_model.id).first()
message = db.session.scalar(
select(Message).where(Message.id == message_id, Message.app_id == app_model.id).limit(1)
)
if not message:
raise NotFound("Message Not Exists.")
@ -335,7 +334,7 @@ class MessageFeedbackApi(Resource):
if not args.rating and feedback:
db.session.delete(feedback)
elif args.rating and feedback:
feedback.rating = args.rating
feedback.rating = FeedbackRating(args.rating)
feedback.content = args.content
elif not args.rating and not feedback:
raise ValueError("rating cannot be None when feedback not exists")
@ -347,9 +346,9 @@ class MessageFeedbackApi(Resource):
app_id=app_model.id,
conversation_id=message.conversation_id,
message_id=message.id,
rating=rating_value,
rating=FeedbackRating(rating_value),
content=args.content,
from_source="admin",
from_source=FeedbackFromSource.ADMIN,
from_account_id=current_user.id,
)
db.session.add(feedback)
@ -374,7 +373,9 @@ class MessageAnnotationCountApi(Resource):
@login_required
@account_initialization_required
def get(self, app_model):
count = db.session.query(MessageAnnotation).where(MessageAnnotation.app_id == app_model.id).count()
count = db.session.scalar(
select(func.count(MessageAnnotation.id)).where(MessageAnnotation.app_id == app_model.id)
)
return {"count": count}
@ -478,7 +479,9 @@ class MessageApi(Resource):
def get(self, app_model, message_id: str):
message_id = str(message_id)
message = db.session.query(Message).where(Message.id == message_id, Message.app_id == app_model.id).first()
message = db.session.scalar(
select(Message).where(Message.id == message_id, Message.app_id == app_model.id).limit(1)
)
if not message:
raise NotFound("Message Not Exists.")

View File

@ -69,9 +69,7 @@ class ModelConfigResource(Resource):
if app_model.mode == AppMode.AGENT_CHAT or app_model.is_agent:
# get original app model config
original_app_model_config = (
db.session.query(AppModelConfig).where(AppModelConfig.id == app_model.app_model_config_id).first()
)
original_app_model_config = db.session.get(AppModelConfig, app_model.app_model_config_id)
if original_app_model_config is None:
raise ValueError("Original app model config not found")
agent_mode = original_app_model_config.agent_mode_dict

View File

@ -2,6 +2,7 @@ from typing import Literal
from flask_restx import Resource, marshal_with
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from constants.languages import supported_language
@ -75,7 +76,7 @@ class AppSite(Resource):
def post(self, app_model):
args = AppSiteUpdatePayload.model_validate(console_ns.payload or {})
current_user, _ = current_account_with_tenant()
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if not site:
raise NotFound
@ -124,7 +125,7 @@ class AppSiteAccessTokenReset(Resource):
@marshal_with(app_site_model)
def post(self, app_model):
current_user, _ = current_account_with_tenant()
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if not site:
raise NotFound

View File

@ -7,7 +7,7 @@ from flask import abort, request
from flask_restx import Resource, fields, marshal_with
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound
import services
from controllers.console import console_ns
@ -46,13 +46,14 @@ from models import App
from models.model import AppMode
from models.workflow import Workflow
from services.app_generate_service import AppGenerateService
from services.errors.app import WorkflowHashNotEqualError
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
logger = logging.getLogger(__name__)
LISTENING_RETRY_IN = 2000
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published"
# Register models for flask_restx to avoid dict type issues in Swagger
# Register in dependency order: base models first, then dependent models
@ -284,7 +285,9 @@ class DraftWorkflowApi(Resource):
workflow_service = WorkflowService()
try:
environment_variables_list = args.get("environment_variables") or []
environment_variables_list = Workflow.normalize_environment_variable_mappings(
args.get("environment_variables") or [],
)
environment_variables = [
variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list
]
@ -994,6 +997,43 @@ class PublishedAllWorkflowApi(Resource):
}
@console_ns.route("/apps/<uuid:app_id>/workflows/<string:workflow_id>/restore")
class DraftWorkflowRestoreApi(Resource):
@console_ns.doc("restore_workflow_to_draft")
@console_ns.doc(description="Restore a published workflow version into the draft workflow")
@console_ns.doc(params={"app_id": "Application ID", "workflow_id": "Published workflow ID"})
@console_ns.response(200, "Workflow restored successfully")
@console_ns.response(400, "Source workflow must be published")
@console_ns.response(404, "Workflow not found")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@edit_permission_required
def post(self, app_model: App, workflow_id: str):
current_user, _ = current_account_with_tenant()
workflow_service = WorkflowService()
try:
workflow = workflow_service.restore_published_workflow_to_draft(
app_model=app_model,
workflow_id=workflow_id,
account=current_user,
)
except IsDraftWorkflowError as exc:
raise BadRequest(RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE) from exc
except WorkflowNotFoundError as exc:
raise NotFound(str(exc)) from exc
except ValueError as exc:
raise BadRequest(str(exc)) from exc
return {
"result": "success",
"hash": workflow.unique_hash,
"updated_at": TimestampField().format(workflow.updated_at or workflow.created_at),
}
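The new restore route can be exercised roughly like this (a sketch: `CONSOLE_API`, `TOKEN`, and both ids are placeholders, and real console auth is session-based rather than a bare bearer token):

```python
import requests

resp = requests.post(
    f"{CONSOLE_API}/apps/{app_id}/workflows/{workflow_id}/restore",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
# 200 -> {"result": "success", "hash": "...", "updated_at": 1742783005}
# 400 -> "source workflow must be published"; 404 -> workflow not found
```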
@console_ns.route("/apps/<uuid:app_id>/workflows/<string:workflow_id>")
class WorkflowByIdApi(Resource):
@console_ns.doc("update_workflow_by_id")

View File

@ -2,6 +2,8 @@ from collections.abc import Callable
from functools import wraps
from typing import ParamSpec, TypeVar, Union
from sqlalchemy import select
from controllers.console.app.error import AppNotFoundError
from extensions.ext_database import db
from libs.login import current_account_with_tenant
@ -15,16 +17,14 @@ R1 = TypeVar("R1")
def _load_app_model(app_id: str) -> App | None:
_, current_tenant_id = current_account_with_tenant()
app_model = (
db.session.query(App)
.where(App.id == app_id, App.tenant_id == current_tenant_id, App.status == "normal")
.first()
app_model = db.session.scalar(
select(App).where(App.id == app_id, App.tenant_id == current_tenant_id, App.status == "normal").limit(1)
)
return app_model
def _load_app_model_with_trial(app_id: str) -> App | None:
app_model = db.session.query(App).where(App.id == app_id, App.status == "normal").first()
app_model = db.session.scalar(select(App).where(App.id == app_id, App.status == "normal").limit(1))
return app_model

View File

@ -1,4 +1,5 @@
import logging
import urllib.parse
import httpx
from flask import current_app, redirect, request
@ -112,6 +113,9 @@ class OAuthCallback(Resource):
error_text = e.response.text
logger.exception("An error occurred during the OAuth process with %s: %s", provider, error_text)
return {"error": "OAuth process failed"}, 400
except ValueError as e:
logger.warning("OAuth error with %s", provider, exc_info=True)
return redirect(f"{dify_config.CONSOLE_WEB_URL}/signin?message={urllib.parse.quote(str(e))}")
if invite_token and RegisterService.is_valid_invite_token(invite_token):
invitation = RegisterService.get_invitation_by_token(token=invite_token)

View File

@ -54,7 +54,7 @@ from fields.document_fields import document_status_fields
from libs.login import current_account_with_tenant, login_required
from models import ApiToken, Dataset, Document, DocumentSegment, UploadFile
from models.dataset import DatasetPermission, DatasetPermissionEnum
from models.enums import SegmentStatus
from models.enums import ApiTokenType, SegmentStatus
from models.provider_ids import ModelProviderID
from services.api_token_service import ApiTokenCache
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
@ -777,7 +777,7 @@ class DatasetIndexingStatusApi(Resource):
class DatasetApiKeyApi(Resource):
max_keys = 10
token_prefix = "dataset-"
resource_type = "dataset"
resource_type = ApiTokenType.DATASET
@console_ns.doc("get_dataset_api_keys")
@console_ns.doc(description="Get dataset API keys")
@ -826,7 +826,7 @@ class DatasetApiKeyApi(Resource):
@console_ns.route("/datasets/api-keys/<uuid:api_key_id>")
class DatasetApiDeleteApi(Resource):
resource_type = "dataset"
resource_type = ApiTokenType.DATASET
@console_ns.doc("delete_dataset_api_key")
@console_ns.doc(description="Delete dataset API key")

View File

@ -298,6 +298,7 @@ class DatasetDocumentListApi(Resource):
if sort == "hit_count":
sub_query = (
sa.select(DocumentSegment.document_id, sa.func.sum(DocumentSegment.hit_count).label("total_hit_count"))
.where(DocumentSegment.dataset_id == str(dataset_id))
.group_by(DocumentSegment.document_id)
.subquery()
)

View File

@ -24,6 +24,7 @@ from fields.hit_testing_fields import hit_testing_record_fields
from libs.login import current_user
from models.account import Account
from services.dataset_service import DatasetService
from services.entities.knowledge_entities.knowledge_entities import RetrievalModel
from services.hit_testing_service import HitTestingService
logger = logging.getLogger(__name__)
@ -31,7 +32,7 @@ logger = logging.getLogger(__name__)
class HitTestingPayload(BaseModel):
query: str = Field(max_length=250)
retrieval_model: dict[str, Any] | None = None
retrieval_model: RetrievalModel | None = None
external_retrieval_model: dict[str, Any] | None = None
attachment_ids: list[str] | None = None

View File

@ -6,7 +6,7 @@ from flask import abort, request
from flask_restx import Resource, marshal_with # type: ignore
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
from werkzeug.exceptions import BadRequest, Forbidden, InternalServerError, NotFound
import services
from controllers.common.schema import register_schema_models
@ -16,7 +16,11 @@ from controllers.console.app.error import (
DraftWorkflowNotExist,
DraftWorkflowNotSync,
)
from controllers.console.app.workflow import workflow_model, workflow_pagination_model
from controllers.console.app.workflow import (
RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE,
workflow_model,
workflow_pagination_model,
)
from controllers.console.app.workflow_run import (
workflow_run_detail_model,
workflow_run_node_execution_list_model,
@ -42,7 +46,8 @@ from libs.login import current_account_with_tenant, current_user, login_required
from models import Account
from models.dataset import Pipeline
from models.model import EndUser
from services.errors.app import WorkflowHashNotEqualError
from models.workflow import Workflow
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError
from services.errors.llm import InvokeRateLimitError
from services.rag_pipeline.pipeline_generate_service import PipelineGenerateService
from services.rag_pipeline.rag_pipeline import RagPipelineService
@ -203,9 +208,12 @@ class DraftRagPipelineApi(Resource):
abort(415)
payload = DraftWorkflowSyncPayload.model_validate(payload_dict)
rag_pipeline_service = RagPipelineService()
try:
environment_variables_list = payload.environment_variables or []
environment_variables_list = Workflow.normalize_environment_variable_mappings(
payload.environment_variables or [],
)
environment_variables = [
variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list
]
@ -213,7 +221,6 @@ class DraftRagPipelineApi(Resource):
conversation_variables = [
variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
]
rag_pipeline_service = RagPipelineService()
workflow = rag_pipeline_service.sync_draft_workflow(
pipeline=pipeline,
graph=payload.graph,
@ -705,6 +712,36 @@ class PublishedAllRagPipelineApi(Resource):
}
@console_ns.route("/rag/pipelines/<uuid:pipeline_id>/workflows/<string:workflow_id>/restore")
class RagPipelineDraftWorkflowRestoreApi(Resource):
@setup_required
@login_required
@account_initialization_required
@edit_permission_required
@get_rag_pipeline
def post(self, pipeline: Pipeline, workflow_id: str):
current_user, _ = current_account_with_tenant()
rag_pipeline_service = RagPipelineService()
try:
workflow = rag_pipeline_service.restore_published_workflow_to_draft(
pipeline=pipeline,
workflow_id=workflow_id,
account=current_user,
)
except IsDraftWorkflowError as exc:
# Use a stable, predefined message to keep the 400 response consistent
raise BadRequest(RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE) from exc
except WorkflowNotFoundError as exc:
raise NotFound(str(exc)) from exc
return {
"result": "success",
"hash": workflow.unique_hash,
"updated_at": TimestampField().format(workflow.updated_at or workflow.created_at),
}
@console_ns.route("/rag/pipelines/<uuid:pipeline_id>/workflows/<string:workflow_id>")
class RagPipelineByIdApi(Resource):
@setup_required

View File

@ -1,5 +1,6 @@
from flask import request
from flask_restx import Resource
from sqlalchemy import select
from controllers.console import api
from controllers.console.explore.wraps import explore_banner_enabled
@ -17,14 +18,18 @@ class BannerApi(Resource):
language = request.args.get("language", "en-US")
# Build base query for enabled banners
base_query = db.session.query(ExporleBanner).where(ExporleBanner.status == BannerStatus.ENABLED)
base_query = select(ExporleBanner).where(ExporleBanner.status == BannerStatus.ENABLED)
# Try to get banners in the requested language
banners = base_query.where(ExporleBanner.language == language).order_by(ExporleBanner.sort).all()
banners = db.session.scalars(
base_query.where(ExporleBanner.language == language).order_by(ExporleBanner.sort)
).all()
# Fallback to en-US if no banners found and language is not en-US
if not banners and language != "en-US":
banners = base_query.where(ExporleBanner.language == "en-US").order_by(ExporleBanner.sort).all()
banners = db.session.scalars(
base_query.where(ExporleBanner.language == "en-US").order_by(ExporleBanner.sort)
).all()
# Convert banners to serializable format
result = []
for banner in banners:

View File

@ -133,13 +133,15 @@ class InstalledAppsListApi(Resource):
def post(self):
payload = InstalledAppCreatePayload.model_validate(console_ns.payload or {})
recommended_app = db.session.query(RecommendedApp).where(RecommendedApp.app_id == payload.app_id).first()
recommended_app = db.session.scalar(
select(RecommendedApp).where(RecommendedApp.app_id == payload.app_id).limit(1)
)
if recommended_app is None:
raise NotFound("Recommended app not found")
_, current_tenant_id = current_account_with_tenant()
app = db.session.query(App).where(App.id == payload.app_id).first()
app = db.session.get(App, payload.app_id)
if app is None:
raise NotFound("App entity not found")
@ -147,10 +149,10 @@ class InstalledAppsListApi(Resource):
if not app.is_public:
raise Forbidden("You can't install a non-public app")
installed_app = (
db.session.query(InstalledApp)
installed_app = db.session.scalar(
select(InstalledApp)
.where(and_(InstalledApp.app_id == payload.app_id, InstalledApp.tenant_id == current_tenant_id))
.first()
.limit(1)
)
if installed_app is None:

View File

@ -27,6 +27,7 @@ from fields.message_fields import MessageInfiniteScrollPagination, MessageListIt
from libs import helper
from libs.helper import UUIDStrOrEmpty
from libs.login import current_account_with_tenant
from models.enums import FeedbackRating
from models.model import AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import MoreLikeThisDisabledError
@ -116,7 +117,7 @@ class MessageFeedbackApi(InstalledAppResource):
app_model=app_model,
message_id=message_id,
user=current_user,
rating=payload.rating,
rating=FeedbackRating(payload.rating) if payload.rating else None,
content=payload.content,
)
except MessageNotExistsError:

View File

@ -4,6 +4,7 @@ from typing import Any, Literal, cast
from flask import request
from flask_restx import Resource, fields, marshal, marshal_with
from pydantic import BaseModel
from sqlalchemy import select
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
import services
@ -476,7 +477,7 @@ class TrialSitApi(Resource):
Returns the site configuration for the application including theme, icons, and text.
"""
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if not site:
raise Forbidden()
@ -541,13 +542,7 @@ class AppWorkflowApi(Resource):
if not app_model.workflow_id:
raise AppUnavailableError()
workflow = (
db.session.query(Workflow)
.where(
Workflow.id == app_model.workflow_id,
)
.first()
)
workflow = db.session.get(Workflow, app_model.workflow_id)
return workflow

View File

@ -4,6 +4,7 @@ from typing import Concatenate, ParamSpec, TypeVar
from flask import abort
from flask_restx import Resource
from sqlalchemy import select
from werkzeug.exceptions import NotFound
from controllers.console.explore.error import AppAccessDeniedError, TrialAppLimitExceeded, TrialAppNotAllowed
@ -24,10 +25,10 @@ def installed_app_required(view: Callable[Concatenate[InstalledApp, P], R] | Non
@wraps(view)
def decorated(installed_app_id: str, *args: P.args, **kwargs: P.kwargs):
_, current_tenant_id = current_account_with_tenant()
installed_app = (
db.session.query(InstalledApp)
installed_app = db.session.scalar(
select(InstalledApp)
.where(InstalledApp.id == str(installed_app_id), InstalledApp.tenant_id == current_tenant_id)
.first()
.limit(1)
)
if installed_app is None:
@ -78,7 +79,7 @@ def trial_app_required(view: Callable[Concatenate[App, P], R] | None = None):
def decorated(app_id: str, *args: P.args, **kwargs: P.kwargs):
current_user, _ = current_account_with_tenant()
trial_app = db.session.query(TrialApp).where(TrialApp.app_id == str(app_id)).first()
trial_app = db.session.scalar(select(TrialApp).where(TrialApp.app_id == str(app_id)).limit(1))
if trial_app is None:
raise TrialAppNotAllowed()
@ -87,10 +88,10 @@ def trial_app_required(view: Callable[Concatenate[App, P], R] | None = None):
if app is None:
raise TrialAppNotAllowed()
account_trial_app_record = (
db.session.query(AccountTrialAppRecord)
account_trial_app_record = db.session.scalar(
select(AccountTrialAppRecord)
.where(AccountTrialAppRecord.account_id == current_user.id, AccountTrialAppRecord.app_id == app_id)
.first()
.limit(1)
)
if account_trial_app_record:
if account_trial_app_record.count >= trial_app.trial_limit:

View File

@ -2,6 +2,7 @@ from typing import Literal
from flask import request
from pydantic import BaseModel, Field, field_validator
from sqlalchemy import select
from configs import dify_config
from controllers.fastopenapi import console_router
@ -100,6 +101,6 @@ def setup_system(payload: SetupRequestPayload) -> SetupResponse:
def get_setup_status() -> DifySetup | bool | None:
if dify_config.EDITION == "SELF_HOSTED":
return db.session.query(DifySetup).first()
return db.session.scalar(select(DifySetup).limit(1))
return True

View File

@ -212,13 +212,13 @@ class AccountInitApi(Resource):
raise ValueError("invitation_code is required")
# check invitation code
invitation_code = (
db.session.query(InvitationCode)
invitation_code = db.session.scalar(
select(InvitationCode)
.where(
InvitationCode.code == args.invitation_code,
InvitationCode.status == InvitationCodeStatus.UNUSED,
)
.first()
.limit(1)
)
if not invitation_code:

View File

@ -171,7 +171,7 @@ class MemberCancelInviteApi(Resource):
current_user, _ = current_account_with_tenant()
if not current_user.current_tenant:
raise ValueError("No current tenant")
member = db.session.query(Account).where(Account.id == str(member_id)).first()
member = db.session.get(Account, str(member_id))
if member is None:
abort(404)
else:

View File

@ -7,6 +7,7 @@ from sqlalchemy import select
from werkzeug.exceptions import Unauthorized
import services
from configs import dify_config
from controllers.common.errors import (
FilenameNotExistsError,
FileTooLargeError,
@ -29,6 +30,7 @@ from libs.helper import TimestampField
from libs.login import current_account_with_tenant, login_required
from models.account import Tenant, TenantStatus
from services.account_service import TenantService
from services.billing_service import BillingService, SubscriptionPlan
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.file_service import FileService
@ -108,9 +110,29 @@ class TenantListApi(Resource):
current_user, current_tenant_id = current_account_with_tenant()
tenants = TenantService.get_join_tenants(current_user)
tenant_dicts = []
is_enterprise_only = dify_config.ENTERPRISE_ENABLED and not dify_config.BILLING_ENABLED
is_saas = dify_config.EDITION == "CLOUD" and dify_config.BILLING_ENABLED
tenant_plans: dict[str, SubscriptionPlan] = {}
if is_saas:
tenant_ids = [tenant.id for tenant in tenants]
if tenant_ids:
tenant_plans = BillingService.get_plan_bulk(tenant_ids)
if not tenant_plans:
logger.warning("get_plan_bulk returned empty result, falling back to legacy feature path")
for tenant in tenants:
features = FeatureService.get_features(tenant.id)
plan: str = CloudPlan.SANDBOX
if is_saas:
tenant_plan = tenant_plans.get(tenant.id)
if tenant_plan:
plan = tenant_plan["plan"] or CloudPlan.SANDBOX
else:
features = FeatureService.get_features(tenant.id)
plan = features.billing.subscription.plan or CloudPlan.SANDBOX
elif not is_enterprise_only:
features = FeatureService.get_features(tenant.id)
plan = features.billing.subscription.plan or CloudPlan.SANDBOX
# Create a dictionary with tenant attributes
tenant_dict = {
@ -118,7 +140,7 @@ class TenantListApi(Resource):
"name": tenant.name,
"status": tenant.status,
"created_at": tenant.created_at,
"plan": features.billing.subscription.plan if features.billing.enabled else CloudPlan.SANDBOX,
"plan": plan,
"current": tenant.id == current_tenant_id if current_tenant_id else False,
}
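The shape implied by this hunk: `BillingService.get_plan_bulk` returns one mapping for all tenants, replacing N per-tenant `get_features` calls on the SaaS path. A sketch (plan names illustrative):

```python
tenant_plans = BillingService.get_plan_bulk(["t1", "t2"])
# e.g. {"t1": {"plan": "professional"}, "t2": {"plan": ""}}
plan = (tenant_plans.get("t2") or {}).get("plan") or CloudPlan.SANDBOX
```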
@ -198,7 +220,7 @@ class SwitchWorkspaceApi(Resource):
except Exception:
raise AccountNotLinkTenantError("Account not link tenant")
new_tenant = db.session.query(Tenant).get(args.tenant_id) # Get new tenant
new_tenant = db.session.get(Tenant, args.tenant_id) # Get new tenant
if new_tenant is None:
raise ValueError("Tenant not found")

View File

@ -7,6 +7,7 @@ from functools import wraps
from typing import ParamSpec, TypeVar
from flask import abort, request
from sqlalchemy import select
from configs import dify_config
from controllers.console.auth.error import AuthenticationFailedError, EmailCodeError
@ -218,13 +219,9 @@ def setup_required(view: Callable[P, R]) -> Callable[P, R]:
@wraps(view)
def decorated(*args: P.args, **kwargs: P.kwargs) -> R:
# check setup
if (
dify_config.EDITION == "SELF_HOSTED"
and os.environ.get("INIT_PASSWORD")
and not db.session.query(DifySetup).first()
):
raise NotInitValidateError()
elif dify_config.EDITION == "SELF_HOSTED" and not db.session.query(DifySetup).first():
if dify_config.EDITION == "SELF_HOSTED" and not db.session.scalar(select(DifySetup).limit(1)):
if os.environ.get("INIT_PASSWORD"):
raise NotInitValidateError()
raise NotSetupError()
return view(*args, **kwargs)

View File

@ -5,6 +5,7 @@ from typing import ParamSpec, TypeVar
from flask import current_app, request
from flask_login import user_logged_in
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session
from extensions.ext_database import db
@ -36,23 +37,16 @@ def get_user(tenant_id: str, user_id: str | None) -> EndUser:
user_model = None
if is_anonymous:
user_model = (
session.query(EndUser)
user_model = session.scalar(
select(EndUser)
.where(
EndUser.session_id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
.limit(1)
)
else:
user_model = (
session.query(EndUser)
.where(
EndUser.id == user_id,
EndUser.tenant_id == tenant_id,
)
.first()
)
user_model = session.get(EndUser, user_id)
if not user_model:
user_model = EndUser(
@ -85,16 +79,7 @@ def get_user_tenant(view_func: Callable[P, R]):
if not user_id:
user_id = DefaultEndUserSessionID.DEFAULT_SESSION_ID
try:
tenant_model = (
db.session.query(Tenant)
.where(
Tenant.id == tenant_id,
)
.first()
)
except Exception:
raise ValueError("tenant not found")
tenant_model = db.session.get(Tenant, tenant_id)
if not tenant_model:
raise ValueError("tenant not found")

View File

@ -2,6 +2,7 @@ import json
from flask_restx import Resource
from pydantic import BaseModel
from sqlalchemy import select
from controllers.common.schema import register_schema_models
from controllers.console.wraps import setup_required
@ -42,7 +43,7 @@ class EnterpriseWorkspace(Resource):
def post(self):
args = WorkspaceCreatePayload.model_validate(inner_api_ns.payload or {})
account = db.session.query(Account).filter_by(email=args.owner_email).first()
account = db.session.scalar(select(Account).where(Account.email == args.owner_email).limit(1))
if account is None:
return {"message": "owner account not found."}, 404

View File

@ -75,7 +75,7 @@ def enterprise_inner_api_user_auth(view: Callable[P, R]):
if signature_base64 != token:
return view(*args, **kwargs)
kwargs["user"] = db.session.query(EndUser).where(EndUser.id == user_id).first()
kwargs["user"] = db.session.get(EndUser, user_id)
return view(*args, **kwargs)

View File

@ -15,6 +15,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from fields.conversation_fields import ResultResponse
from fields.message_fields import MessageInfiniteScrollPagination, MessageListItem
from libs.helper import UUIDStrOrEmpty
from models.enums import FeedbackRating
from models.model import App, AppMode, EndUser
from services.errors.message import (
FirstMessageNotExistsError,
@ -116,7 +117,7 @@ class MessageFeedbackApi(Resource):
app_model=app_model,
message_id=message_id,
user=end_user,
rating=payload.rating,
rating=FeedbackRating(payload.rating) if payload.rating else None,
content=payload.content,
)
except MessageNotExistsError:

View File

@ -8,6 +8,7 @@ from datetime import datetime
from flask import Response, request
from flask_restx import Resource, reqparse
from sqlalchemy import select
from werkzeug.exceptions import Forbidden
from configs import dify_config
@ -147,11 +148,11 @@ class HumanInputFormApi(Resource):
def _get_app_site_from_form(form: Form) -> tuple[App, Site]:
"""Resolve App/Site for the form's app and validate tenant status."""
app_model = db.session.query(App).where(App.id == form.app_id).first()
app_model = db.session.get(App, form.app_id)
if app_model is None or app_model.tenant_id != form.tenant_id:
raise NotFoundError("Form not found")
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if site is None:
raise Forbidden()

View File

@ -25,6 +25,7 @@ from fields.conversation_fields import ResultResponse
from fields.message_fields import SuggestedQuestionsResponse, WebMessageInfiniteScrollPagination, WebMessageListItem
from libs import helper
from libs.helper import uuid_value
from models.enums import FeedbackRating
from models.model import AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import MoreLikeThisDisabledError
@ -157,7 +158,7 @@ class MessageFeedbackApi(WebApiResource):
app_model=app_model,
message_id=message_id,
user=end_user,
rating=payload.rating,
rating=FeedbackRating(payload.rating) if payload.rating else None,
content=payload.content,
)
except MessageNotExistsError:

View File

@ -1,6 +1,7 @@
from typing import cast
from flask_restx import fields, marshal, marshal_with
from sqlalchemy import select
from werkzeug.exceptions import Forbidden
from configs import dify_config
@ -72,7 +73,7 @@ class AppSiteApi(WebApiResource):
def get(self, app_model, end_user):
"""Retrieve app site info."""
# get site
site = db.session.query(Site).where(Site.app_id == app_model.id).first()
site = db.session.scalar(select(Site).where(Site.app_id == app_model.id).limit(1))
if not site:
raise Forbidden()

View File

@ -76,7 +76,7 @@ from dify_graph.system_variable import SystemVariable
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models import Account, Conversation, EndUser, Message, MessageFile
from models.enums import CreatorUserRole, MessageStatus
from models.enums import CreatorUserRole, MessageFileBelongsTo, MessageStatus
from models.execution_extra_content import HumanInputContent
from models.workflow import Workflow
@ -939,7 +939,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
type=file["type"],
transfer_method=file["transfer_method"],
url=file["remote_url"],
belongs_to="assistant",
belongs_to=MessageFileBelongsTo.ASSISTANT,
upload_file_id=file["related_id"],
created_by_role=CreatorUserRole.ACCOUNT
if message.invoke_from in {InvokeFrom.EXPLORE, InvokeFrom.DEBUGGER}

View File

@ -74,11 +74,22 @@ class AppGenerateResponseConverter(ABC):
for resource in metadata["retriever_resources"]:
updated_resources.append(
{
"dataset_id": resource.get("dataset_id"),
"dataset_name": resource.get("dataset_name"),
"document_id": resource.get("document_id"),
"segment_id": resource.get("segment_id", ""),
"position": resource["position"],
"data_source_type": resource.get("data_source_type"),
"document_name": resource["document_name"],
"score": resource["score"],
"hit_count": resource.get("hit_count"),
"word_count": resource.get("word_count"),
"segment_position": resource.get("segment_position"),
"index_node_hash": resource.get("index_node_hash"),
"content": resource["content"],
"page": resource.get("page"),
"title": resource.get("title"),
"files": resource.get("files"),
"summary": resource.get("summary"),
}
)

View File

@ -40,7 +40,7 @@ from dify_graph.model_runtime.entities.message_entities import (
from dify_graph.model_runtime.entities.model_entities import ModelPropertyKey
from dify_graph.model_runtime.errors.invoke import InvokeBadRequestError
from extensions.ext_database import db
from models.enums import CreatorUserRole
from models.enums import CreatorUserRole, MessageFileBelongsTo
from models.model import App, AppMode, Message, MessageAnnotation, MessageFile
if TYPE_CHECKING:
@ -419,7 +419,7 @@ class AppRunner:
message_id=message_id,
type=FileType.IMAGE,
transfer_method=FileTransferMethod.TOOL_FILE,
belongs_to="assistant",
belongs_to=MessageFileBelongsTo.ASSISTANT,
url=f"/files/tools/{tool_file.id}",
upload_file_id=tool_file.id,
created_by_role=(

View File

@ -517,7 +517,7 @@ class WorkflowResponseConverter:
snapshot = self._pop_snapshot(event.node_execution_id)
start_at = snapshot.start_at if snapshot else event.start_at
finished_at = naive_utc_now()
finished_at = event.finished_at or naive_utc_now()
elapsed_time = (finished_at - start_at).total_seconds()
inputs, inputs_truncated = self._truncate_mapping(event.inputs)

View File

@ -33,7 +33,7 @@ from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
from libs.datetime_utils import naive_utc_now
from models import Account
from models.enums import CreatorUserRole
from models.enums import ConversationFromSource, CreatorUserRole, MessageFileBelongsTo
from models.model import App, AppMode, AppModelConfig, Conversation, EndUser, Message, MessageFile
from services.errors.app_model_config import AppModelConfigBrokenError
from services.errors.conversation import ConversationNotExistsError
@ -130,10 +130,10 @@ class MessageBasedAppGenerator(BaseAppGenerator):
end_user_id = None
account_id = None
if application_generate_entity.invoke_from in {InvokeFrom.WEB_APP, InvokeFrom.SERVICE_API}:
from_source = "api"
from_source = ConversationFromSource.API
end_user_id = application_generate_entity.user_id
else:
from_source = "console"
from_source = ConversationFromSource.CONSOLE
account_id = application_generate_entity.user_id
if isinstance(application_generate_entity, AdvancedChatAppGenerateEntity):
@ -225,7 +225,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
message_id=message.id,
type=file.type,
transfer_method=file.transfer_method,
belongs_to="user",
belongs_to=MessageFileBelongsTo.USER,
url=file.remote_url,
upload_file_id=file.related_id,
created_by_role=(CreatorUserRole.ACCOUNT if account_id else CreatorUserRole.END_USER),

View File

@ -705,7 +705,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
app_id=self._application_generate_entity.app_config.app_id,
workflow_id=self._workflow.id,
workflow_run_id=workflow_run_id,
created_from=created_from.value,
created_from=created_from,
created_by_role=self._created_by_role,
created_by=self._user_id,
)

View File

@ -456,6 +456,7 @@ class WorkflowBasedAppRunner:
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=inputs,
process_data=process_data,
outputs=outputs,
@ -471,6 +472,7 @@ class WorkflowBasedAppRunner:
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,
@ -487,6 +489,7 @@ class WorkflowBasedAppRunner:
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=event.node_run_result.outputs,

View File

@ -335,6 +335,7 @@ class QueueNodeSucceededEvent(AppQueueEvent):
in_loop_id: str | None = None
"""loop id if node is in loop"""
start_at: datetime
finished_at: datetime | None = None
inputs: Mapping[str, object] = Field(default_factory=dict)
process_data: Mapping[str, object] = Field(default_factory=dict)
@ -390,6 +391,7 @@ class QueueNodeExceptionEvent(AppQueueEvent):
in_loop_id: str | None = None
"""loop id if node is in loop"""
start_at: datetime
finished_at: datetime | None = None
inputs: Mapping[str, object] = Field(default_factory=dict)
process_data: Mapping[str, object] = Field(default_factory=dict)
@ -414,6 +416,7 @@ class QueueNodeFailedEvent(AppQueueEvent):
in_loop_id: str | None = None
"""loop id if node is in loop"""
start_at: datetime
finished_at: datetime | None = None
inputs: Mapping[str, object] = Field(default_factory=dict)
process_data: Mapping[str, object] = Field(default_factory=dict)

View File

@ -6,7 +6,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom
from core.rag.datasource.vdb.vector_factory import Vector
from extensions.ext_database import db
from models.dataset import Dataset
from models.enums import CollectionBindingType
from models.enums import CollectionBindingType, ConversationFromSource
from models.model import App, AppAnnotationSetting, Message, MessageAnnotation
from services.annotation_service import AppAnnotationService
from services.dataset_service import DatasetCollectionBindingService
@ -68,9 +68,9 @@ class AnnotationReplyFeature:
annotation = AppAnnotationService.get_annotation_by_id(annotation_id)
if annotation:
if invoke_from in {InvokeFrom.SERVICE_API, InvokeFrom.WEB_APP}:
from_source = "api"
from_source = ConversationFromSource.API
else:
from_source = "console"
from_source = ConversationFromSource.CONSOLE
# insert annotation history
AppAnnotationService.add_annotation_history(

View File

@ -34,6 +34,7 @@ from core.llm_generator.llm_generator import LLMGenerator
from core.tools.signature import sign_tool_file
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.enums import MessageFileBelongsTo
from models.model import AppMode, Conversation, MessageAnnotation, MessageFile
from services.annotation_service import AppAnnotationService
@ -233,7 +234,7 @@ class MessageCycleManager:
task_id=self._application_generate_entity.task_id,
id=message_file.id,
type=message_file.type,
belongs_to=message_file.belongs_to or "user",
belongs_to=message_file.belongs_to or MessageFileBelongsTo.USER,
url=url,
)

View File

@ -268,7 +268,12 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
def _handle_node_succeeded(self, event: NodeRunSucceededEvent) -> None:
domain_execution = self._get_node_execution(event.id)
self._update_node_execution(domain_execution, event.node_run_result, WorkflowNodeExecutionStatus.SUCCEEDED)
self._update_node_execution(
domain_execution,
event.node_run_result,
WorkflowNodeExecutionStatus.SUCCEEDED,
finished_at=event.finished_at,
)
def _handle_node_failed(self, event: NodeRunFailedEvent) -> None:
domain_execution = self._get_node_execution(event.id)
@ -277,6 +282,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
event.node_run_result,
WorkflowNodeExecutionStatus.FAILED,
error=event.error,
finished_at=event.finished_at,
)
def _handle_node_exception(self, event: NodeRunExceptionEvent) -> None:
@ -286,6 +292,7 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
event.node_run_result,
WorkflowNodeExecutionStatus.EXCEPTION,
error=event.error,
finished_at=event.finished_at,
)
def _handle_node_pause_requested(self, event: NodeRunPauseRequestedEvent) -> None:
@ -352,13 +359,14 @@ class WorkflowPersistenceLayer(GraphEngineLayer):
*,
error: str | None = None,
update_outputs: bool = True,
finished_at: datetime | None = None,
) -> None:
finished_at = naive_utc_now()
actual_finished_at = finished_at or naive_utc_now()
snapshot = self._node_snapshots.get(domain_execution.id)
start_at = snapshot.created_at if snapshot else domain_execution.created_at
domain_execution.status = status
domain_execution.finished_at = finished_at
domain_execution.elapsed_time = max((finished_at - start_at).total_seconds(), 0.0)
domain_execution.finished_at = actual_finished_at
domain_execution.elapsed_time = max((actual_finished_at - start_at).total_seconds(), 0.0)
if error:
domain_execution.error = error
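Why `finished_at` is threaded through from the queue events (a sketch with made-up timestamps): elapsed time should measure the node run itself, not when the persistence layer got around to handling the event.

```python
from datetime import datetime, timedelta

start_at = datetime(2026, 3, 24, 2, 0, 0)
event_finished_at = start_at + timedelta(seconds=1.5)  # stamped when the node ended
handled_at = start_at + timedelta(seconds=9.0)         # naive_utc_now() at persistence time

elapsed = ((event_finished_at or handled_at) - start_at).total_seconds()
print(elapsed)  # 1.5, where the old code would have recorded 9.0
```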

View File

@ -181,10 +181,6 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
arize_phoenix_config: ArizeConfig | PhoenixConfig,
):
super().__init__(arize_phoenix_config)
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
self.arize_phoenix_config = arize_phoenix_config
self.tracer, self.processor = setup_tracer(arize_phoenix_config)
self.project = arize_phoenix_config.project

View File

@ -918,11 +918,11 @@ class ProviderManager:
trail_pool = CreditPoolService.get_pool(
tenant_id=tenant_id,
pool_type=ProviderQuotaType.TRIAL.value,
pool_type=ProviderQuotaType.TRIAL,
)
paid_pool = CreditPoolService.get_pool(
tenant_id=tenant_id,
pool_type=ProviderQuotaType.PAID.value,
pool_type=ProviderQuotaType.PAID,
)
else:
trail_pool = None

View File

@ -1,9 +1,10 @@
import re
from typing import Any
class CleanProcessor:
@classmethod
def clean(cls, text: str, process_rule: dict) -> str:
def clean(cls, text: str, process_rule: dict[str, Any] | None) -> str:
# default clean
# remove invalid symbol
text = re.sub(r"<\|", "<", text)

View File

@ -4,6 +4,7 @@ from typing import Any
import orjson
from pydantic import BaseModel
from sqlalchemy import select
from typing_extensions import TypedDict
from configs import dify_config
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
@ -15,6 +16,11 @@ from extensions.ext_storage import storage
from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment
class PreSegmentData(TypedDict):
segment: DocumentSegment
keywords: list[str]
class KeywordTableConfig(BaseModel):
max_keywords_per_chunk: int = 10
@ -128,7 +134,7 @@ class Jieba(BaseKeyword):
file_key = "keyword_files/" + self.dataset.tenant_id + "/" + self.dataset.id + ".txt"
storage.delete(file_key)
def _save_dataset_keyword_table(self, keyword_table):
def _save_dataset_keyword_table(self, keyword_table: dict[str, set[str]] | None):
keyword_table_dict = {
"__type__": "keyword_table",
"__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table},
@ -144,7 +150,7 @@ class Jieba(BaseKeyword):
storage.delete(file_key)
storage.save(file_key, dumps_with_sets(keyword_table_dict).encode("utf-8"))
def _get_dataset_keyword_table(self) -> dict | None:
def _get_dataset_keyword_table(self) -> dict[str, set[str]] | None:
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:
keyword_table_dict = dataset_keyword_table.keyword_table_dict
@ -169,14 +175,16 @@ class Jieba(BaseKeyword):
return {}
def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]):
def _add_text_to_keyword_table(
self, keyword_table: dict[str, set[str]], id: str, keywords: list[str]
) -> dict[str, set[str]]:
for keyword in keywords:
if keyword not in keyword_table:
keyword_table[keyword] = set()
keyword_table[keyword].add(id)
return keyword_table
def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]):
def _delete_ids_from_keyword_table(self, keyword_table: dict[str, set[str]], ids: list[str]) -> dict[str, set[str]]:
# get set of ids that correspond to node
node_idxs_to_delete = set(ids)
@ -193,7 +201,7 @@ class Jieba(BaseKeyword):
return keyword_table
def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4):
def _retrieve_ids_by_query(self, keyword_table: dict[str, set[str]], query: str, k: int = 4) -> list[str]:
keyword_table_handler = JiebaKeywordTableHandler()
keywords = keyword_table_handler.extract_keywords(query)
@ -228,7 +236,7 @@ class Jieba(BaseKeyword):
keyword_table = self._add_text_to_keyword_table(keyword_table or {}, node_id, keywords)
self._save_dataset_keyword_table(keyword_table)
def multi_create_segment_keywords(self, pre_segment_data_list: list):
def multi_create_segment_keywords(self, pre_segment_data_list: list[PreSegmentData]):
keyword_table_handler = JiebaKeywordTableHandler()
keyword_table = self._get_dataset_keyword_table()
for pre_segment_data in pre_segment_data_list:
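For context, the `dict[str, set[str]]` annotations added above make explicit that the keyword table is an inverted index from keyword to segment ids. A minimal, self-contained sketch (not part of the diff, simplified from the helpers above) of the add/delete round-trip:

    table: dict[str, set[str]] = {}

    def add_text(table: dict[str, set[str]], id: str, keywords: list[str]) -> dict[str, set[str]]:
        # Mirrors _add_text_to_keyword_table: each keyword maps to the set of segment ids.
        for keyword in keywords:
            table.setdefault(keyword, set()).add(id)
        return table

    add_text(table, "seg-1", ["dify", "rag"])
    add_text(table, "seg-2", ["rag"])
    assert table["rag"] == {"seg-1", "seg-2"}

    # Mirrors _delete_ids_from_keyword_table: drop the ids from every posting set.
    for keyword in list(table):
        table[keyword] -= {"seg-1"}
    assert table["rag"] == {"seg-2"}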

View File

@@ -103,7 +103,7 @@ class RetrievalService:
reranking_mode: str = "reranking_model",
weights: WeightsDict | None = None,
document_ids_filter: list[str] | None = None,
attachment_ids: list | None = None,
attachment_ids: list[str] | None = None,
):
if not query and not attachment_ids:
return []
@@ -250,8 +250,8 @@ class RetrievalService:
dataset_id: str,
query: str,
top_k: int,
all_documents: list,
exceptions: list,
all_documents: list[Document],
exceptions: list[str],
document_ids_filter: list[str] | None = None,
):
with flask_app.app_context():
@@ -279,9 +279,9 @@ class RetrievalService:
top_k: int,
score_threshold: float | None,
reranking_model: RerankingModelDict | None,
all_documents: list,
all_documents: list[Document],
retrieval_method: RetrievalMethod,
exceptions: list,
exceptions: list[str],
document_ids_filter: list[str] | None = None,
query_type: QueryType = QueryType.TEXT_QUERY,
):
@@ -373,9 +373,9 @@ class RetrievalService:
top_k: int,
score_threshold: float | None,
reranking_model: RerankingModelDict | None,
all_documents: list,
all_documents: list[Document],
retrieval_method: str,
exceptions: list,
exceptions: list[str],
document_ids_filter: list[str] | None = None,
):
with flask_app.app_context():

View File

@@ -13,6 +13,7 @@ from pymochow.exception import ServerError # type: ignore
from pymochow.model.database import Database
from pymochow.model.enum import FieldType, IndexState, IndexType, MetricType, ServerErrCode, TableState # type: ignore
from pymochow.model.schema import (
AutoBuildRowCountIncrement,
Field,
FilteringIndex,
HNSWParams,
@@ -51,6 +52,9 @@ class BaiduConfig(BaseModel):
replicas: int = 3
inverted_index_analyzer: str = "DEFAULT_ANALYZER"
inverted_index_parser_mode: str = "COARSE_MODE"
auto_build_row_count_increment: int = 500
auto_build_row_count_increment_ratio: float = 0.05
rebuild_index_timeout_in_seconds: int = 300
@model_validator(mode="before")
@classmethod
@@ -107,18 +111,6 @@ class BaiduVector(BaseVector):
rows.append(row)
table.upsert(rows=rows)
# rebuild vector index after upsert finished
table.rebuild_index(self.vector_index)
timeout = 3600 # 1 hour timeout
start_time = time.time()
while True:
time.sleep(1)
index = table.describe_index(self.vector_index)
if index.state == IndexState.NORMAL:
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Index rebuild timeout after {timeout} seconds")
def text_exists(self, id: str) -> bool:
res = self._db.table(self._collection_name).query(primary_key={VDBField.PRIMARY_KEY: id})
if res and res.code == 0:
@@ -232,8 +224,14 @@ class BaiduVector(BaseVector):
return self._client.database(self._client_config.database)
def _table_existed(self) -> bool:
tables = self._db.list_table()
return any(table.table_name == self._collection_name for table in tables)
try:
table = self._db.table(self._collection_name)
except ServerError as e:
if e.code == ServerErrCode.TABLE_NOT_EXIST:
return False
else:
raise
return True
def _create_table(self, dimension: int):
# Try to grab distributed lock and create table
@@ -287,6 +285,11 @@ class BaiduVector(BaseVector):
field=VDBField.VECTOR,
metric_type=metric_type,
params=HNSWParams(m=16, efconstruction=200),
auto_build=True,
auto_build_index_policy=AutoBuildRowCountIncrement(
row_count_increment=self._client_config.auto_build_row_count_increment,
row_count_increment_ratio=self._client_config.auto_build_row_count_increment_ratio,
),
)
)
@@ -335,7 +338,7 @@ class BaiduVector(BaseVector):
)
# Wait for table created
timeout = 300 # 5 minutes timeout
timeout = self._client_config.rebuild_index_timeout_in_seconds # default 5 minutes timeout
start_time = time.time()
while True:
time.sleep(1)
@@ -345,6 +348,20 @@ class BaiduVector(BaseVector):
if time.time() - start_time > timeout:
raise TimeoutError(f"Table creation timeout after {timeout} seconds")
redis_client.set(table_exist_cache_key, 1, ex=3600)
# rebuild the vector index immediately after the table is created, to make sure the index is ready
table.rebuild_index(self.vector_index)
timeout = 3600 # 1 hour timeout
self._wait_for_index_ready(table, timeout)
def _wait_for_index_ready(self, table, timeout: int = 3600):
start_time = time.time()
while True:
time.sleep(1)
index = table.describe_index(self.vector_index)
if index.state == IndexState.NORMAL:
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Index rebuild timeout after {timeout} seconds")
class BaiduVectorFactory(AbstractVectorFactory):
@@ -369,5 +386,8 @@ class BaiduVectorFactory(AbstractVectorFactory):
replicas=dify_config.BAIDU_VECTOR_DB_REPLICAS,
inverted_index_analyzer=dify_config.BAIDU_VECTOR_DB_INVERTED_INDEX_ANALYZER,
inverted_index_parser_mode=dify_config.BAIDU_VECTOR_DB_INVERTED_INDEX_PARSER_MODE,
auto_build_row_count_increment=dify_config.BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT,
auto_build_row_count_increment_ratio=dify_config.BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT_RATIO,
rebuild_index_timeout_in_seconds=dify_config.BAIDU_VECTOR_DB_REBUILD_INDEX_TIMEOUT_IN_SECONDS,
),
)
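The hunks above swap the blocking rebuild-after-upsert loop for a server-side auto-build policy. A minimal sketch of the policy object, using only the `AutoBuildRowCountIncrement` constructor arguments visible in this diff (the exact trigger semantics live in pymochow's server side and are not spelled out here):

    from pymochow.model.schema import AutoBuildRowCountIncrement

    policy = AutoBuildRowCountIncrement(
        row_count_increment=500,         # absolute row-growth threshold (BAIDU_VECTOR_DB_AUTO_BUILD_ROW_COUNT_INCREMENT)
        row_count_increment_ratio=0.05,  # relative row-growth threshold (..._INCREMENT_RATIO)
    )
    # Passed as auto_build_index_policy=policy with auto_build=True, so the server
    # schedules index rebuilds itself instead of the client polling describe_index
    # after every upsert.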

View File

@@ -33,6 +33,7 @@ from core.rag.models.document import Document
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, TidbAuthBinding
from models.enums import TidbAuthBindingStatus
if TYPE_CHECKING:
from qdrant_client import grpc # noqa
@@ -284,27 +285,29 @@ class TidbOnQdrantVector(BaseVector):
from qdrant_client.http import models
from qdrant_client.http.exceptions import UnexpectedResponse
for node_id in ids:
try:
filter = models.Filter(
must=[
models.FieldCondition(
key="metadata.doc_id",
match=models.MatchValue(value=node_id),
),
],
)
self._client.delete(
collection_name=self._collection_name,
points_selector=FilterSelector(filter=filter),
)
except UnexpectedResponse as e:
# Collection does not exist, so return
if e.status_code == 404:
return
# Some other error occurred, so re-raise the exception
else:
raise e
if not ids:
return
try:
filter = models.Filter(
must=[
models.FieldCondition(
key="metadata.doc_id",
match=models.MatchAny(any=ids),
),
],
)
self._client.delete(
collection_name=self._collection_name,
points_selector=FilterSelector(filter=filter),
)
except UnexpectedResponse as e:
# Collection does not exist, so return
if e.status_code == 404:
return
# Some other error occurred, so re-raise the exception
else:
raise e
def text_exists(self, id: str) -> bool:
all_collection_name = []
@@ -450,7 +453,7 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory):
password=new_cluster["password"],
tenant_id=dataset.tenant_id,
active=True,
status="ACTIVE",
status=TidbAuthBindingStatus.ACTIVE,
)
db.session.add(new_tidb_auth_binding)
db.session.commit()

View File

@@ -9,6 +9,7 @@ from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import TidbAuthBinding
from models.enums import TidbAuthBindingStatus
class TidbService:
@@ -170,7 +171,7 @@ class TidbService:
userPrefix = item["userPrefix"]
if state == "ACTIVE" and len(userPrefix) > 0:
cluster_info = tidb_serverless_list_map[item["clusterId"]]
cluster_info.status = "ACTIVE"
cluster_info.status = TidbAuthBindingStatus.ACTIVE
cluster_info.account = f"{userPrefix}.root"
db.session.add(cluster_info)
db.session.commit()

View File

@@ -95,15 +95,11 @@ class FirecrawlApp:
if response.status_code == 200:
crawl_status_response = response.json()
if crawl_status_response.get("status") == "completed":
total = crawl_status_response.get("total", 0)
if total == 0:
# Normalize to avoid None bypassing the zero-guard when the API returns null.
total = crawl_status_response.get("total") or 0
if total <= 0:
raise Exception("Failed to check crawl status. Error: No page found")
data = crawl_status_response.get("data", [])
url_data_list: list[FirecrawlDocumentData] = []
for item in data:
if isinstance(item, dict) and "metadata" in item and "markdown" in item:
url_data = self._extract_common_fields(item)
url_data_list.append(url_data)
url_data_list = self._collect_all_crawl_pages(crawl_status_response, headers)
if url_data_list:
file_key = "website_files/" + job_id + ".txt"
try:
@@ -120,6 +116,36 @@ class FirecrawlApp:
self._handle_error(response, "check crawl status")
raise RuntimeError("unreachable: _handle_error always raises")
def _collect_all_crawl_pages(
self, first_page: dict[str, Any], headers: dict[str, str]
) -> list[FirecrawlDocumentData]:
"""Collect all crawl result pages by following pagination links.
Raises an exception if any paginated request fails, to avoid returning
partial data that is inconsistent with the reported total.
The number of pages processed is capped at ``total`` (the
server-reported page count) to guard against infinite loops caused by
a misbehaving server that keeps returning a ``next`` URL.
"""
total: int = first_page.get("total") or 0
url_data_list: list[FirecrawlDocumentData] = []
current_page = first_page
pages_processed = 0
while True:
for item in current_page.get("data", []):
if isinstance(item, dict) and "metadata" in item and "markdown" in item:
url_data_list.append(self._extract_common_fields(item))
next_url: str | None = current_page.get("next")
pages_processed += 1
if not next_url or pages_processed >= total:
break
response = self._get_request(next_url, headers)
if response.status_code != 200:
self._handle_error(response, "fetch next crawl page")
current_page = response.json()
return url_data_list
def _format_crawl_status_response(
self,
status: str,
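A standalone sketch (assumed dict shapes, not part of the diff) of the pagination guard in `_collect_all_crawl_pages`: following `next` links is capped by the server-reported `total`, so a server that keeps returning a `next` URL cannot cause an infinite loop:

    def collect(first_page: dict, fetch_next) -> list:
        total = first_page.get("total") or 0
        items: list = []
        current, pages_processed = first_page, 0
        while True:
            items.extend(current.get("data", []))
            pages_processed += 1
            next_url = current.get("next")
            # Stop on a missing next link, or once the page cap is reached.
            if not next_url or pages_processed >= total:
                break
            current = fetch_next(next_url)
        return items

    # A misbehaving server that always returns "next" is still bounded by total:
    pages = {
        "/p1": {"total": 2, "data": ["a"], "next": "/p2"},
        "/p2": {"total": 2, "data": ["b"], "next": "/p2"},  # would loop forever without the cap
    }
    assert collect(pages["/p1"], lambda url: pages[url]) == ["a", "b"]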

View File

@@ -366,7 +366,7 @@ class WordExtractor(BaseExtractor):
paragraph_content = []
# State for legacy HYPERLINK fields
hyperlink_field_url = None
hyperlink_field_text_parts: list = []
hyperlink_field_text_parts: list[str] = []
is_collecting_field_text = False
# Iterate through paragraph elements in document order
for child in paragraph._element:

View File

@@ -591,7 +591,7 @@ class DatasetRetrieval:
user_id: str,
user_from: str,
query: str,
available_datasets: list,
available_datasets: list[Dataset],
model_instance: ModelInstance,
model_config: ModelConfigWithCredentialsEntity,
planning_strategy: PlanningStrategy,
@@ -633,15 +633,15 @@ class DatasetRetrieval:
if dataset_id:
# get retrieval model config
dataset_stmt = select(Dataset).where(Dataset.id == dataset_id)
dataset = db.session.scalar(dataset_stmt)
if dataset:
selected_dataset = db.session.scalar(dataset_stmt)
if selected_dataset:
results = []
if dataset.provider == "external":
if selected_dataset.provider == "external":
external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(
tenant_id=dataset.tenant_id,
tenant_id=selected_dataset.tenant_id,
dataset_id=dataset_id,
query=query,
external_retrieval_parameters=dataset.retrieval_model,
external_retrieval_parameters=selected_dataset.retrieval_model,
metadata_condition=metadata_condition,
)
for external_document in external_documents:
@@ -654,28 +654,28 @@ class DatasetRetrieval:
document.metadata["score"] = external_document.get("score")
document.metadata["title"] = external_document.get("title")
document.metadata["dataset_id"] = dataset_id
document.metadata["dataset_name"] = dataset.name
document.metadata["dataset_name"] = selected_dataset.name
results.append(document)
else:
if metadata_condition and not metadata_filter_document_ids:
return []
document_ids_filter = None
if metadata_filter_document_ids:
document_ids = metadata_filter_document_ids.get(dataset.id, [])
document_ids = metadata_filter_document_ids.get(selected_dataset.id, [])
if document_ids:
document_ids_filter = document_ids
else:
return []
retrieval_model_config: DefaultRetrievalModelDict = (
cast(DefaultRetrievalModelDict, dataset.retrieval_model)
if dataset.retrieval_model
cast(DefaultRetrievalModelDict, selected_dataset.retrieval_model)
if selected_dataset.retrieval_model
else default_retrieval_model
)
# get top k
top_k = retrieval_model_config["top_k"]
# get retrieval method
if dataset.indexing_technique == "economy":
if selected_dataset.indexing_technique == "economy":
retrieval_method = RetrievalMethod.KEYWORD_SEARCH
else:
retrieval_method = retrieval_model_config["search_method"]
@@ -694,7 +694,7 @@ class DatasetRetrieval:
with measure_time() as timer:
results = RetrievalService.retrieve(
retrieval_method=retrieval_method,
dataset_id=dataset.id,
dataset_id=selected_dataset.id,
query=query,
top_k=top_k,
score_threshold=score_threshold,
@@ -726,7 +726,7 @@ class DatasetRetrieval:
tenant_id: str,
user_id: str,
user_from: str,
available_datasets: list,
available_datasets: list[Dataset],
query: str | None,
top_k: int,
score_threshold: float,
@@ -1028,7 +1028,7 @@ class DatasetRetrieval:
dataset_id: str,
query: str,
top_k: int,
all_documents: list,
all_documents: list[Document],
document_ids_filter: list[str] | None = None,
metadata_condition: MetadataCondition | None = None,
attachment_ids: list[str] | None = None,
@@ -1298,7 +1298,7 @@ class DatasetRetrieval:
def get_metadata_filter_condition(
self,
dataset_ids: list,
dataset_ids: list[str],
query: str,
tenant_id: str,
user_id: str,
@@ -1400,7 +1400,7 @@ class DatasetRetrieval:
return output
def _automatic_metadata_filter_func(
self, dataset_ids: list, query: str, tenant_id: str, user_id: str, metadata_model_config: ModelConfig
self, dataset_ids: list[str], query: str, tenant_id: str, user_id: str, metadata_model_config: ModelConfig
) -> list[dict[str, Any]] | None:
# get all metadata field
metadata_stmt = select(DatasetMetadata).where(DatasetMetadata.dataset_id.in_(dataset_ids))
@@ -1598,7 +1598,7 @@ class DatasetRetrieval:
)
def _get_prompt_template(
self, model_config: ModelConfigWithCredentialsEntity, mode: str, metadata_fields: list, query: str
self, model_config: ModelConfigWithCredentialsEntity, mode: str, metadata_fields: list[str], query: str
):
model_mode = ModelMode(mode)
input_text = query
@@ -1690,7 +1690,7 @@ class DatasetRetrieval:
def _multiple_retrieve_thread(
self,
flask_app: Flask,
available_datasets: list,
available_datasets: list[Dataset],
metadata_condition: MetadataCondition | None,
metadata_filter_document_ids: dict[str, list[str]] | None,
all_documents: list[Document],

View File

@@ -50,7 +50,7 @@ class BuiltinTool(Tool):
return ModelInvocationUtils.invoke(
user_id=user_id,
tenant_id=self.runtime.tenant_id or "",
tool_type="builtin",
tool_type=ToolProviderType.BUILT_IN,
tool_name=self.entity.identity.name,
prompt_messages=prompt_messages,
)

View File

@@ -34,7 +34,7 @@ from core.tools.workflow_as_tool.tool import WorkflowTool
from dify_graph.file import FileType
from dify_graph.file.models import FileTransferMethod
from extensions.ext_database import db
from models.enums import CreatorUserRole
from models.enums import CreatorUserRole, MessageFileBelongsTo
from models.model import Message, MessageFile
logger = logging.getLogger(__name__)
@@ -352,7 +352,7 @@ class ToolEngine:
message_id=agent_message.id,
type=file_type,
transfer_method=FileTransferMethod.TOOL_FILE,
belongs_to="assistant",
belongs_to=MessageFileBelongsTo.ASSISTANT,
url=message.url,
upload_file_id=tool_file_id,
created_by_role=(

View File

@@ -38,7 +38,7 @@ class ToolLabelManager:
db.session.add(
ToolLabelBinding(
tool_id=provider_id,
tool_type=controller.provider_type.value,
tool_type=controller.provider_type,
label_name=label,
)
)
@@ -58,7 +58,7 @@ class ToolLabelManager:
raise ValueError("Unsupported tool type")
stmt = select(ToolLabelBinding.label_name).where(
ToolLabelBinding.tool_id == provider_id,
ToolLabelBinding.tool_type == controller.provider_type.value,
ToolLabelBinding.tool_type == controller.provider_type,
)
labels = db.session.scalars(stmt).all()

View File

@@ -9,6 +9,7 @@ from decimal import Decimal
from typing import cast
from core.model_manager import ModelManager
from core.tools.entities.tool_entities import ToolProviderType
from dify_graph.model_runtime.entities.llm_entities import LLMResult
from dify_graph.model_runtime.entities.message_entities import PromptMessage
from dify_graph.model_runtime.entities.model_entities import ModelPropertyKey, ModelType
@@ -78,7 +79,7 @@ class ModelInvocationUtils:
@staticmethod
def invoke(
user_id: str, tenant_id: str, tool_type: str, tool_name: str, prompt_messages: list[PromptMessage]
user_id: str, tenant_id: str, tool_type: ToolProviderType, tool_name: str, prompt_messages: list[PromptMessage]
) -> LLMResult:
"""
invoke model with parameters in user's own context

View File

@@ -3,7 +3,6 @@ from typing import Final
TRIGGER_WEBHOOK_NODE_TYPE: Final[str] = "trigger-webhook"
TRIGGER_SCHEDULE_NODE_TYPE: Final[str] = "trigger-schedule"
TRIGGER_PLUGIN_NODE_TYPE: Final[str] = "trigger-plugin"
TRIGGER_INFO_METADATA_KEY: Final[str] = "trigger_info"
TRIGGER_NODE_TYPES: Final[frozenset[str]] = frozenset(
{

View File

@@ -1,7 +1,7 @@
from collections.abc import Mapping
from typing import Any, cast
from typing import Any
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, WorkflowNodeExecutionMetadataKey
@@ -47,7 +47,7 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
# Get trigger data passed when workflow was triggered
metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
cast(WorkflowNodeExecutionMetadataKey, TRIGGER_INFO_METADATA_KEY): {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
"provider_id": self.node_data.provider_id,
"event_name": self.node_data.event_name,
"plugin_unique_identifier": self.node_data.plugin_unique_identifier,

View File

@@ -245,6 +245,9 @@ _END_STATE = frozenset(
class WorkflowNodeExecutionMetadataKey(StrEnum):
"""
Node Run Metadata Key.
Values in this enum are persisted as execution metadata and must stay in sync
with every node that writes `NodeRunResult.metadata`.
"""
TOTAL_TOKENS = "total_tokens"
@@ -266,6 +269,7 @@
ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field
LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output
DATASOURCE_INFO = "datasource_info"
TRIGGER_INFO = "trigger_info"
COMPLETED_REASON = "completed_reason" # completed reason for loop node

View File

@@ -159,6 +159,7 @@ class ErrorHandler:
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.EXCEPTION,
inputs=event.node_run_result.inputs,
@@ -198,6 +199,7 @@
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.EXCEPTION,
inputs=event.node_run_result.inputs,

View File

@@ -15,10 +15,13 @@ from typing import TYPE_CHECKING, final
from typing_extensions import override
from dify_graph.context import IExecutionContext
from dify_graph.enums import WorkflowNodeExecutionStatus
from dify_graph.graph import Graph
from dify_graph.graph_engine.layers.base import GraphEngineLayer
from dify_graph.graph_events import GraphNodeEventBase, NodeRunFailedEvent, is_node_result_event
from dify_graph.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunStartedEvent, is_node_result_event
from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node
from libs.datetime_utils import naive_utc_now
from .ready_queue import ReadyQueue
@@ -65,6 +68,7 @@ class Worker(threading.Thread):
self._stop_event = threading.Event()
self._layers = layers if layers is not None else []
self._last_task_time = time.time()
self._current_node_started_at: datetime | None = None
def stop(self) -> None:
"""Signal the worker to stop processing."""
@@ -104,18 +108,15 @@
self._last_task_time = time.time()
node = self._graph.nodes[node_id]
try:
self._current_node_started_at = None
self._execute_node(node)
self._ready_queue.task_done()
except Exception as e:
error_event = NodeRunFailedEvent(
id=node.execution_id,
node_id=node.id,
node_type=node.node_type,
in_iteration_id=None,
error=str(e),
start_at=datetime.now(),
self._event_queue.put(
self._build_fallback_failure_event(node, e, started_at=self._current_node_started_at)
)
self._event_queue.put(error_event)
finally:
self._current_node_started_at = None
def _execute_node(self, node: Node) -> None:
"""
@@ -136,6 +137,8 @@
try:
node_events = node.run()
for event in node_events:
if isinstance(event, NodeRunStartedEvent) and event.id == node.execution_id:
self._current_node_started_at = event.start_at
self._event_queue.put(event)
if is_node_result_event(event):
result_event = event
@@ -149,6 +152,8 @@
try:
node_events = node.run()
for event in node_events:
if isinstance(event, NodeRunStartedEvent) and event.id == node.execution_id:
self._current_node_started_at = event.start_at
self._event_queue.put(event)
if is_node_result_event(event):
result_event = event
@@ -177,3 +182,24 @@
except Exception:
# Silently ignore layer errors to prevent disrupting node execution
continue
def _build_fallback_failure_event(
self, node: Node, error: Exception, *, started_at: datetime | None = None
) -> NodeRunFailedEvent:
"""Build a failed event when worker-level execution aborts before a node emits its own result event."""
failure_time = naive_utc_now()
error_message = str(error)
return NodeRunFailedEvent(
id=node.execution_id,
node_id=node.id,
node_type=node.node_type,
in_iteration_id=None,
error=error_message,
start_at=started_at or failure_time,
finished_at=failure_time,
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
error=error_message,
error_type=type(error).__name__,
),
)

View File

@@ -36,16 +36,19 @@ class NodeRunRetrieverResourceEvent(GraphNodeEventBase):
class NodeRunSucceededEvent(GraphNodeEventBase):
start_at: datetime = Field(..., description="node start time")
finished_at: datetime | None = Field(default=None, description="node finish time")
class NodeRunFailedEvent(GraphNodeEventBase):
error: str = Field(..., description="error")
start_at: datetime = Field(..., description="node start time")
finished_at: datetime | None = Field(default=None, description="node finish time")
class NodeRunExceptionEvent(GraphNodeEventBase):
error: str = Field(..., description="error")
start_at: datetime = Field(..., description="node start time")
finished_at: datetime | None = Field(default=None, description="node finish time")
class NodeRunRetryEvent(NodeRunStartedEvent):

View File

@@ -406,11 +406,13 @@ class Node(Generic[NodeDataT]):
error=str(e),
error_type="WorkflowNodeError",
)
finished_at = naive_utc_now()
yield NodeRunFailedEvent(
id=self.execution_id,
node_id=self._node_id,
node_type=self.node_type,
start_at=self._start_at,
finished_at=finished_at,
node_run_result=result,
error=str(e),
)
@@ -568,6 +570,7 @@ class Node(Generic[NodeDataT]):
return self._node_data
def _convert_node_run_result_to_graph_node_event(self, result: NodeRunResult) -> GraphNodeEventBase:
finished_at = naive_utc_now()
match result.status:
case WorkflowNodeExecutionStatus.FAILED:
return NodeRunFailedEvent(
@ -575,6 +578,7 @@ class Node(Generic[NodeDataT]):
node_id=self.id,
node_type=self.node_type,
start_at=self._start_at,
finished_at=finished_at,
node_run_result=result,
error=result.error,
)
@@ -584,6 +588,7 @@
node_id=self.id,
node_type=self.node_type,
start_at=self._start_at,
finished_at=finished_at,
node_run_result=result,
)
case _:
@@ -606,6 +611,7 @@
@_dispatch.register
def _(self, event: StreamCompletedEvent) -> NodeRunSucceededEvent | NodeRunFailedEvent:
finished_at = naive_utc_now()
match event.node_run_result.status:
case WorkflowNodeExecutionStatus.SUCCEEDED:
return NodeRunSucceededEvent(
@@ -613,6 +619,7 @@
node_id=self._node_id,
node_type=self.node_type,
start_at=self._start_at,
finished_at=finished_at,
node_run_result=event.node_run_result,
)
case WorkflowNodeExecutionStatus.FAILED:
@@ -621,6 +628,7 @@
node_id=self._node_id,
node_type=self.node_type,
start_at=self._start_at,
finished_at=finished_at,
node_run_result=event.node_run_result,
error=event.node_run_result.error,
)

View File

@@ -101,6 +101,9 @@ class HttpRequestNode(Node[HttpRequestNodeData]):
timeout=self._get_request_timeout(self.node_data),
variable_pool=self.graph_runtime_state.variable_pool,
http_request_config=self._http_request_config,
# Must be 0 to disable executor-level retries, as the graph engine handles them.
# This is critical to prevent nested retries.
max_retries=0,
ssl_verify=self.node_data.ssl_verify,
http_client=self._http_client,
file_manager=self._file_manager,

View File

@@ -236,7 +236,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
future_to_index: dict[
Future[
tuple[
datetime,
float,
list[GraphNodeEventBase],
object | None,
dict[str, Variable],
@@ -261,7 +261,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
try:
result = future.result()
(
iter_start_at,
iteration_duration,
events,
output_value,
conversation_snapshot,
@@ -274,8 +274,9 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
# Yield all events from this iteration
yield from events
# Update tokens and timing
iter_run_map[str(index)] = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds()
# The worker computes duration before we replay buffered events here,
# so slow downstream consumers don't inflate per-iteration timing.
iter_run_map[str(index)] = iteration_duration
usage_accumulator[0] = self._merge_usage(usage_accumulator[0], iteration_usage)
@@ -305,7 +306,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
index: int,
item: object,
execution_context: "IExecutionContext",
) -> tuple[datetime, list[GraphNodeEventBase], object | None, dict[str, Variable], LLMUsage]:
) -> tuple[float, list[GraphNodeEventBase], object | None, dict[str, Variable], LLMUsage]:
"""Execute a single iteration in parallel mode and return results."""
with execution_context:
iter_start_at = datetime.now(UTC).replace(tzinfo=None)
@@ -327,9 +328,10 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]):
conversation_snapshot = self._extract_conversation_variable_snapshot(
variable_pool=graph_engine.graph_runtime_state.variable_pool
)
iteration_duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds()
return (
iter_start_at,
iteration_duration,
events,
output_value,
conversation_snapshot,
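A small illustration (not part of the diff) of why the duration is now computed inside the worker: measuring around `future.result()` in the consumer would also count time the consumer spends replaying buffered events, while a worker-side measurement only covers the iteration's own work. The sleeps are hypothetical stand-ins for real work:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def iteration() -> float:
        started = time.monotonic()
        time.sleep(0.05)                    # the iteration's actual work
        return time.monotonic() - started   # duration captured in the worker

    with ThreadPoolExecutor() as pool:
        future = pool.submit(iteration)
        time.sleep(0.2)                     # slow consumer replaying buffered events
        duration = future.result()
    assert duration < 0.2                   # unaffected by the consumer's delay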

View File

@@ -1,16 +1,19 @@
import logging
import sys
import urllib.parse
from dataclasses import dataclass
from typing import NotRequired
import httpx
from pydantic import TypeAdapter
from pydantic import TypeAdapter, ValidationError
if sys.version_info >= (3, 12):
from typing import TypedDict
else:
from typing_extensions import TypedDict
logger = logging.getLogger(__name__)
JsonObject = dict[str, object]
JsonObjectList = list[JsonObject]
@@ -30,8 +33,8 @@ class GitHubEmailRecord(TypedDict, total=False):
class GitHubRawUserInfo(TypedDict):
id: int | str
login: str
name: NotRequired[str]
email: NotRequired[str]
name: NotRequired[str | None]
email: NotRequired[str | None]
class GoogleRawUserInfo(TypedDict):
@@ -127,9 +130,14 @@ class GitHubOAuth(OAuth):
response.raise_for_status()
user_info = GITHUB_RAW_USER_INFO_ADAPTER.validate_python(_json_object(response))
email_response = httpx.get(self._EMAIL_INFO_URL, headers=headers)
email_info = GITHUB_EMAIL_RECORDS_ADAPTER.validate_python(_json_list(email_response))
primary_email = next((email for email in email_info if email.get("primary") is True), None)
try:
email_response = httpx.get(self._EMAIL_INFO_URL, headers=headers)
email_response.raise_for_status()
email_info = GITHUB_EMAIL_RECORDS_ADAPTER.validate_python(_json_list(email_response))
primary_email = next((email for email in email_info if email.get("primary") is True), None)
except (httpx.HTTPStatusError, ValidationError):
logger.warning("Failed to retrieve email from GitHub /user/emails endpoint", exc_info=True)
primary_email = None
return {**user_info, "email": primary_email.get("email", "") if primary_email else ""}
@@ -137,8 +145,11 @@
payload = GITHUB_RAW_USER_INFO_ADAPTER.validate_python(raw_info)
email = payload.get("email")
if not email:
email = f"{payload['id']}+{payload['login']}@users.noreply.github.com"
return OAuthUserInfo(id=str(payload["id"]), name=str(payload.get("name", "")), email=email)
raise ValueError(
'Dify does not currently support the "Keep my email addresses private" feature,'
" please disable it and log in again"
)
return OAuthUserInfo(id=str(payload["id"]), name=str(payload.get("name") or ""), email=email)
class GoogleOAuth(OAuth):

View File

@@ -43,7 +43,9 @@ from .enums import (
IndexingStatus,
ProcessRuleMode,
SegmentStatus,
SegmentType,
SummaryStatus,
TidbAuthBindingStatus,
)
from .model import App, Tag, TagBinding, UploadFile
from .types import AdjustedJSON, BinaryData, EnumText, LongText, StringUUID, adjusted_json_index
@@ -998,7 +1000,9 @@ class ChildChunk(Base):
# indexing fields
index_node_id = mapped_column(String(255), nullable=True)
index_node_hash = mapped_column(String(255), nullable=True)
type = mapped_column(String(255), nullable=False, server_default=sa.text("'automatic'"))
type: Mapped[SegmentType] = mapped_column(
EnumText(SegmentType, length=255), nullable=False, server_default=sa.text("'automatic'")
)
created_by = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp())
updated_by = mapped_column(StringUUID, nullable=True)
@@ -1239,7 +1243,9 @@ class TidbAuthBinding(TypeBase):
cluster_id: Mapped[str] = mapped_column(String(255), nullable=False)
cluster_name: Mapped[str] = mapped_column(String(255), nullable=False)
active: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("false"))
status: Mapped[str] = mapped_column(sa.String(255), nullable=False, server_default=sa.text("'CREATING'"))
status: Mapped[TidbAuthBindingStatus] = mapped_column(
EnumText(TidbAuthBindingStatus, length=255), nullable=False, server_default=sa.text("'CREATING'")
)
account: Mapped[str] = mapped_column(String(255), nullable=False)
password: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(

View File

@@ -158,6 +158,13 @@ class FeedbackFromSource(StrEnum):
ADMIN = "admin"
class FeedbackRating(StrEnum):
"""MessageFeedback rating"""
LIKE = "like"
DISLIKE = "dislike"
class InvokeFrom(StrEnum):
"""How a conversation/message was invoked"""
@@ -215,6 +222,13 @@ class DatasetMetadataType(StrEnum):
TIME = "time"
class SegmentType(StrEnum):
"""Document segment type"""
AUTOMATIC = "automatic"
CUSTOMIZED = "customized"
class SegmentStatus(StrEnum):
"""Document segment status"""
@@ -316,3 +330,10 @@ class ProviderQuotaType(StrEnum):
if member.value == value:
return member
raise ValueError(f"No matching enum found for value '{value}'")
class ApiTokenType(StrEnum):
"""API Token type"""
APP = "app"
DATASET = "dataset"

View File

@@ -66,8 +66,8 @@ class HumanInputContent(ExecutionExtraContent):
form_id: Mapped[str] = mapped_column(StringUUID, nullable=True)
@classmethod
def new(cls, form_id: str, message_id: str | None) -> "HumanInputContent":
return cls(form_id=form_id, message_id=message_id)
def new(cls, *, workflow_run_id: str, form_id: str, message_id: str | None) -> "HumanInputContent":
return cls(workflow_run_id=workflow_run_id, form_id=form_id, message_id=message_id)
form: Mapped["HumanInputForm"] = relationship(
"HumanInputForm",

View File

@@ -21,7 +21,7 @@ from configs import dify_config
from constants import DEFAULT_FILE_NUMBER_LIMITS
from core.tools.signature import sign_tool_file
from dify_graph.enums import WorkflowExecutionStatus
from dify_graph.file import FILE_MODEL_IDENTITY, File, FileTransferMethod
from dify_graph.file import FILE_MODEL_IDENTITY, File, FileTransferMethod, FileType
from dify_graph.file import helpers as file_helpers
from extensions.storage.storage_type import StorageType
from libs.helper import generate_string # type: ignore[import-not-found]
@@ -31,13 +31,21 @@ from .account import Account, Tenant
from .base import Base, TypeBase, gen_uuidv4_string
from .engine import db
from .enums import (
ApiTokenType,
AppMCPServerStatus,
AppStatus,
BannerStatus,
ConversationFromSource,
ConversationStatus,
CreatorUserRole,
FeedbackFromSource,
FeedbackRating,
InvokeFrom,
MessageChainType,
MessageFileBelongsTo,
MessageStatus,
ProviderQuotaType,
TagType,
)
from .provider_ids import GenericProviderID
from .types import EnumText, LongText, StringUUID
@@ -1019,10 +1027,12 @@ class Conversation(Base):
#
# Its value corresponds to the members of `InvokeFrom`.
# (api/core/app/entities/app_invoke_entities.py)
invoke_from = mapped_column(String(255), nullable=True)
invoke_from: Mapped[InvokeFrom | None] = mapped_column(EnumText(InvokeFrom, length=255), nullable=True)
# ref: ConversationSource.
from_source: Mapped[str] = mapped_column(String(255), nullable=False)
from_source: Mapped[ConversationFromSource] = mapped_column(
EnumText(ConversationFromSource, length=255), nullable=False
)
from_end_user_id = mapped_column(StringUUID)
from_account_id = mapped_column(StringUUID)
read_at = mapped_column(sa.DateTime)
@@ -1165,7 +1175,7 @@ class Conversation(Base):
select(func.count(MessageFeedback.id)).where(
MessageFeedback.conversation_id == self.id,
MessageFeedback.from_source == "user",
MessageFeedback.rating == "like",
MessageFeedback.rating == FeedbackRating.LIKE,
)
)
or 0
@@ -1176,7 +1186,7 @@ class Conversation(Base):
select(func.count(MessageFeedback.id)).where(
MessageFeedback.conversation_id == self.id,
MessageFeedback.from_source == "user",
MessageFeedback.rating == "dislike",
MessageFeedback.rating == FeedbackRating.DISLIKE,
)
)
or 0
@@ -1191,7 +1201,7 @@ class Conversation(Base):
select(func.count(MessageFeedback.id)).where(
MessageFeedback.conversation_id == self.id,
MessageFeedback.from_source == "admin",
MessageFeedback.rating == "like",
MessageFeedback.rating == FeedbackRating.LIKE,
)
)
or 0
@@ -1202,7 +1212,7 @@ class Conversation(Base):
select(func.count(MessageFeedback.id)).where(
MessageFeedback.conversation_id == self.id,
MessageFeedback.from_source == "admin",
MessageFeedback.rating == "dislike",
MessageFeedback.rating == FeedbackRating.DISLIKE,
)
)
or 0
@@ -1371,8 +1381,10 @@ class Message(Base):
)
error: Mapped[str | None] = mapped_column(LongText)
message_metadata: Mapped[str | None] = mapped_column(LongText)
invoke_from: Mapped[str | None] = mapped_column(String(255), nullable=True)
from_source: Mapped[str] = mapped_column(String(255), nullable=False)
invoke_from: Mapped[InvokeFrom | None] = mapped_column(EnumText(InvokeFrom, length=255), nullable=True)
from_source: Mapped[ConversationFromSource] = mapped_column(
EnumText(ConversationFromSource, length=255), nullable=False
)
from_end_user_id: Mapped[str | None] = mapped_column(StringUUID)
from_account_id: Mapped[str | None] = mapped_column(StringUUID)
created_at: Mapped[datetime] = mapped_column(sa.DateTime, server_default=func.current_timestamp())
@@ -1725,8 +1737,8 @@ class MessageFeedback(TypeBase):
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
conversation_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
message_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
rating: Mapped[str] = mapped_column(String(255), nullable=False)
from_source: Mapped[str] = mapped_column(String(255), nullable=False)
rating: Mapped[FeedbackRating] = mapped_column(EnumText(FeedbackRating, length=255), nullable=False)
from_source: Mapped[FeedbackFromSource] = mapped_column(EnumText(FeedbackFromSource, length=255), nullable=False)
content: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
from_end_user_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
from_account_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
@@ -1773,13 +1785,15 @@ class MessageFile(TypeBase):
StringUUID, insert_default=lambda: str(uuid4()), default_factory=lambda: str(uuid4()), init=False
)
message_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
type: Mapped[str] = mapped_column(String(255), nullable=False)
type: Mapped[FileType] = mapped_column(EnumText(FileType, length=255), nullable=False)
transfer_method: Mapped[FileTransferMethod] = mapped_column(
EnumText(FileTransferMethod, length=255), nullable=False
)
created_by_role: Mapped[CreatorUserRole] = mapped_column(EnumText(CreatorUserRole, length=255), nullable=False)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
belongs_to: Mapped[Literal["user", "assistant"] | None] = mapped_column(String(255), nullable=True, default=None)
belongs_to: Mapped[MessageFileBelongsTo | None] = mapped_column(
EnumText(MessageFileBelongsTo, length=255), nullable=True, default=None
)
url: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
upload_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
created_at: Mapped[datetime] = mapped_column(
@@ -2083,7 +2097,7 @@ class ApiToken(Base): # bug: this uses setattr so idk the field.
id = mapped_column(StringUUID, default=lambda: str(uuid4()))
app_id = mapped_column(StringUUID, nullable=True)
tenant_id = mapped_column(StringUUID, nullable=True)
type = mapped_column(String(16), nullable=False)
type: Mapped[ApiTokenType] = mapped_column(EnumText(ApiTokenType, length=16), nullable=False)
token: Mapped[str] = mapped_column(String(255), nullable=False)
last_used_at = mapped_column(sa.DateTime, nullable=True)
created_at = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp())
@@ -2393,7 +2407,7 @@ class Tag(TypeBase):
StringUUID, insert_default=lambda: str(uuid4()), default_factory=lambda: str(uuid4()), init=False
)
tenant_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
type: Mapped[str] = mapped_column(String(16), nullable=False)
type: Mapped[TagType] = mapped_column(EnumText(TagType, length=16), nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
@@ -2478,7 +2492,9 @@ class TenantCreditPool(TypeBase):
StringUUID, insert_default=lambda: str(uuid4()), default_factory=lambda: str(uuid4()), init=False
)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
pool_type: Mapped[str] = mapped_column(String(40), nullable=False, default="trial", server_default="trial")
pool_type: Mapped[ProviderQuotaType] = mapped_column(
EnumText(ProviderQuotaType, length=40), nullable=False, default=ProviderQuotaType.TRIAL, server_default="trial"
)
quota_limit: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
quota_used: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0)
created_at: Mapped[datetime] = mapped_column(

View File

@@ -13,12 +13,16 @@ from sqlalchemy.orm import Mapped, mapped_column
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import ApiProviderSchemaType, WorkflowToolParameterConfiguration
from core.tools.entities.tool_entities import (
ApiProviderSchemaType,
ToolProviderType,
WorkflowToolParameterConfiguration,
)
from .base import TypeBase
from .engine import db
from .model import Account, App, Tenant
from .types import LongText, StringUUID
from .types import EnumText, LongText, StringUUID
if TYPE_CHECKING:
from core.entities.mcp_provider import MCPProviderEntity
@@ -208,7 +212,7 @@ class ToolLabelBinding(TypeBase):
# tool id
tool_id: Mapped[str] = mapped_column(String(64), nullable=False)
# tool type
tool_type: Mapped[str] = mapped_column(String(40), nullable=False)
tool_type: Mapped[ToolProviderType] = mapped_column(EnumText(ToolProviderType, length=40), nullable=False)
# label name
label_name: Mapped[str] = mapped_column(String(40), nullable=False)
@@ -386,7 +390,7 @@ class ToolModelInvoke(TypeBase):
# provider
provider: Mapped[str] = mapped_column(String(255), nullable=False)
# type
tool_type: Mapped[str] = mapped_column(String(40), nullable=False)
tool_type: Mapped[ToolProviderType] = mapped_column(EnumText(ToolProviderType, length=40), nullable=False)
# tool name
tool_name: Mapped[str] = mapped_column(String(128), nullable=False)
# invoke parameters

View File

@@ -1,3 +1,4 @@
import copy
import json
import logging
from collections.abc import Generator, Mapping, Sequence
@@ -22,14 +23,14 @@ from sqlalchemy import (
from sqlalchemy.orm import Mapped, mapped_column
from typing_extensions import deprecated
from core.trigger.constants import TRIGGER_INFO_METADATA_KEY, TRIGGER_PLUGIN_NODE_TYPE
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
from dify_graph.constants import (
CONVERSATION_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
)
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
from dify_graph.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType, SchedulingPause
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey
from dify_graph.file.constants import maybe_file_object
from dify_graph.file.models import File
from dify_graph.variables import utils as variable_utils
@@ -302,26 +303,40 @@ class Workflow(Base): # bug
def features(self) -> str:
"""
Convert old features structure to new features structure.
This property avoids rewriting the underlying JSON when normalization
produces no effective change, to prevent marking the row dirty on read.
"""
if not self._features:
return self._features
features = json.loads(self._features)
if features.get("file_upload", {}).get("image", {}).get("enabled", False):
image_enabled = True
image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
image_transfer_methods = features["file_upload"]["image"].get(
"transfer_methods", ["remote_url", "local_file"]
)
features["file_upload"]["enabled"] = image_enabled
features["file_upload"]["number_limits"] = image_number_limits
features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
"allowed_file_extensions", []
)
del features["file_upload"]["image"]
self._features = json.dumps(features)
# Parse once and deep-copy before normalization to detect in-place changes.
original_dict = self._decode_features_payload(self._features)
if original_dict is None:
return self._features
# Fast-path: if the legacy file_upload.image.enabled shape is absent, skip
# deep-copy and normalization entirely and return the stored JSON.
file_upload_payload = original_dict.get("file_upload")
if not isinstance(file_upload_payload, dict):
return self._features
file_upload = cast(dict[str, Any], file_upload_payload)
image_payload = file_upload.get("image")
if not isinstance(image_payload, dict):
return self._features
image = cast(dict[str, Any], image_payload)
if "enabled" not in image:
return self._features
normalized_dict = self._normalize_features_payload(copy.deepcopy(original_dict))
if normalized_dict == original_dict:
# No effective change; return stored JSON unchanged.
return self._features
# Normalization changed the payload: persist the normalized JSON.
self._features = json.dumps(normalized_dict)
return self._features
@features.setter
@@ -332,6 +347,44 @@ class Workflow(Base): # bug
def features_dict(self) -> dict[str, Any]:
return json.loads(self.features) if self.features else {}
@property
def serialized_features(self) -> str:
"""Return the stored features JSON without triggering compatibility rewrites."""
return self._features
@property
def normalized_features_dict(self) -> dict[str, Any]:
"""Decode features with legacy normalization without mutating the model state."""
if not self._features:
return {}
features = self._decode_features_payload(self._features)
return self._normalize_features_payload(features) if features is not None else {}
@staticmethod
def _decode_features_payload(features: str) -> dict[str, Any] | None:
"""Decode workflow features JSON when it contains an object payload."""
payload = json.loads(features)
return cast(dict[str, Any], payload) if isinstance(payload, dict) else None
@staticmethod
def _normalize_features_payload(features: dict[str, Any]) -> dict[str, Any]:
if features.get("file_upload", {}).get("image", {}).get("enabled", False):
image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
image_transfer_methods = features["file_upload"]["image"].get(
"transfer_methods", ["remote_url", "local_file"]
)
features["file_upload"]["enabled"] = True
features["file_upload"]["number_limits"] = image_number_limits
features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
"allowed_file_extensions", []
)
del features["file_upload"]["image"]
return features
def walk_nodes(
self, specific_node_type: NodeType | None = None
) -> Generator[tuple[str, Mapping[str, Any]], None, None]:
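An input/output illustration (not part of the diff) of `_normalize_features_payload`, derived directly from the code above and assuming the `Workflow` model is importable: the legacy `file_upload.image` block is hoisted into top-level `file_upload` keys and then removed:

    legacy = {
        "file_upload": {
            "image": {"enabled": True, "number_limits": 3, "transfer_methods": ["remote_url"]}
        }
    }
    normalized = Workflow._normalize_features_payload(legacy)
    assert normalized == {
        "file_upload": {
            "enabled": True,
            "number_limits": 3,
            "allowed_file_upload_methods": ["remote_url"],
            "allowed_file_types": ["image"],
            "allowed_file_extensions": [],
        }
    }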
@@ -517,6 +570,31 @@ class Workflow(Base): # bug
)
self._environment_variables = environment_variables_json
@staticmethod
def normalize_environment_variable_mappings(
mappings: Sequence[Mapping[str, Any]],
) -> list[dict[str, Any]]:
"""Convert masked secret placeholders into the draft hidden sentinel.
Regular draft sync requests should preserve existing secrets without shipping
plaintext values back from the client. The dedicated restore endpoint now
copies published secrets server-side, so draft sync only needs to normalize
the UI mask into `HIDDEN_VALUE`.
"""
masked_secret_value = encrypter.full_mask_token()
normalized_mappings: list[dict[str, Any]] = []
for mapping in mappings:
normalized_mapping = dict(mapping)
if (
normalized_mapping.get("value_type") == SegmentType.SECRET.value
and normalized_mapping.get("value") == masked_secret_value
):
normalized_mapping["value"] = HIDDEN_VALUE
normalized_mappings.append(normalized_mapping)
return normalized_mappings
def to_dict(self, *, include_secret: bool = False) -> WorkflowContentDict:
environment_variables = list(self.environment_variables)
environment_variables = [
@@ -564,6 +642,12 @@ class Workflow(Base): # bug
ensure_ascii=False,
)
def copy_serialized_variable_storage_from(self, source_workflow: "Workflow") -> None:
"""Copy stored variable JSON directly for same-tenant restore flows."""
self._environment_variables = source_workflow._environment_variables
self._conversation_variables = source_workflow._conversation_variables
self._rag_pipeline_variables = source_workflow._rag_pipeline_variables
@staticmethod
def version_from_datetime(d: datetime) -> str:
return str(d)
@@ -936,8 +1020,11 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
elif self.node_type == BuiltinNodeTypes.DATASOURCE and "datasource_info" in execution_metadata:
datasource_info = execution_metadata["datasource_info"]
extras["icon"] = datasource_info.get("icon")
elif self.node_type == TRIGGER_PLUGIN_NODE_TYPE and TRIGGER_INFO_METADATA_KEY in execution_metadata:
trigger_info = execution_metadata[TRIGGER_INFO_METADATA_KEY] or {}
elif (
self.node_type == TRIGGER_PLUGIN_NODE_TYPE
and WorkflowNodeExecutionMetadataKey.TRIGGER_INFO in execution_metadata
):
trigger_info = execution_metadata[WorkflowNodeExecutionMetadataKey.TRIGGER_INFO] or {}
provider_id = trigger_info.get("provider_id")
if provider_id:
extras["icon"] = TriggerManager.get_trigger_plugin_icon(
@@ -1134,7 +1221,9 @@ class WorkflowAppLog(TypeBase):
app_id: Mapped[str] = mapped_column(StringUUID)
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
workflow_run_id: Mapped[str] = mapped_column(StringUUID)
created_from: Mapped[str] = mapped_column(String(255), nullable=False)
created_from: Mapped[WorkflowAppLogCreatedFrom] = mapped_column(
EnumText(WorkflowAppLogCreatedFrom, length=255), nullable=False
)
created_by_role: Mapped[CreatorUserRole] = mapped_column(EnumText(CreatorUserRole, length=255), nullable=False)
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
@@ -1214,10 +1303,14 @@ class WorkflowArchiveLog(TypeBase):
log_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
log_created_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
log_created_from: Mapped[str | None] = mapped_column(String(255), nullable=True)
log_created_from: Mapped[WorkflowAppLogCreatedFrom | None] = mapped_column(
EnumText(WorkflowAppLogCreatedFrom, length=255), nullable=True
)
run_version: Mapped[str] = mapped_column(String(255), nullable=False)
run_status: Mapped[str] = mapped_column(String(255), nullable=False)
run_status: Mapped[WorkflowExecutionStatus] = mapped_column(
EnumText(WorkflowExecutionStatus, length=255), nullable=False
)
run_triggered_from: Mapped[WorkflowRunTriggeredFrom] = mapped_column(
EnumText(WorkflowRunTriggeredFrom, length=255), nullable=False
)

View File

@@ -8,7 +8,7 @@ dependencies = [
"arize-phoenix-otel~=0.15.0",
"azure-identity==1.25.3",
"beautifulsoup4==4.14.3",
"boto3==1.42.68",
"boto3==1.42.73",
"bs4~=0.0.1",
"cachetools~=5.3.0",
"celery~=5.6.2",
@@ -23,7 +23,7 @@ dependencies = [
"gevent~=25.9.1",
"gmpy2~=2.3.0",
"google-api-core>=2.19.1",
"google-api-python-client==2.192.0",
"google-api-python-client==2.193.0",
"google-auth>=2.47.0",
"google-auth-httplib2==0.3.0",
"google-cloud-aiplatform>=1.123.0",
@@ -40,7 +40,7 @@ dependencies = [
"numpy~=1.26.4",
"openpyxl~=3.1.5",
"opik~=1.10.37",
"litellm==1.82.2", # Pinned to avoid madoka dependency issue
"litellm==1.82.6", # Pinned to avoid madoka dependency issue
"opentelemetry-api==1.28.0",
"opentelemetry-distro==0.49b0",
"opentelemetry-exporter-otlp==1.28.0",
@@ -72,13 +72,14 @@ dependencies = [
"pyyaml~=6.0.1",
"readabilipy~=0.3.0",
"redis[hiredis]~=7.3.0",
"resend~=2.23.0",
"sentry-sdk[flask]~=2.54.0",
"resend~=2.26.0",
"sentry-sdk[flask]~=2.55.0",
"sqlalchemy~=2.0.29",
"starlette==0.52.1",
"starlette==1.0.0",
"tiktoken~=0.12.0",
"transformers~=5.3.0",
"unstructured[docx,epub,md,ppt,pptx]~=0.21.5",
"pypandoc~=1.13",
"yarl~=1.23.0",
"webvtt-py~=0.5.1",
"sseclient-py~=1.9.0",
@@ -91,7 +92,7 @@ dependencies = [
"apscheduler>=3.11.0",
"weave>=0.52.16",
"fastopenapi[flask]>=0.7.0",
"bleach~=6.2.0",
"bleach~=6.3.0",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.
@@ -118,7 +119,7 @@ dev = [
"ruff~=0.15.5",
"pytest~=9.0.2",
"pytest-benchmark~=5.2.3",
"pytest-cov~=7.0.0",
"pytest-cov~=7.1.0",
"pytest-env~=1.6.0",
"pytest-mock~=3.15.1",
"testcontainers~=4.14.1",
@@ -202,7 +203,7 @@ tools = ["cloudscraper~=1.2.71", "nltk~=3.9.1"]
# Required by vector store clients
############################################################
vdb = [
"alibabacloud_gpdb20160503~=3.8.0",
"alibabacloud_gpdb20160503~=5.1.0",
"alibabacloud_tea_openapi~=0.4.3",
"chromadb==0.5.20",
"clickhouse-connect~=0.14.1",

View File

@@ -8,6 +8,7 @@ from configs import dify_config
from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
from extensions.ext_database import db
from models.dataset import TidbAuthBinding
from models.enums import TidbAuthBindingStatus
@app.celery.task(queue="dataset")
@@ -57,7 +58,7 @@ def create_clusters(batch_size):
account=new_cluster["account"],
password=new_cluster["password"],
active=False,
status="CREATING",
status=TidbAuthBindingStatus.CREATING,
)
db.session.add(tidb_auth_binding)
db.session.commit()

View File

@@ -9,6 +9,7 @@ from configs import dify_config
from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
from extensions.ext_database import db
from models.dataset import TidbAuthBinding
from models.enums import TidbAuthBindingStatus
@app.celery.task(queue="dataset")
@@ -18,7 +19,7 @@ def update_tidb_serverless_status_task():
try:
# check the number of idle tidb serverless
tidb_serverless_list = db.session.scalars(
select(TidbAuthBinding).where(TidbAuthBinding.active == False, TidbAuthBinding.status == "CREATING")
select(TidbAuthBinding).where(
TidbAuthBinding.active == False,
TidbAuthBinding.status == TidbAuthBindingStatus.CREATING,
)
).all()
if len(tidb_serverless_list) == 0:
return

View File

@@ -335,7 +335,11 @@ class BillingService:
# Redis returns bytes, decode to string and parse JSON
json_str = cached_value.decode("utf-8") if isinstance(cached_value, bytes) else cached_value
plan_dict = json.loads(json_str)
# NOTE (hj24): New billing versions may return the timestamp as a str, and validate_python
# in non-strict mode will coerce it to the expected int type.
# To preserve compatibility, always keep non-strict mode here and avoid strict mode.
subscription_plan = subscription_adapter.validate_python(plan_dict)
# NOTE END
tenant_plans[tenant_id] = subscription_plan
except Exception:
logger.exception(
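To make the NOTE concrete, a standalone sketch (pydantic v2; `Plan` is a hypothetical stand-in for the real subscription model) of the coercion behavior it relies on:

    from pydantic import BaseModel, TypeAdapter

    class Plan(BaseModel):
        expires_at: int  # newer billing versions may ship this as a str

    adapter = TypeAdapter(Plan)
    # Non-strict (default) validation coerces the str timestamp to int:
    plan = adapter.validate_python({"expires_at": "1700000000"})
    assert plan.expires_at == 1700000000
    # Strict mode would raise a ValidationError on the same payload:
    # adapter.validate_python({"expires_at": "1700000000"}, strict=True)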

View File

@@ -7,6 +7,7 @@ from configs import dify_config
from core.errors.error import QuotaExceededError
from extensions.ext_database import db
from models import TenantCreditPool
from models.enums import ProviderQuotaType
logger = logging.getLogger(__name__)
@@ -16,7 +17,7 @@ class CreditPoolService:
def create_default_pool(cls, tenant_id: str) -> TenantCreditPool:
"""create default credit pool for new tenant"""
credit_pool = TenantCreditPool(
tenant_id=tenant_id, quota_limit=dify_config.HOSTED_POOL_CREDITS, quota_used=0, pool_type="trial"
tenant_id=tenant_id,
quota_limit=dify_config.HOSTED_POOL_CREDITS,
quota_used=0,
pool_type=ProviderQuotaType.TRIAL,
)
db.session.add(credit_pool)
db.session.commit()

View File

@@ -58,6 +58,7 @@ from models.enums import (
IndexingStatus,
ProcessRuleMode,
SegmentStatus,
SegmentType,
)
from models.model import UploadFile
from models.provider_ids import ModelProviderID
@@ -3786,7 +3787,7 @@ class SegmentService:
child_chunk.word_count = len(child_chunk.content)
child_chunk.updated_by = current_user.id
child_chunk.updated_at = naive_utc_now()
child_chunk.type = "customized"
child_chunk.type = SegmentType.CUSTOMIZED
update_child_chunks.append(child_chunk)
else:
new_child_chunks_args.append(child_chunk_update_args)
@@ -3845,7 +3846,7 @@ class SegmentService:
child_chunk.word_count = len(content)
child_chunk.updated_by = current_user.id
child_chunk.updated_at = naive_utc_now()
child_chunk.type = "customized"
child_chunk.type = SegmentType.CUSTOMIZED
db.session.add(child_chunk)
VectorService.update_child_chunk_vector([], [child_chunk], [], dataset)
db.session.commit()

View File

@@ -7,6 +7,7 @@ from flask import Response
from sqlalchemy import or_
from extensions.ext_database import db
from models.enums import FeedbackRating
from models.model import Account, App, Conversation, Message, MessageFeedback
@@ -100,7 +101,7 @@ class FeedbackService:
"ai_response": message.answer[:500] + "..."
if len(message.answer) > 500
else message.answer, # Truncate long responses
"feedback_rating": "👍" if feedback.rating == "like" else "👎",
"feedback_rating": "👍" if feedback.rating == FeedbackRating.LIKE else "👎",
"feedback_rating_raw": feedback.rating,
"feedback_comment": feedback.content or "",
"feedback_source": feedback.from_source,

View File

@@ -16,6 +16,7 @@ from dify_graph.model_runtime.entities.model_entities import ModelType
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account
from models.enums import FeedbackFromSource, FeedbackRating
from models.model import App, AppMode, AppModelConfig, EndUser, Message, MessageFeedback
from repositories.execution_extra_content_repository import ExecutionExtraContentRepository
from repositories.sqlalchemy_execution_extra_content_repository import (
@@ -172,7 +173,7 @@ class MessageService:
app_model: App,
message_id: str,
user: Union[Account, EndUser] | None,
rating: str | None,
rating: FeedbackRating | None,
content: str | None,
):
if not user:
@@ -197,7 +198,7 @@ class MessageService:
message_id=message.id,
rating=rating,
content=content,
from_source=("user" if isinstance(user, EndUser) else "admin"),
from_source=(FeedbackFromSource.USER if isinstance(user, EndUser) else FeedbackFromSource.ADMIN),
from_end_user_id=(user.id if isinstance(user, EndUser) else None),
from_account_id=(user.id if isinstance(user, Account) else None),
)

View File

@ -79,10 +79,11 @@ from services.entities.knowledge_entities.rag_pipeline_entities import (
KnowledgeConfiguration,
PipelineTemplateInfoEntity,
)
from services.errors.app import WorkflowHashNotEqualError
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError
from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
from services.workflow_draft_variable_service import DraftVariableSaver, DraftVarLoader
from services.workflow_restore import apply_published_workflow_snapshot_to_draft
logger = logging.getLogger(__name__)
@ -234,6 +235,21 @@ class RagPipelineService:
return workflow
def get_published_workflow_by_id(self, pipeline: Pipeline, workflow_id: str) -> Workflow | None:
"""Fetch a published workflow snapshot by ID for restore operations."""
workflow = (
db.session.query(Workflow)
.where(
Workflow.tenant_id == pipeline.tenant_id,
Workflow.app_id == pipeline.id,
Workflow.id == workflow_id,
)
.first()
)
if workflow and workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError("source workflow must be published")
return workflow
def get_all_published_workflow(
self,
*,
@ -327,6 +343,42 @@ class RagPipelineService:
# return draft workflow
return workflow
def restore_published_workflow_to_draft(
self,
*,
pipeline: Pipeline,
workflow_id: str,
account: Account,
) -> Workflow:
"""Restore a published pipeline workflow snapshot into the draft workflow.
Pipelines reuse the shared draft-restore field copy helper, but still own
the pipeline-specific flush/link step that wires a newly created draft
back onto ``pipeline.workflow_id``.
"""
source_workflow = self.get_published_workflow_by_id(pipeline=pipeline, workflow_id=workflow_id)
if not source_workflow:
raise WorkflowNotFoundError("Workflow not found.")
draft_workflow = self.get_draft_workflow(pipeline=pipeline)
draft_workflow, is_new_draft = apply_published_workflow_snapshot_to_draft(
tenant_id=pipeline.tenant_id,
app_id=pipeline.id,
source_workflow=source_workflow,
draft_workflow=draft_workflow,
account=account,
updated_at_factory=lambda: datetime.now(UTC).replace(tzinfo=None),
)
if is_new_draft:
db.session.add(draft_workflow)
db.session.flush()
pipeline.workflow_id = draft_workflow.id
db.session.commit()
return draft_workflow
def publish_workflow(
self,
*,
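The flush-then-link step above is how a newly created draft obtains its generated primary key before being wired onto pipeline.workflow_id. A minimal SQLAlchemy sketch of the same pattern; the model and table names are illustrative, not the real Workflow model:

import uuid
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class DraftStub(Base):  # hypothetical stand-in for Workflow
    __tablename__ = "draft_stub"
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    draft = DraftStub()
    session.add(draft)
    session.flush()       # emits the INSERT, populating the generated id without committing
    linked_id = draft.id  # safe to reference now, e.g. pipeline.workflow_id = draft.id
    session.commit()
    assert linked_id is not None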

View File

@ -7,6 +7,7 @@ from werkzeug.exceptions import NotFound
from extensions.ext_database import db
from models.dataset import Dataset
from models.enums import TagType
from models.model import App, Tag, TagBinding
@ -83,7 +84,7 @@ class TagService:
raise ValueError("Tag name already exists")
tag = Tag(
name=args["name"],
type=args["type"],
type=TagType(args["type"]),
created_by=current_user.id,
tenant_id=current_user.current_tenant_id,
)
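Wrapping the raw request value in TagType(...) validates it at the boundary, since constructing an enum from an unknown value raises ValueError. A small sketch with a hypothetical enum; the real TagType members may differ:

from enum import Enum

class TagTypeStub(str, Enum):  # hypothetical stand-in for models.enums.TagType
    KNOWLEDGE = "knowledge"
    APP = "app"

assert TagTypeStub("app") is TagTypeStub.APP  # lookup by value returns the member

try:
    TagTypeStub("bogus")
except ValueError as e:
    print(e)  # 'bogus' is not a valid TagTypeStub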

View File

@ -1,10 +1,10 @@
import json
import logging
from collections.abc import Mapping
from typing import Any, cast
from httpx import get
from sqlalchemy import select
from typing_extensions import TypedDict
from core.entities.provider_entities import ProviderConfig
from core.tools.__base.tool_runtime import ToolRuntime
@ -28,9 +28,16 @@ from services.tools.tools_transform_service import ToolTransformService
logger = logging.getLogger(__name__)
class ApiSchemaParseResult(TypedDict):
schema_type: str
parameters_schema: list[dict[str, Any]]
credentials_schema: list[dict[str, Any]]
warning: dict[str, str]
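Replacing the loose Mapping return type with this TypedDict keeps the runtime dict unchanged while giving callers key-level typing; cast() only informs the checker. A minimal sketch of the pattern, with a hypothetical stub mirroring the shape above:

from typing import Any, cast
from typing_extensions import TypedDict

class ParseResultStub(TypedDict):  # hypothetical, mirrors part of ApiSchemaParseResult
    schema_type: str
    warning: dict[str, str]

def parse_stub(raw: dict[str, Any]) -> ParseResultStub:
    # cast() has no runtime effect; it just narrows the type for checkers.
    return cast(ParseResultStub, {"schema_type": raw.get("type", "openapi"), "warning": {}})

result = parse_stub({"type": "openapi"})
print(result["schema_type"])  # checkers now know this key exists and is a str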
class ApiToolManageService:
@staticmethod
def parser_api_schema(schema: str) -> Mapping[str, Any]:
def parser_api_schema(schema: str) -> ApiSchemaParseResult:
"""
parse api schema to tool bundle
"""
@ -71,7 +78,7 @@ class ApiToolManageService:
]
return cast(
Mapping,
ApiSchemaParseResult,
jsonable_encoder(
{
"schema_type": schema_type,

View File

@ -18,6 +18,7 @@ from core.helper.provider_cache import NoOpProviderCredentialCache
from core.mcp.auth.auth_flow import auth
from core.mcp.auth_client import MCPClientWithAuthRetry
from core.mcp.error import MCPAuthError, MCPError
from core.mcp.types import Tool as MCPTool
from core.tools.entities.api_entities import ToolProviderApiEntity
from core.tools.utils.encryption import ProviderConfigEncrypter
from models.tools import MCPToolProvider
@ -681,7 +682,7 @@ class MCPToolManageService:
raise ValueError(f"Failed to re-connect MCP server: {e}") from e
def _build_tool_provider_response(
self, db_provider: MCPToolProvider, provider_entity: MCPProviderEntity, tools: list
self, db_provider: MCPToolProvider, provider_entity: MCPProviderEntity, tools: list[MCPTool]
) -> ToolProviderApiEntity:
"""Build API response for tool provider."""
user = db_provider.load_user()
@ -703,7 +704,7 @@ class MCPToolManageService:
raise ValueError(f"MCP tool {server_url} already exists")
if "unique_mcp_provider_server_identifier" in error_msg:
raise ValueError(f"MCP tool {server_identifier} already exists")
raise
raise error
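The switch from a bare raise to raise error makes the re-raised object explicit after the uniqueness-message checks, and keeps working even if the statement later moves outside the except block, where a bare raise would fail. A toy sketch of the control flow; the simulated error type and message are illustrative:

def create_provider_stub(server_url: str) -> None:
    try:
        raise RuntimeError("unique_mcp_provider_server_url_idx")  # simulated DB integrity error
    except RuntimeError as error:
        error_msg = str(error)
        if "unique_mcp_provider_server_url_idx" in error_msg:
            raise ValueError(f"MCP tool {server_url} already exists")
        raise error  # explicitly re-raise the original exception object

try:
    create_provider_stub("https://example.com/mcp")
except ValueError as e:
    print(e)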
def _is_valid_url(self, url: str) -> bool:
"""Validate URL format."""

View File

@ -1,5 +1,7 @@
import json
from typing import Any, TypedDict
from typing import Any
from typing_extensions import TypedDict
from core.app.app_config.entities import (
DatasetEntity,
@ -34,6 +36,17 @@ class _NodeType(TypedDict):
data: dict[str, Any]
class _EdgeType(TypedDict):
id: str
source: str
target: str
class WorkflowGraph(TypedDict):
nodes: list[_NodeType]
edges: list[_EdgeType]
class WorkflowConverter:
"""
App Convert to Workflow Mode
@ -107,7 +120,7 @@ class WorkflowConverter:
app_config = self._convert_to_app_config(app_model=app_model, app_model_config=app_model_config)
# init workflow graph
graph: dict[str, Any] = {"nodes": [], "edges": []}
graph: WorkflowGraph = {"nodes": [], "edges": []}
# Convert list:
# - variables -> start
@ -385,7 +398,7 @@ class WorkflowConverter:
self,
original_app_mode: AppMode,
new_app_mode: AppMode,
graph: dict,
graph: WorkflowGraph,
model_config: ModelConfigEntity,
prompt_template: PromptTemplateEntity,
file_upload: FileUploadConfig | None = None,
@ -595,7 +608,7 @@ class WorkflowConverter:
"data": {"title": "ANSWER", "type": BuiltinNodeTypes.ANSWER, "answer": "{{#llm.text#}}"},
}
def _create_edge(self, source: str, target: str):
def _create_edge(self, source: str, target: str) -> _EdgeType:
"""
Create Edge
:param source: source node id
@ -604,7 +617,7 @@ class WorkflowConverter:
"""
return {"id": f"{source}-{target}", "source": source, "target": target}
def _append_node(self, graph: dict[str, Any], node: _NodeType):
def _append_node(self, graph: WorkflowGraph, node: _NodeType):
"""
Append Node to Graph
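These graph TypedDicts let the converter keep building plain JSON-serializable dicts while type checkers verify node and edge shapes. A small sketch of building a two-node graph in the same style; the stubs mirror the (possibly abridged) fields shown above:

from typing import Any
from typing_extensions import TypedDict

class NodeStub(TypedDict):   # mirrors the shape of _NodeType as shown above
    id: str
    data: dict[str, Any]

class EdgeStub(TypedDict):   # mirrors _EdgeType
    id: str
    source: str
    target: str

class GraphStub(TypedDict):  # mirrors WorkflowGraph
    nodes: list[NodeStub]
    edges: list[EdgeStub]

def create_edge(source: str, target: str) -> EdgeStub:
    return {"id": f"{source}-{target}", "source": source, "target": target}

graph: GraphStub = {"nodes": [], "edges": []}
graph["nodes"].append({"id": "start", "data": {"title": "START"}})
graph["nodes"].append({"id": "llm", "data": {"title": "LLM"}})
graph["edges"].append(create_edge("start", "llm"))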

View File

@ -5,6 +5,7 @@ from typing import Any
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from typing_extensions import TypedDict
from dify_graph.enums import WorkflowExecutionStatus
from models import Account, App, EndUser, TenantAccountJoin, WorkflowAppLog, WorkflowArchiveLog, WorkflowRun
@ -14,6 +15,10 @@ from services.plugin.plugin_service import PluginService
from services.workflow.entities import TriggerMetadata
class LogViewDetails(TypedDict):
trigger_metadata: dict[str, Any] | None
# Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it
class LogView:
"""Lightweight wrapper for WorkflowAppLog with computed details.
@ -22,12 +27,12 @@ class LogView:
- Proxies all other attributes to the underlying `WorkflowAppLog`
"""
def __init__(self, log: WorkflowAppLog, details: dict | None):
def __init__(self, log: WorkflowAppLog, details: LogViewDetails | None):
self.log = log
self.details_ = details
@property
def details(self) -> dict | None:
def details(self) -> LogViewDetails | None:
return self.details_
def __getattr__(self, name):
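The proxying works because __getattr__ is consulted only when normal attribute lookup fails, so details and details_ stay on the wrapper while everything else falls through to the wrapped log. A minimal sketch of the pattern; the class names are hypothetical stand-ins:

from typing import Any

class WrappedStub:  # stands in for WorkflowAppLog
    created_at = "2024-01-01"

class ProxyStub:  # hypothetical stand-in for LogView
    def __init__(self, inner: WrappedStub, details: dict | None):
        self.inner = inner
        self.details_ = details

    @property
    def details(self) -> dict | None:
        return self.details_

    def __getattr__(self, name: str) -> Any:
        # Called only when the attribute is not found on the proxy itself.
        return getattr(self.inner, name)

view = ProxyStub(WrappedStub(), {"trigger_metadata": None})
print(view.details)     # served by the wrapper
print(view.created_at)  # proxied to the wrapped object

This keeps the 100M-row table untouched while the computed details ride along on the view object.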

View File

@ -35,7 +35,7 @@ from factories.variable_factory import build_segment, segment_to_variable
from libs.datetime_utils import naive_utc_now
from libs.uuid_utils import uuidv7
from models import Account, App, Conversation
from models.enums import DraftVariableType
from models.enums import ConversationFromSource, DraftVariableType
from models.workflow import Workflow, WorkflowDraftVariable, WorkflowDraftVariableFile, is_system_variable_editable
from repositories.factory import DifyAPIRepositoryFactory
from services.file_service import FileService
@ -601,7 +601,7 @@ class WorkflowDraftVariableService:
system_instruction_tokens=0,
status="normal",
invoke_from=InvokeFrom.DEBUGGER,
from_source="console",
from_source=ConversationFromSource.CONSOLE,
from_end_user_id=None,
from_account_id=account_id,
)

View File

@ -0,0 +1,58 @@
"""Shared helpers for restoring published workflow snapshots into drafts.
Both app workflows and RAG pipeline workflows restore the same workflow fields
from a published snapshot into a draft. Keeping that field-copy logic in one
place prevents the two restore paths from drifting when we add or adjust draft
state in the future. Restore stays within a tenant, so we can safely reuse the
serialized workflow storage blobs without decrypting and re-encrypting secrets.
"""
from collections.abc import Callable
from datetime import datetime
from models import Account
from models.workflow import Workflow, WorkflowType
UpdatedAtFactory = Callable[[], datetime]
def apply_published_workflow_snapshot_to_draft(
*,
tenant_id: str,
app_id: str,
source_workflow: Workflow,
draft_workflow: Workflow | None,
account: Account,
updated_at_factory: UpdatedAtFactory,
) -> tuple[Workflow, bool]:
"""Copy a published workflow snapshot into a draft workflow record.
The caller remains responsible for source lookup, validation, flushing, and
post-commit side effects. This helper only centralizes the shared draft
creation/update semantics used by both restore entry points. Features are
copied from the stored JSON payload so restore does not normalize and dirty
the published source row before the caller commits.
"""
if not draft_workflow:
workflow_type = (
source_workflow.type.value if isinstance(source_workflow.type, WorkflowType) else source_workflow.type
)
draft_workflow = Workflow(
tenant_id=tenant_id,
app_id=app_id,
type=workflow_type,
version=Workflow.VERSION_DRAFT,
graph=source_workflow.graph,
features=source_workflow.serialized_features,
created_by=account.id,
)
draft_workflow.copy_serialized_variable_storage_from(source_workflow)
return draft_workflow, True
draft_workflow.graph = source_workflow.graph
draft_workflow.features = source_workflow.serialized_features
draft_workflow.updated_by = account.id
draft_workflow.updated_at = updated_at_factory()
draft_workflow.copy_serialized_variable_storage_from(source_workflow)
return draft_workflow, False
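Both restore entry points then follow the same contract: look up the published snapshot, apply it, and persist based on the is_new_draft flag. A toy analogue of that (Workflow, bool) contract using plain dicts, not the real models:

from datetime import UTC, datetime

def apply_snapshot(source: dict, draft: dict | None, editor: str) -> tuple[dict, bool]:
    # Simplified analogue of apply_published_workflow_snapshot_to_draft:
    # create a draft from the snapshot, or refresh an existing one in place.
    if draft is None:
        return {**source, "version": "draft", "created_by": editor}, True
    draft.update(
        graph=source["graph"],
        updated_by=editor,
        updated_at=datetime.now(UTC).replace(tzinfo=None),
    )
    return draft, False

draft, is_new = apply_snapshot({"graph": "{}", "version": "v1"}, None, "acct-1")
if is_new:
    pass  # the caller adds the new row and flushes/commits, as in the services above

Returning the flag instead of committing inside the helper is what lets each caller keep its own persistence side effects (flush/link for pipelines, signal dispatch for apps).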

View File

@ -63,7 +63,12 @@ from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeEx
from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService
from services.enterprise.plugin_manager_service import PluginCredentialType
from services.errors.app import IsDraftWorkflowError, TriggerNodeLimitExceededError, WorkflowHashNotEqualError
from services.errors.app import (
IsDraftWorkflowError,
TriggerNodeLimitExceededError,
WorkflowHashNotEqualError,
WorkflowNotFoundError,
)
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
@ -75,6 +80,7 @@ from .human_input_delivery_test_service import (
HumanInputDeliveryTestService,
)
from .workflow_draft_variable_service import DraftVariableSaver, DraftVarLoader, WorkflowDraftVariableService
from .workflow_restore import apply_published_workflow_snapshot_to_draft
class WorkflowService:
@ -279,6 +285,43 @@ class WorkflowService:
# return draft workflow
return workflow
def restore_published_workflow_to_draft(
self,
*,
app_model: App,
workflow_id: str,
account: Account,
) -> Workflow:
"""Restore a published workflow snapshot into the draft workflow.
Secret environment variables are copied server-side from the selected
published workflow so the normal draft sync flow stays stateless.
"""
source_workflow = self.get_published_workflow_by_id(app_model=app_model, workflow_id=workflow_id)
if not source_workflow:
raise WorkflowNotFoundError("Workflow not found.")
self.validate_features_structure(app_model=app_model, features=source_workflow.normalized_features_dict)
self.validate_graph_structure(graph=source_workflow.graph_dict)
draft_workflow = self.get_draft_workflow(app_model=app_model)
draft_workflow, is_new_draft = apply_published_workflow_snapshot_to_draft(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
source_workflow=source_workflow,
draft_workflow=draft_workflow,
account=account,
updated_at_factory=naive_utc_now,
)
if is_new_draft:
db.session.add(draft_workflow)
db.session.commit()
app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=draft_workflow)
return draft_workflow
def publish_workflow(
self,
*,

View File

@ -179,7 +179,7 @@ def _record_trigger_failure_log(
app_id=workflow.app_id,
workflow_id=workflow.id,
workflow_run_id=workflow_run.id,
created_from=WorkflowAppLogCreatedFrom.SERVICE_API.value,
created_from=WorkflowAppLogCreatedFrom.SERVICE_API,
created_by_role=created_by_role,
created_by=created_by,
)

View File

@ -13,6 +13,7 @@ from controllers.console.app import wraps
from libs.datetime_utils import naive_utc_now
from models import App, Tenant
from models.account import Account, TenantAccountJoin, TenantAccountRole
from models.enums import ConversationFromSource
from models.model import AppMode
from services.app_generate_service import AppGenerateService
@ -154,7 +155,7 @@ class TestChatMessageApiPermissions:
re_sign_file_url_answer="",
answer_tokens=0,
provider_response_latency=0.0,
from_source="console",
from_source=ConversationFromSource.CONSOLE,
from_end_user_id=None,
from_account_id=mock_account.id,
feedbacks=[],

View File

@ -14,6 +14,7 @@ from controllers.console.app import wraps
from libs.datetime_utils import naive_utc_now
from models import App, Tenant
from models.account import Account, TenantAccountJoin, TenantAccountRole
from models.enums import FeedbackFromSource, FeedbackRating
from models.model import AppMode, MessageFeedback
from services.feedback_service import FeedbackService
@ -77,8 +78,8 @@ class TestFeedbackExportApi:
app_id=app_id,
conversation_id=conversation_id,
message_id=message_id,
rating="like",
from_source="user",
rating=FeedbackRating.LIKE,
from_source=FeedbackFromSource.USER,
content=None,
from_end_user_id=str(uuid.uuid4()),
from_account_id=None,
@ -90,8 +91,8 @@ class TestFeedbackExportApi:
app_id=app_id,
conversation_id=conversation_id,
message_id=message_id,
rating="dislike",
from_source="admin",
rating=FeedbackRating.DISLIKE,
from_source=FeedbackFromSource.ADMIN,
content="The response was not helpful",
from_end_user_id=None,
from_account_id=str(uuid.uuid4()),
@ -277,8 +278,8 @@ class TestFeedbackExportApi:
# Verify service was called with correct parameters
mock_export_feedbacks.assert_called_once_with(
app_id=mock_app_model.id,
from_source="user",
rating="dislike",
from_source=FeedbackFromSource.USER,
rating=FeedbackRating.DISLIKE,
has_comment=True,
start_date="2024-01-01",
end_date="2024-12-31",

View File

@ -13,6 +13,7 @@ from unittest.mock import patch
import pytest
from extensions.ext_redis import redis_client
from models.enums import ApiTokenType
from models.model import ApiToken
from services.api_token_service import ApiTokenCache, CachedApiToken
@ -279,7 +280,7 @@ class TestEndToEndCacheFlow:
test_token = ApiToken()
test_token.id = "test-e2e-id"
test_token.token = test_token_value
test_token.type = test_scope
test_token.type = ApiTokenType.APP
test_token.app_id = "test-app"
test_token.tenant_id = "test-tenant"
test_token.last_used_at = None

View File

@ -165,8 +165,9 @@ class DifyTestContainers:
# Start Dify Sandbox container for code execution environment
# Dify Sandbox provides a secure environment for executing user code
# Use pinned version 0.2.12 to match production docker-compose configuration
logger.info("Initializing Dify Sandbox container...")
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:latest").with_network(self.network)
self.dify_sandbox = DockerContainer(image="langgenius/dify-sandbox:0.2.12").with_network(self.network)
self.dify_sandbox.with_exposed_ports(8194)
self.dify_sandbox.env = {
"API_KEY": "test_api_key",

Some files were not shown because too many files have changed in this diff.