perf: remove redundant EndUser DB query in workflow generators

This commit is contained in:
fangshiyuan.fsy@alibaba-inc.com 2026-03-23 18:46:10 +08:00 committed by shiyuanfang2nd
parent 0c3d11f920
commit 7bd814c4c2
6 changed files with 42 additions and 27 deletions

View File

@ -186,6 +186,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
files=list(file_objs),
parent_message_id=args.get("parent_message_id") if invoke_from != InvokeFrom.SERVICE_API else UUID_NIL,
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=invoke_from,
extras=extras,
@ -298,6 +299,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
query="",
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
@ -384,6 +386,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
query="",
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
@ -595,13 +598,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
InvokeFrom.SERVICE_API,
}
if is_external_api_call:
# For external API calls, use end user's session ID
end_user = session.scalar(select(EndUser).where(EndUser.id == application_generate_entity.user_id))
system_user_id = end_user.session_id if end_user else ""
else:
# For internal calls, use the original user ID
system_user_id = application_generate_entity.user_id
# For external API calls, use end user's session ID
# For internal calls, use the original user ID
system_user_id = (
application_generate_entity.user_session_id
if is_external_api_call
else application_generate_entity.user_id
)
app = session.scalar(select(App).where(App.id == application_generate_entity.app_config.app_id))
if app is None:

View File

@ -188,6 +188,7 @@ class PipelineGenerator(BaseAppGenerator):
),
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=invoke_from,
call_depth=call_depth,
@ -392,6 +393,7 @@ class PipelineGenerator(BaseAppGenerator):
inputs={},
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
@ -489,6 +491,7 @@ class PipelineGenerator(BaseAppGenerator):
inputs={},
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
@ -576,15 +579,14 @@ class PipelineGenerator(BaseAppGenerator):
InvokeFrom.SERVICE_API,
}
if is_external_api_call:
# For external API calls, use end user's session ID
end_user = session.scalar(
select(EndUser).where(EndUser.id == application_generate_entity.user_id)
)
system_user_id = end_user.session_id if end_user else ""
else:
# For internal calls, use the original user ID
system_user_id = application_generate_entity.user_id
# For external API calls, use end user's session ID
# For internal calls, use the original user ID
system_user_id = (
application_generate_entity.user_session_id
if is_external_api_call
else application_generate_entity.user_id
)
# workflow app
runner = PipelineRunner(
application_generate_entity=application_generate_entity,

View File

@ -180,6 +180,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
inputs=inputs,
files=list(system_files),
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=invoke_from,
call_depth=call_depth,
@ -384,6 +385,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
inputs={},
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
@ -470,6 +472,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
inputs={},
files=[],
user_id=user.id,
user_session_id=user.session_id if isinstance(user, EndUser) else user.id,
stream=streaming,
invoke_from=InvokeFrom.DEBUGGER,
extras={"auto_generate_conversation_name": False},
@ -557,13 +560,13 @@ class WorkflowAppGenerator(BaseAppGenerator):
InvokeFrom.SERVICE_API,
}
if is_external_api_call:
# For external API calls, use end user's session ID
end_user = session.scalar(select(EndUser).where(EndUser.id == application_generate_entity.user_id))
system_user_id = end_user.session_id if end_user else ""
else:
# For internal calls, use the original user ID
system_user_id = application_generate_entity.user_id
# For external API calls, use end user's session ID
# For internal calls, use the original user ID
system_user_id = (
application_generate_entity.user_session_id
if is_external_api_call
else application_generate_entity.user_id
)
runner = WorkflowAppRunner(
application_generate_entity=application_generate_entity,

View File

@ -118,6 +118,11 @@ class AppGenerateEntity(BaseModel):
# Note: The `user_id` field does not indicate whether the user is a platform user or an end user.
user_id: str
# Session identifier used as system_user_id in workflow execution.
# For external API calls (SERVICE_API / WEB_APP): holds EndUser.session_id.
# For internal calls (DEBUGGER / EXPLORE): holds Account.id (same as user_id).
user_session_id: str = ""
# extras
stream: bool
invoke_from: InvokeFrom

View File

@ -283,10 +283,11 @@ def test_generate_worker_handles_errors(generator, mocker):
app_config=SimpleNamespace(tenant_id="tenant", app_id="pipe", workflow_id="wf"),
invoke_from=InvokeFrom.WEB_APP,
user_id="user",
user_session_id="session",
)
session = DummySession()
session.scalar.side_effect = [MagicMock(), MagicMock(session_id="session")]
session.scalar.side_effect = [MagicMock()]
_patch_session(mocker, session)
runner_instance = MagicMock()
@ -318,10 +319,11 @@ def test_generate_worker_sets_system_user_id_for_external_call(generator, mocker
app_config=SimpleNamespace(tenant_id="tenant", app_id="pipe", workflow_id="wf"),
invoke_from=InvokeFrom.WEB_APP,
user_id="user",
user_session_id="session",
)
session = DummySession()
session.scalar.side_effect = [MagicMock(), MagicMock(session_id="session")]
session.scalar.side_effect = [MagicMock()]
_patch_session(mocker, session)
runner_instance = MagicMock()

View File

@ -146,13 +146,12 @@ def test_resume_path_runs_worker_with_runtime_state(mocker):
workflow = SimpleNamespace(
id="workflow", tenant_id="tenant", app_id="app", graph_dict={}, type="workflow", version="1"
)
end_user = SimpleNamespace(session_id="end-user-session")
app_record = SimpleNamespace(id="app")
session = MagicMock()
session.__enter__.return_value = session
session.__exit__.return_value = False
session.scalar.side_effect = [workflow, end_user, app_record]
session.scalar.side_effect = [workflow, app_record]
mocker.patch("core.app.apps.workflow.app_generator.session_factory", return_value=session)
runner_instance = MagicMock()
@ -188,6 +187,7 @@ def test_resume_path_runs_worker_with_runtime_state(mocker):
application_generate_entity = SimpleNamespace(
task_id="task",
user_id="user",
user_session_id="end-user-session",
invoke_from="service-api",
app_config=app_config,
files=[],