diff --git a/api/core/llm_generator/llm_generator.py b/api/core/llm_generator/llm_generator.py
index f8741729f9..880c0142c2 100644
--- a/api/core/llm_generator/llm_generator.py
+++ b/api/core/llm_generator/llm_generator.py
@@ -471,7 +471,7 @@ class LLMGenerator:
             prompt_messages=complete_messages,
             output_model=CodeNodeStructuredOutput,
             model_parameters=model_parameters,
-            stream=False,
+            stream=True,
             tenant_id=tenant_id,
         )
 
@@ -560,7 +560,7 @@ class LLMGenerator:
             prompt_messages=prompt_messages,
             output_model=SuggestedQuestionsOutput,
             model_parameters=completion_params,
-            stream=False,
+            stream=True,
             tenant_id=tenant_id,
         )
 
@@ -849,7 +849,7 @@ Generate {language} code to extract/transform available variables for the target
                 prompt_messages=list(prompt_messages),
                 output_model=InstructionModifyOutput,
                 model_parameters=model_parameters,
-                stream=False,
+                stream=True,
             )
             return response.model_dump(mode="python")
         except InvokeError as e:
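Note on the llm_generator.py hunks: all three call sites only flip the stream flag; the contract of invoke_llm_with_pydantic_model is unchanged, so no other caller-side code moves. A minimal sketch of that contract, assuming the surrounding variables (provider, model_schema, model_instance, prompt_messages, model_parameters, tenant_id) are already set up as in the hunks above:

    # The helper drains the stream internally, so the caller always receives a
    # validated Pydantic instance, never a generator of chunks.
    response = invoke_llm_with_pydantic_model(
        provider=provider,
        model_schema=model_schema,
        model_instance=model_instance,
        prompt_messages=prompt_messages,
        output_model=InstructionModifyOutput,
        model_parameters=model_parameters,
        stream=True,  # now the default; stream=False stays available
        tenant_id=tenant_id,
    )
    return response.model_dump(mode="python")  # plain model access, as in the third hunk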
+ """ json_schema = _schema_from_pydantic(output_model) + + if stream: + result_generator = invoke_llm_with_structured_output( + provider=provider, + model_schema=model_schema, + model_instance=model_instance, + prompt_messages=prompt_messages, + json_schema=json_schema, + model_parameters=model_parameters, + tools=tools, + stop=stop, + stream=True, + user=user, + callbacks=callbacks, + tenant_id=tenant_id, + ) + + # Consume the generator to get the final chunk with structured_output + last_chunk: LLMResultChunkWithStructuredOutput | None = None + for chunk in result_generator: + last_chunk = chunk + + if last_chunk is None: + raise OutputParserError("No chunks received from LLM") + + structured_output = last_chunk.structured_output + if structured_output is None: + raise OutputParserError("Structured output is empty") + + return _validate_structured_output(output_model, structured_output) + result = invoke_llm_with_structured_output( provider=provider, model_schema=model_schema, @@ -296,8 +330,7 @@ def invoke_llm_with_pydantic_model( if structured_output is None: raise OutputParserError("Structured output is empty") - validated_output = _validate_structured_output(output_model, structured_output) - return output_model.model_validate(validated_output) + return _validate_structured_output(output_model, structured_output) def _schema_from_pydantic(output_model: type[BaseModel]) -> dict[str, Any]: diff --git a/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py b/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py index e6e3448534..52af2e8d7a 100644 --- a/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py +++ b/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py @@ -475,7 +475,7 @@ class ExampleOutput(BaseModel): name: str -def test_structured_output_with_pydantic_model(): +def test_structured_output_with_pydantic_model_non_streaming(): model_schema = get_model_entity("openai", "gpt-4o", support_structure_output=True) model_instance = get_model_instance() model_instance.invoke_llm.return_value = LLMResult( @@ -499,19 +499,45 @@ def test_structured_output_with_pydantic_model(): assert result.name == "test" -def test_structured_output_with_pydantic_model_streaming_rejected(): +def test_structured_output_with_pydantic_model_streaming(): model_schema = get_model_entity("openai", "gpt-4o", support_structure_output=True) model_instance = get_model_instance() - with pytest.raises(ValueError): - invoke_llm_with_pydantic_model( - provider="openai", - model_schema=model_schema, - model_instance=model_instance, + def mock_streaming_response(): + yield LLMResultChunk( + model="gpt-4o", prompt_messages=[UserPromptMessage(content="test")], - output_model=ExampleOutput, - stream=True, + system_fingerprint="test", + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content='{"name":'), + usage=create_mock_usage(prompt_tokens=8, completion_tokens=2), + ), ) + yield LLMResultChunk( + model="gpt-4o", + prompt_messages=[UserPromptMessage(content="test")], + system_fingerprint="test", + delta=LLMResultChunkDelta( + index=0, + message=AssistantPromptMessage(content=' "test"}'), + usage=create_mock_usage(prompt_tokens=8, completion_tokens=4), + ), + ) + + model_instance.invoke_llm.return_value = mock_streaming_response() + + result = invoke_llm_with_pydantic_model( + provider="openai", + model_schema=model_schema, + model_instance=model_instance, + 
diff --git a/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py b/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py
index e6e3448534..52af2e8d7a 100644
--- a/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py
+++ b/api/tests/unit_tests/utils/structured_output_parser/test_structured_output_parser.py
@@ -475,7 +475,7 @@ class ExampleOutput(BaseModel):
     name: str
 
 
-def test_structured_output_with_pydantic_model():
+def test_structured_output_with_pydantic_model_non_streaming():
     model_schema = get_model_entity("openai", "gpt-4o", support_structure_output=True)
     model_instance = get_model_instance()
     model_instance.invoke_llm.return_value = LLMResult(
@@ -499,19 +499,45 @@
     assert result.name == "test"
 
 
-def test_structured_output_with_pydantic_model_streaming_rejected():
+def test_structured_output_with_pydantic_model_streaming():
     model_schema = get_model_entity("openai", "gpt-4o", support_structure_output=True)
     model_instance = get_model_instance()
 
-    with pytest.raises(ValueError):
-        invoke_llm_with_pydantic_model(
-            provider="openai",
-            model_schema=model_schema,
-            model_instance=model_instance,
+    def mock_streaming_response():
+        yield LLMResultChunk(
+            model="gpt-4o",
             prompt_messages=[UserPromptMessage(content="test")],
-            output_model=ExampleOutput,
-            stream=True,
+            system_fingerprint="test",
+            delta=LLMResultChunkDelta(
+                index=0,
+                message=AssistantPromptMessage(content='{"name":'),
+                usage=create_mock_usage(prompt_tokens=8, completion_tokens=2),
+            ),
         )
+        yield LLMResultChunk(
+            model="gpt-4o",
+            prompt_messages=[UserPromptMessage(content="test")],
+            system_fingerprint="test",
+            delta=LLMResultChunkDelta(
+                index=0,
+                message=AssistantPromptMessage(content=' "test"}'),
+                usage=create_mock_usage(prompt_tokens=8, completion_tokens=4),
+            ),
+        )
+
+    model_instance.invoke_llm.return_value = mock_streaming_response()
+
+    result = invoke_llm_with_pydantic_model(
+        provider="openai",
+        model_schema=model_schema,
+        model_instance=model_instance,
+        prompt_messages=[UserPromptMessage(content="Return a JSON object with name.")],
+        output_model=ExampleOutput,
+        stream=True,
+    )
+
+    assert isinstance(result, ExampleOutput)
+    assert result.name == "test"
 
 
 def test_structured_output_with_pydantic_model_validation_error():
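Note on the test changes: the streaming test works because mock_streaming_response() returns a fresh generator object for the helper to drain. A generator can be consumed only once, so if a test invokes the mocked model more than once, a side_effect that rebuilds the stream is needed rather than a single return_value. A minimal illustration with unittest.mock (make_stream and mock are illustrative names, not from this diff):

    from unittest.mock import MagicMock

    def make_stream():
        yield "chunk-1"
        yield "chunk-2"

    mock = MagicMock()
    # side_effect builds a fresh generator per call; a single return_value
    # generator would be exhausted after the first use.
    mock.invoke_llm.side_effect = lambda *args, **kwargs: make_stream()

    assert list(mock.invoke_llm()) == ["chunk-1", "chunk-2"]
    assert list(mock.invoke_llm()) == ["chunk-1", "chunk-2"]  # second call still gets a full stream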