feat: Parse the expression to get the input parameters for the evaluation workflow.

This commit is contained in:
FFXN
2026-03-13 09:45:13 +08:00
parent 4555c98d30
commit 18198b88ff
3 changed files with 139 additions and 125 deletions

View File

@@ -1,4 +1,3 @@
import json
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
@@ -6,7 +5,6 @@ from typing import Any
from core.evaluation.entities.evaluation_entity import (
CustomizedMetrics,
DefaultMetric,
EvaluationCategory,
EvaluationItemInput,
EvaluationItemResult,
@@ -74,15 +72,17 @@ class BaseEvaluationInstance(ABC):
becomes the score.
Args:
items: Evaluation items with inputs, expected_output, context.
results: Results from Phase 1 (with actual_output populated).
customized_metrics: Must contain ``evaluation_workflow_id``
pointing to a published WORKFLOW-type App.
node_run_result_mapping_list: One mapping per test-data item,
where each mapping is ``{node_id: NodeRunResult}`` from the
target execution.
customized_metrics: Contains ``evaluation_workflow_id`` (the
published evaluator workflow) and ``input_fields`` (value
sources for the evaluator's input variables).
tenant_id: Tenant scope.
Returns:
A list of ``EvaluationItemResult`` with metrics extracted from
the workflow outputs.
the evaluator workflow's output variables.
"""
from sqlalchemy.orm import Session
@@ -93,7 +93,7 @@ class BaseEvaluationInstance(ABC):
from models.model import App
from services.workflow_service import WorkflowService
workflow_id = customized_metrics.get("evaluation_workflow_id")
workflow_id = customized_metrics.evaluation_workflow_id
if not workflow_id:
raise ValueError(
"customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
@@ -118,9 +118,11 @@ class BaseEvaluationInstance(ABC):
)
eval_results: list[EvaluationItemResult] = []
for node_run_result_mapping in node_run_result_mapping_list:
for idx, node_run_result_mapping in enumerate(node_run_result_mapping_list):
try:
workflow_inputs = self._build_workflow_inputs(customized_metrics.input_fields, node_run_result_mapping)
workflow_inputs = self._build_workflow_inputs(
customized_metrics.input_fields, node_run_result_mapping,
)
generator = WorkflowAppGenerator()
response: Mapping[str, Any] = generator.generate(
@@ -130,25 +132,23 @@ class BaseEvaluationInstance(ABC):
args={"inputs": workflow_inputs},
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
call_depth=0,
)
metrics = self._extract_workflow_metrics(response)
eval_results.append(
EvaluationItemResult(
index=item.index,
index=idx,
metrics=metrics,
metadata={
"workflow_response": _safe_serialize(response),
},
)
)
except Exception:
logger.exception(
"Customized evaluator failed for item %d with workflow %s",
item.index,
idx,
workflow_id,
)
eval_results.append(EvaluationItemResult(index=item.index))
eval_results.append(EvaluationItemResult(index=idx))
return eval_results
@@ -157,72 +157,126 @@ class BaseEvaluationInstance(ABC):
input_fields: dict[str, Any],
node_run_result_mapping: dict[str, NodeRunResult],
) -> dict[str, Any]:
"""Build workflow input dict from evaluation data.
"""Build customized workflow inputs by resolving value sources.
Each entry in ``input_fields`` maps a workflow input variable name
to its value source, which can be:
- **Constant**: a plain string without ``{{#…#}}`` used as-is.
- **Expression**: a string containing one or more
``{{#node_id.output_key#}}`` selectors (same format as
``VariableTemplateParser``) resolved from
``node_run_result_mapping``.
Maps evaluation data to conventional workflow input variable names:
- ``actual_output``: The target's actual output (from ``result``).
- ``expected_output``: The expected/reference output.
- ``inputs``: The original evaluation inputs as a JSON string.
- ``context``: All context strings joined by newlines.
"""
from core.workflow.nodes.base.variable_template_parser import REGEX as VARIABLE_REGEX
workflow_inputs: dict[str, Any] = {}
if result and result.actual_output:
workflow_inputs["actual_output"] = result.actual_output
for field_name, value_source in input_fields.items():
if not isinstance(value_source, str):
# Non-string values (numbers, bools, dicts) are used directly.
workflow_inputs[field_name] = value_source
continue
if item.expected_output:
workflow_inputs["expected_output"] = item.expected_output
if item.inputs:
workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
if item.context:
workflow_inputs["context"] = "\n\n".join(item.context)
# Check if the entire value is a single expression.
full_match = VARIABLE_REGEX.fullmatch(value_source)
if full_match:
workflow_inputs[field_name] = _resolve_variable_selector(
full_match.group(1), node_run_result_mapping,
)
elif VARIABLE_REGEX.search(value_source):
# Mixed template: interpolate all expressions as strings.
workflow_inputs[field_name] = VARIABLE_REGEX.sub(
lambda m: str(
_resolve_variable_selector(m.group(1), node_run_result_mapping)
),
value_source,
)
else:
# Plain constant — no expression markers.
workflow_inputs[field_name] = value_source
return workflow_inputs
@staticmethod
def _extract_workflow_metrics(
response: Mapping[str, Any],
response: Mapping[str, object],
) -> list[EvaluationMetric]:
"""Extract evaluation metrics from workflow output variables.
Each output variable is treated as a metric. The variable name
becomes the metric name, and its value becomes the score.
Non-numeric values are recorded with ``score=0.0`` and the raw
value stored in ``details``.
becomes the metric name, and its value is stored as-is regardless
of type (numeric, string, dict, etc.).
"""
metrics: list[EvaluationMetric] = []
data = response.get("data", {})
data = response.get("data")
if not isinstance(data, Mapping):
logger.warning("Unexpected workflow response format: missing 'data' dict")
return metrics
outputs = data.get("outputs", {})
if not isinstance(outputs, Mapping):
outputs = data.get("outputs")
if not isinstance(outputs, dict):
logger.warning(
"Unexpected workflow response format: 'outputs' is not a dict"
)
return metrics
for key, value in outputs.items():
try:
score = float(value)
metrics.append(EvaluationMetric(name=key, score=score))
except (TypeError, ValueError):
metrics.append(
EvaluationMetric(
name=key, score=0.0, details={"raw_value": value}
)
)
for key, raw_value in outputs.items():
if not isinstance(key, str):
continue
metrics.append(EvaluationMetric(name=key, value=raw_value))
return metrics
def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
"""Safely serialize workflow response for metadata storage."""
try:
return dict(response)
except Exception:
return {"raw": str(response)}
def _resolve_variable_selector(
    selector_raw: str,
    node_run_result_mapping: dict[str, NodeRunResult],
) -> object:
    """Resolve a ``#node_id.output_key#`` selector against node run results.

    Args:
        selector_raw: The captured selector text, e.g. ``#node_id.key#``
            (leading/trailing ``#`` markers are stripped).
        node_run_result_mapping: ``{node_id: NodeRunResult}`` from the
            target execution whose outputs are being referenced.

    Returns:
        The resolved value in its original type, or an empty string when the
        node is unknown, has no outputs, or any key along the dotted path is
        missing. A stored ``None`` value is indistinguishable from a missing
        key (``Mapping.get`` returns ``None`` for both) and also resolves to
        the empty string.
    """
    # "#node_id.output_key#" → "node_id.output_key"
    cleaned = selector_raw.strip("#")
    parts = cleaned.split(".")
    if len(parts) < 2:
        logger.warning(
            "Selector '%s' must have at least node_id.output_key", selector_raw,
        )
        return ""
    node_id = parts[0]
    output_path = parts[1:]

    node_result = node_run_result_mapping.get(node_id)
    if not node_result or not node_result.outputs:
        logger.warning(
            "Selector '%s': node '%s' not found or has no outputs",
            selector_raw, node_id,
        )
        return ""

    # Traverse the output path to support nested keys.
    current: object = node_result.outputs
    for key in output_path:
        # Guard clause: only mappings can be traversed further.
        if not isinstance(current, Mapping):
            logger.warning(
                "Selector '%s': cannot traverse into non-dict value at key '%s'",
                selector_raw, key,
            )
            return ""
        next_val = current.get(key)
        if next_val is None:
            logger.warning(
                "Selector '%s': key '%s' not found in node '%s' outputs",
                selector_raw, key, node_id,
            )
            return ""
        current = next_val
    # Every early exit above returned "", so `current` cannot be None here;
    # the original trailing `if current is not None` guard was dead code.
    return current

View File

@@ -39,6 +39,8 @@ class EvaluationItemResult(BaseModel):
index: int
actual_output: str | None = None
metrics: list[EvaluationMetric] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
judgment: JudgmentResult | None = None
error: str | None = None
@property

View File

@@ -11,7 +11,6 @@ Orchestrates the evaluation lifecycle in four phases:
import json
import logging
from abc import ABC, abstractmethod
from typing import Any
from sqlalchemy.orm import Session
@@ -19,12 +18,10 @@ from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
CustomizedMetrics,
DefaultMetric,
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
@@ -79,7 +76,7 @@ class BaseEvaluationRunner(ABC):
evaluation_run.started_at = naive_utc_now()
self.session.commit()
results: list[EvaluationItemResult] = []
results_by_index: dict[int, EvaluationItemResult] = {}
# Phase 1: run evaluation
if default_metric and node_run_result_list:
@@ -93,22 +90,30 @@ class BaseEvaluationRunner(ABC):
model_name=model_name,
tenant_id=tenant_id,
)
# Merge evaluated metrics back into results
evaluated_by_index = {r.index: r for r in evaluated_results}
for i, result in enumerate(results):
if result.index in evaluated_by_index:
results[i] = evaluated_by_index[result.index]
for r in evaluated_results:
results_by_index[r.index] = r
except Exception:
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
if customized_metrics and node_run_result_mapping_list:
try:
evaluated_results = self._evaluate_customized(
customized_results = self.evaluation_instance.evaluate_with_customized_workflow(
node_run_result_mapping_list=node_run_result_mapping_list,
customized_metrics=customized_metrics,
tenant_id=tenant_id,
)
for r in customized_results:
existing = results_by_index.get(r.index)
if existing:
# Merge: combine metrics from both sources into one result
results_by_index[r.index] = existing.model_copy(
update={"metrics": existing.metrics + r.metrics}
)
else:
results_by_index[r.index] = r
except Exception:
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
logger.exception("Failed to compute customized metrics for evaluation run %s", evaluation_run_id)
results = list(results_by_index.values())
# Phase 4: Persist individual items
for result in results:
@@ -132,79 +137,32 @@ class BaseEvaluationRunner(ABC):
return results
def _evaluate_customized(
self,
node_run_result_mapping_list: list[dict[str, NodeRunResult]],
customized_metrics: CustomizedMetrics,
tenant_id: str,
) -> list[EvaluationItemResult]:
"""Delegate to the instance's customized workflow evaluator.
Unlike the framework path (which merges ``actual_output`` into
``context``), here we pass ``results`` directly — the instance's
``evaluate_with_customized_workflow()`` reads ``actual_output``
from each ``EvaluationItemResult``.
"""
evaluated_results = self.evaluation_instance.evaluate_with_customized_workflow(
node_run_result_mapping_list=node_run_result_mapping_list,
customized_metrics=customized_metrics,
tenant_id=tenant_id,
)
# Merge metrics back preserving actual_output and metadata from Phase 1
eval_by_index = {r.index: r for r in evaluated}
final_results: list[EvaluationItemResult] = []
for result in results:
if result.index in eval_by_index:
eval_result = eval_by_index[result.index]
final_results.append(
EvaluationItemResult(
index=result.index,
actual_output=result.actual_output,
metrics=eval_result.metrics,
metadata={**result.metadata, **eval_result.metadata},
error=eval_result.error,
)
)
else:
final_results.append(result)
return final_results
@staticmethod
def _apply_judgment(
results: list[EvaluationItemResult],
items: list[EvaluationItemInput],
judgment_config: JudgmentConfig,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None,
) -> list[EvaluationItemResult]:
"""Apply judgment conditions to each result's metrics.
Builds a metric_name → value mapping from each result's metrics,
and a variable_values dict from the evaluation target's runtime data
(inputs, actual_output, expected_output) for variable-type conditions.
Results with errors are skipped.
"""
items_by_index = {item.index: item for item in items}
judged_results: list[EvaluationItemResult] = []
for result in results:
for idx, result in enumerate(results):
if result.error is not None or not result.metrics:
judged_results.append(result)
continue
metric_values: dict[str, object] = {m.name: m.score for m in result.metrics}
# Left side: only metrics
metric_values: dict[str, object] = {m.name: m.value for m in result.metrics}
# Build variable pool from the evaluation target's runtime data.
# These variables can be referenced in conditions with value_source="variable".
item_input = items_by_index.get(result.index)
variable_values: dict[str, object] = {}
if item_input:
variable_values.update(item_input.inputs)
if item_input.expected_output is not None:
variable_values["expected_output"] = item_input.expected_output
if item_input.context:
variable_values["context"] = "; ".join(item_input.context)
if result.actual_output is not None:
variable_values["actual_output"] = result.actual_output
# Right side variable pool: metrics + intermediate node run results
variable_values: dict[str, object] = dict(metric_values)
if node_run_result_mapping_list and idx < len(node_run_result_mapping_list):
node_run_result_mapping = node_run_result_mapping_list[idx]
for node_id, node_result in node_run_result_mapping.items():
if node_result.outputs:
for output_key, output_value in node_result.outputs.items():
variable_values[f"{node_id}.{output_key}"] = output_value
judgment_result = JudgmentProcessor.evaluate(
metric_values, judgment_config, variable_values=variable_values