feat(tasks): isolate summary generation to dedicated dataset_summary queue (#32972)

eux 2026-03-06 14:35:28 +08:00 committed by GitHub
parent 0490756ab2
commit e74cda6535
7 changed files with 49 additions and 8 deletions
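
Review note: the whole change rides on Celery's per-task queue= option, so no call sites move; only the two decorators and the worker -Q lists change. Below is a minimal sketch (not from this commit; the app name, broker URL, and task are illustrative) of why that is enough: .delay() and apply_async() fall back to the queue declared on the task whenever the caller passes no explicit routing.

# Minimal sketch, not part of this commit. Broker URL and task body are
# placeholders; the point is that queue= on the decorator becomes the default
# delivery queue for every .delay()/apply_async() call.
from celery import Celery, shared_task

app = Celery("sketch", broker="memory://")
app.set_default()  # let shared_task bind to this app


@shared_task(queue="dataset_summary")
def summarize_segment(segment_id: str) -> None:
    ...  # LLM call would go here


# The option is stored on the task and used as its default delivery queue,
# so only workers started with -Q dataset_summary will consume it.
assert summarize_segment.queue == "dataset_summary"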

View File

@@ -7,7 +7,7 @@ cd web && pnpm install
pipx install uv
echo "alias start-api=\"cd $WORKSPACE_ROOT/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug\"" >> ~/.bashrc
echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P threads -c 1 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention\"" >> ~/.bashrc
echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P threads -c 1 --loglevel INFO -Q dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention\"" >> ~/.bashrc
echo "alias start-web=\"cd $WORKSPACE_ROOT/web && pnpm dev:inspect\"" >> ~/.bashrc
echo "alias start-web-prod=\"cd $WORKSPACE_ROOT/web && pnpm build && pnpm start\"" >> ~/.bashrc
echo "alias start-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d\"" >> ~/.bashrc

View File

@@ -37,7 +37,7 @@
        "-c",
        "1",
        "-Q",
-        "dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution",
+        "dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution",
        "--loglevel",
        "INFO"
      ],

View File

@@ -35,10 +35,10 @@ if [[ "${MODE}" == "worker" ]]; then
  if [[ -z "${CELERY_QUEUES}" ]]; then
    if [[ "${EDITION}" == "CLOUD" ]]; then
      # Cloud edition: separate queues for dataset and trigger tasks
-      DEFAULT_QUEUES="api_token,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
+      DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
    else
      # Community edition (SELF_HOSTED): dataset, pipeline and workflow have separate queues
-      DEFAULT_QUEUES="api_token,dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
+      DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
    fi
  else
    DEFAULT_QUEUES="${CELERY_QUEUES}"

View File

@@ -14,7 +14,7 @@ from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)

-@shared_task(queue="dataset")
+@shared_task(queue="dataset_summary")
def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: list[str] | None = None):
    """
    Async generate summary index for document segments.

View File

@@ -16,7 +16,7 @@ from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)

-@shared_task(queue="dataset")
+@shared_task(queue="dataset_summary")
def regenerate_summary_index_task(
    dataset_id: str,
    regenerate_reason: str = "summary_model_changed",
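
Because routing now lives entirely on the two decorators, producers keep enqueuing exactly as before. A sketch of the assumed call shape (the full parameter list of regenerate_summary_index_task is truncated in this diff, and the IDs are placeholders):

# Assumed call shape based on the signatures visible above; placeholders only.
from tasks.generate_summary_index_task import generate_summary_index_task
from tasks.regenerate_summary_index_task import regenerate_summary_index_task

dataset_id, document_id = "ds-123", "doc-456"  # placeholder IDs

# Both calls now land on the dataset_summary queue, untouched at the call site.
generate_summary_index_task.delay(dataset_id, document_id)
regenerate_summary_index_task.delay(dataset_id, regenerate_reason="summary_model_changed")

# Celery semantics, for completeness: an explicit queue passed at enqueue time
# still overrides the decorator default (illustrative, not used in this commit).
generate_summary_index_task.apply_async(args=(dataset_id, document_id), queue="dataset")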

View File

@@ -0,0 +1,40 @@
"""
Unit tests for summary index task queue isolation.

These tasks must NOT run on the shared 'dataset' queue because they invoke LLMs
for each document segment and can occupy all worker slots for hours, blocking
document indexing tasks.
"""

import pytest

from tasks.generate_summary_index_task import generate_summary_index_task
from tasks.regenerate_summary_index_task import regenerate_summary_index_task

SUMMARY_QUEUE = "dataset_summary"
INDEXING_QUEUE = "dataset"


def _task_queue(task) -> str | None:
    # Celery's @shared_task(queue=...) stores the routing key on the task instance
    # at runtime, but type stubs don't declare it; use getattr to stay type-clean.
    return getattr(task, "queue", None)


@pytest.mark.parametrize(
    ("task", "task_name"),
    [
        (generate_summary_index_task, "generate_summary_index_task"),
        (regenerate_summary_index_task, "regenerate_summary_index_task"),
    ],
)
def test_summary_task_uses_dedicated_queue(task, task_name):
    """Summary tasks must use the dataset_summary queue, not the shared dataset queue.

    Summary generation is LLM-heavy and will block document indexing if placed
    on the shared queue.
    """
    assert _task_queue(task) == SUMMARY_QUEUE, (
        f"{task_name} must run on '{SUMMARY_QUEUE}' queue (not '{INDEXING_QUEUE}'). "
        "Summary generation is LLM-heavy and will block document indexing if placed on the shared queue."
    )
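
A note on the guard: it reads the queue attribute straight off the task objects, so it runs with no broker or worker. The new file's repository path isn't shown in this diff, so selecting by test name avoids guessing it; driven from Python:

# Hypothetical invocation; the diff does not show the new test file's path.
import pytest

raise SystemExit(pytest.main(["-q", "-k", "test_summary_task_uses_dedicated_queue"]))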

View File

@@ -21,6 +21,7 @@ show_help() {
  echo ""
  echo "Available queues:"
  echo "  dataset                - RAG indexing and document processing"
+  echo "  dataset_summary        - LLM-heavy summary index generation (isolated from indexing)"
  echo "  workflow               - Workflow triggers (community edition)"
  echo "  workflow_professional  - Professional tier workflows (cloud edition)"
  echo "  workflow_team          - Team tier workflows (cloud edition)"
@@ -106,10 +107,10 @@ if [[ -z "${QUEUES}" ]]; then
  # Configure queues based on edition
  if [[ "${EDITION}" == "CLOUD" ]]; then
    # Cloud edition: separate queues for dataset and trigger tasks
-    QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
+    QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
  else
    # Community edition (SELF_HOSTED): dataset and workflow have separate queues
-    QUEUES="dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
+    QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution"
  fi
  echo "No queues specified, using edition-based defaults: ${QUEUES}"