diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index f7b5030d33..cb7ec91115 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -188,6 +188,19 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): if stream_response.metadata: extras["metadata"] = stream_response.metadata + answer = self._task_state.answer + + # Fallback: if no text chunks were captured during streaming but the + # graph runtime state contains an "answer" output (set by the Answer + # node via _update_response_outputs), use that instead. This covers + # scenarios where the response-coordinator does not emit streaming + # chunks for certain variable types (e.g. workflow-tool outputs fed + # directly into an Answer node). + if not answer and self._graph_runtime_state is not None: + runtime_answer = self._graph_runtime_state.get_output("answer") + if runtime_answer and isinstance(runtime_answer, str): + answer = runtime_answer + return ChatbotAppBlockingResponse( task_id=stream_response.task_id, data=ChatbotAppBlockingResponse.Data( @@ -195,7 +208,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): mode=self._conversation_mode, conversation_id=self._conversation_id, message_id=self._message_id, - answer=self._task_state.answer, + answer=answer, created_at=self._message_created_at, **extras, ), diff --git a/api/core/app/apps/base_app_queue_manager.py b/api/core/app/apps/base_app_queue_manager.py index 5addd41815..7580b1e498 100644 --- a/api/core/app/apps/base_app_queue_manager.py +++ b/api/core/app/apps/base_app_queue_manager.py @@ -90,7 +90,14 @@ class AppQueueManager(ABC): """ self._clear_task_belong_cache() self._q.put(None) - self._graph_runtime_state = None # Release reference to allow GC to reclaim memory + # NOTE: Do NOT clear self._graph_runtime_state here.
+ # stop_listen() is called from the worker thread (via _publish) immediately + # after enqueueing a terminal event. The consumer thread may not have processed + # the preceding events yet and still needs to read graph_runtime_state via + # _resolve_graph_runtime_state(). Clearing it here causes a race condition + # where the consumer sees None and either raises ValueError or produces an + # empty response. The reference will be released when the queue manager is + # garbage-collected after both threads are done. def _clear_task_belong_cache(self) -> None: """