From e73816d3b399d3d1d44901fbcd8c7b154d20fd18 Mon Sep 17 00:00:00 2001 From: gambletan Date: Thu, 19 Mar 2026 21:56:47 +0800 Subject: [PATCH] fix: resolve empty answer in blocking mode API responses Fixes langgenius/dify#33721 Two root causes addressed: 1. Race condition in AppQueueManager.stop_listen(): the method eagerly set graph_runtime_state to None for GC, but stop_listen() is called from the worker thread immediately after enqueueing a terminal event. The consumer thread may not have cached the runtime state reference yet, causing _resolve_graph_runtime_state() to see None and either raise ValueError or produce empty outputs. 2. Chatflow (advanced chat) blocking mode built the answer exclusively from QueueTextChunkEvent streaming chunks. When the response coordinator did not emit text chunks for certain variable types (e.g. workflow-tool outputs fed directly into an Answer node), the accumulated answer stayed empty despite the graph runtime state containing the correct value. Added a fallback that reads the answer from graph_runtime_state.outputs when text chunks are missing. Signed-off-by: Tan --- .../apps/advanced_chat/generate_task_pipeline.py | 15 ++++++++++++++- api/core/app/apps/base_app_queue_manager.py | 9 ++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a1cb375e24..d228840d66 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -188,6 +188,19 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): if stream_response.metadata: extras["metadata"] = stream_response.metadata + answer = self._task_state.answer + + # Fallback: if no text chunks were captured during streaming but the + # graph runtime state contains an "answer" output (set by the Answer + # node via _update_response_outputs), use that instead. This covers + # scenarios where the response-coordinator does not emit streaming + # chunks for certain variable types (e.g. workflow-tool outputs fed + # directly into an Answer node). + if not answer and self._graph_runtime_state is not None: + runtime_answer = self._graph_runtime_state.get_output("answer") + if runtime_answer and isinstance(runtime_answer, str): + answer = runtime_answer + return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( @@ -195,7 +208,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): mode=self._conversation_mode, conversation_id=self._conversation_id, message_id=self._message_id, - answer=self._task_state.answer, + answer=answer, created_at=self._message_created_at, **extras, ), diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 5addd41815..7580b1e498 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -90,7 +90,14 @@ class AppQueueManager(ABC): """ self._clear_task_belong_cache() self._q.put(None) - self._graph_runtime_state = None # Release reference to allow GC to reclaim memory + # NOTE: Do NOT clear self._graph_runtime_state here. + # stop_listen() is called from the worker thread (via _publish) immediately + # after enqueueing a terminal event. The consumer thread may not have processed + # the preceding events yet and still needs to read graph_runtime_state via + # _resolve_graph_runtime_state(). Clearing it here causes a race condition + # where the consumer sees None and either raises ValueError or produces an + # empty response. The reference will be released when the queue manager is + # garbage-collected after both threads are done. def _clear_task_belong_cache(self) -> None: """