This commit is contained in:
Ethan T. 2026-03-24 09:18:12 +08:00 committed by GitHub
commit b54cb0e402
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 22 additions and 2 deletions

View File

@ -188,6 +188,19 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
if stream_response.metadata:
extras["metadata"] = stream_response.metadata
answer = self._task_state.answer
# Fallback: if no text chunks were captured during streaming but the
# graph runtime state contains an "answer" output (set by the Answer
# node via _update_response_outputs), use that instead. This covers
# scenarios where the response-coordinator does not emit streaming
# chunks for certain variable types (e.g. workflow-tool outputs fed
# directly into an Answer node).
if not answer and self._graph_runtime_state is not None:
runtime_answer = self._graph_runtime_state.get_output("answer")
if runtime_answer and isinstance(runtime_answer, str):
answer = runtime_answer
return ChatbotAppBlockingResponse(
task_id=stream_response.task_id,
data=ChatbotAppBlockingResponse.Data(
@ -195,7 +208,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
mode=self._conversation_mode,
conversation_id=self._conversation_id,
message_id=self._message_id,
answer=self._task_state.answer,
answer=answer,
created_at=self._message_created_at,
**extras,
),

View File

@ -90,7 +90,14 @@ class AppQueueManager(ABC):
"""
self._clear_task_belong_cache()
self._q.put(None)
self._graph_runtime_state = None # Release reference to allow GC to reclaim memory
# NOTE: Do NOT clear self._graph_runtime_state here.
# stop_listen() is called from the worker thread (via _publish) immediately
# after enqueueing a terminal event. The consumer thread may not have processed
# the preceding events yet and still needs to read graph_runtime_state via
# _resolve_graph_runtime_state(). Clearing it here causes a race condition
# where the consumer sees None and either raises ValueError or produces an
# empty response. The reference will be released when the queue manager is
# garbage-collected after both threads are done.
def _clear_task_belong_cache(self) -> None:
"""