diff --git a/api/controllers/console/evaluation/evaluation.py b/api/controllers/console/evaluation/evaluation.py index 42b18eda664..dff9e3230a8 100644 --- a/api/controllers/console/evaluation/evaluation.py +++ b/api/controllers/console/evaluation/evaluation.py @@ -155,6 +155,29 @@ available_evaluation_workflow_pagination_model = console_ns.model( available_evaluation_workflow_pagination_fields, ) +evaluation_default_metric_node_info_fields = { + "node_id": fields.String, + "type": fields.String, + "title": fields.String, +} +evaluation_default_metric_item_fields = { + "metric": fields.String, + "value_type": fields.String, + "node_info_list": fields.List( + fields.Nested( + console_ns.model("EvaluationDefaultMetricNodeInfo", evaluation_default_metric_node_info_fields), + ), + ), +} +evaluation_default_metrics_response_model = console_ns.model( + "EvaluationDefaultMetricsResponse", + { + "default_metrics": fields.List( + fields.Nested(console_ns.model("EvaluationDefaultMetricItem", evaluation_default_metric_item_fields)), + ), + }, +) + def get_evaluation_target(view_func: Callable[P, R]): """ @@ -517,6 +540,32 @@ class EvaluationMetricsApi(Resource): return {"metrics": result} +@console_ns.route("///evaluation/default-metrics") +class EvaluationDefaultMetricsApi(Resource): + @console_ns.doc( + "get_evaluation_default_metrics_with_nodes", + description=( + "List default metrics supported by the current evaluation framework with matching nodes " + "from the target's published workflow only (draft is ignored)." + ), + ) + @console_ns.response( + 200, + "Default metrics and node candidates for the published workflow", + evaluation_default_metrics_response_model, + ) + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + def get(self, target: Union[App, CustomizedSnippet], target_type: str): + default_metrics = EvaluationService.get_default_metrics_with_nodes_for_published_target( + target=target, + target_type=target_type, + ) + return {"default_metrics": [m.model_dump() for m in default_metrics]} + + @console_ns.route("///evaluation/node-info") class EvaluationNodeInfoApi(Resource): @console_ns.doc("get_evaluation_node_info") @@ -706,6 +755,71 @@ class AvailableEvaluationWorkflowsApi(Resource): ) +@console_ns.route("/workspaces/current/evaluation-workflows//associated-targets") +class EvaluationWorkflowAssociatedTargetsApi(Resource): + @console_ns.doc("list_evaluation_workflow_associated_targets") + @console_ns.doc( + description="List targets (apps / snippets / knowledge bases) that use the given workflow as customized metrics" + ) + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + def get(self, workflow_id: str): + """Return all evaluation targets that reference this workflow as customized metrics.""" + _, current_tenant_id = current_account_with_tenant() + + with Session(db.engine) as session: + configs = EvaluationService.list_targets_by_customized_workflow( + session=session, + tenant_id=current_tenant_id, + customized_workflow_id=workflow_id, + ) + + target_ids_by_type: dict[str, list[str]] = {} + for cfg in configs: + target_ids_by_type.setdefault(cfg.target_type, []).append(cfg.target_id) + + app_names: dict[str, str] = {} + if "app" in target_ids_by_type: + apps = session.scalars(select(App).where(App.id.in_(target_ids_by_type["app"]))).all() + app_names = {a.id: a.name for a in apps} + + snippet_names: dict[str, str] = {} + if "snippets" in target_ids_by_type: + snippets = session.scalars( + 
select(CustomizedSnippet).where(CustomizedSnippet.id.in_(target_ids_by_type["snippets"])) + ).all() + snippet_names = {s.id: s.name for s in snippets} + + dataset_names: dict[str, str] = {} + if "knowledge_base" in target_ids_by_type: + datasets = session.scalars( + select(Dataset).where(Dataset.id.in_(target_ids_by_type["knowledge_base"])) + ).all() + dataset_names = {d.id: d.name for d in datasets} + + items = [] + for cfg in configs: + name = "" + if cfg.target_type == "app": + name = app_names.get(cfg.target_id, "") + elif cfg.target_type == "snippets": + name = snippet_names.get(cfg.target_id, "") + elif cfg.target_type == "knowledge_base": + name = dataset_names.get(cfg.target_id, "") + + items.append( + { + "target_type": cfg.target_type, + "target_id": cfg.target_id, + "target_name": name, + } + ) + + return {"items": items}, 200 + + # ---- Serialization Helpers ---- diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py index 2f114ec8491..67fbf0374cf 100644 --- a/api/core/evaluation/base_evaluation_instance.py +++ b/api/core/evaluation/base_evaluation_instance.py @@ -9,6 +9,7 @@ from core.evaluation.entities.evaluation_entity import ( EvaluationItemInput, EvaluationItemResult, EvaluationMetric, + NodeInfo, ) from graphon.node_events.base import NodeRunResult @@ -128,7 +129,7 @@ class BaseEvaluationInstance(ABC): call_depth=0, ) - metrics = self._extract_workflow_metrics(response) + metrics = self._extract_workflow_metrics(response, workflow_id) eval_results.append( EvaluationItemResult( index=idx, @@ -194,9 +195,16 @@ class BaseEvaluationInstance(ABC): @staticmethod def _extract_workflow_metrics( response: Mapping[str, object], + evaluation_workflow_id: str, ) -> list[EvaluationMetric]: - """Extract evaluation metrics from workflow output variables.""" + """Extract evaluation metrics from workflow output variables. + + Each metric's ``node_info`` is set with *evaluation_workflow_id* as + the ``node_id``, so that judgment conditions can reference customized + metrics via ``variable_selector: [evaluation_workflow_id, metric_name]``. 
+ """ metrics: list[EvaluationMetric] = [] + node_info = NodeInfo(node_id=evaluation_workflow_id, type="customized", title="customized") data = response.get("data") if not isinstance(data, Mapping): @@ -211,7 +219,7 @@ class BaseEvaluationInstance(ABC): for key, raw_value in outputs.items(): if not isinstance(key, str): continue - metrics.append(EvaluationMetric(name=key, value=raw_value)) + metrics.append(EvaluationMetric(name=key, value=raw_value, node_info=node_info)) return metrics diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py index 37ae184b8f0..a87354b5267 100644 --- a/api/core/evaluation/entities/evaluation_entity.py +++ b/api/core/evaluation/entities/evaluation_entity.py @@ -129,11 +129,30 @@ METRIC_NODE_TYPE_MAPPING: dict[str, str] = { **{m.value: "agent" for m in AGENT_METRIC_NAMES}, } +METRIC_VALUE_TYPE_MAPPING: dict[str, str] = { + EvaluationMetricName.FAITHFULNESS: "number", + EvaluationMetricName.ANSWER_RELEVANCY: "number", + EvaluationMetricName.ANSWER_CORRECTNESS: "number", + EvaluationMetricName.SEMANTIC_SIMILARITY: "number", + EvaluationMetricName.CONTEXT_PRECISION: "number", + EvaluationMetricName.CONTEXT_RECALL: "number", + EvaluationMetricName.CONTEXT_RELEVANCE: "number", + EvaluationMetricName.TOOL_CORRECTNESS: "number", + EvaluationMetricName.TASK_COMPLETION: "number", +} + + +class NodeInfo(BaseModel): + node_id: str + type: str + title: str + class EvaluationMetric(BaseModel): name: str value: Any details: dict[str, Any] = Field(default_factory=dict) + node_info: NodeInfo | None = None class EvaluationItemInput(BaseModel): @@ -159,14 +178,9 @@ class EvaluationItemResult(BaseModel): error: str | None = None -class NodeInfo(BaseModel): - node_id: str - type: str - title: str - - class DefaultMetric(BaseModel): metric: str + value_type: str = "" node_info_list: list[NodeInfo] diff --git a/api/core/evaluation/entities/judgment_entity.py b/api/core/evaluation/entities/judgment_entity.py index ee9bd64a2d2..4a59879c06b 100644 --- a/api/core/evaluation/entities/judgment_entity.py +++ b/api/core/evaluation/entities/judgment_entity.py @@ -1,85 +1,52 @@ """Judgment condition entities for evaluation metric assessment. -Key concepts: - - **condition_type**: Determines operator semantics and type coercion. - - "string": string operators (contains, is, start with, …). - - "number": numeric operators (>, <, =, ≠, ≥, ≤). - - "datetime": temporal operators (before, after). +Condition structure mirrors the workflow if-else ``Condition`` model from +``graphon.utils.condition.entities``. The left-hand side uses +``variable_selector`` — a two-element list ``[node_id, metric_name]`` — to +uniquely identify an evaluation metric (different nodes may produce metrics +with the same name). + +Operators reuse ``SupportedComparisonOperator`` from the workflow engine so +that type semantics stay consistent across the platform. 
+ +Typical usage:: -Typical usage: judgment_config = JudgmentConfig( logical_operator="and", conditions=[ JudgmentCondition( - metric_name="faithfulness", + variable_selector=["node_abc", "faithfulness"], comparison_operator=">", - condition_value="0.8", - condition_type="number", + value="0.8", ) ], ) """ -from enum import StrEnum +from collections.abc import Sequence from typing import Any, Literal from pydantic import BaseModel, Field - -class JudgmentConditionType(StrEnum): - """Category of the condition, controls operator semantics and type coercion.""" - - STRING = "string" - NUMBER = "number" - DATETIME = "datetime" - - -# Supported comparison operators for judgment conditions. -JudgmentComparisonOperator = Literal[ - # string - "contains", - "not contains", - "start with", - "end with", - "is", - "is not", - "empty", - "not empty", - "in", - "not in", - # number - "=", - "≠", - ">", - "<", - "≥", - "≤", - # datetime - "before", - "after", - # universal - "null", - "not null", -] +from graphon.utils.condition.entities import SupportedComparisonOperator class JudgmentCondition(BaseModel): """A single judgment condition that checks one metric value. + Mirrors ``graphon.utils.condition.entities.Condition`` with the left-hand + side being a metric selector instead of a workflow variable selector. + Attributes: - metric_name: The name of the evaluation metric to check (left side). - Must match an EvaluationMetric.name in the results. - comparison_operator: The comparison operator to apply. - condition_value: The comparison target (right side). For unary operators - such as ``empty`` or ``null`` this can be ``None``. - condition_type: Controls type coercion and which operators are valid. - "string" (default), "number", or "datetime". + variable_selector: ``[node_id, metric_name]`` identifying the metric. + comparison_operator: Reuses workflow's ``SupportedComparisonOperator``. + value: The comparison target (right side). For unary operators such + as ``empty`` or ``null`` this can be ``None``. """ - metric_name: str - comparison_operator: JudgmentComparisonOperator - condition_value: Any | None = None - condition_type: JudgmentConditionType = JudgmentConditionType.STRING + variable_selector: list[str] + comparison_operator: SupportedComparisonOperator + value: str | Sequence[str] | bool | None = None class JudgmentConfig(BaseModel): @@ -99,15 +66,15 @@ class JudgmentConditionResult(BaseModel): """Result of evaluating a single judgment condition. Attributes: - metric_name: Which metric was checked. + variable_selector: ``[node_id, metric_name]`` that was checked. comparison_operator: The operator that was applied. - expected_value: The resolved comparison value (after variable resolution). + expected_value: The resolved comparison value. actual_value: The actual metric value that was evaluated. passed: Whether this individual condition passed. error: Error message if the condition evaluation failed. """ - metric_name: str + variable_selector: list[str] comparison_operator: str expected_value: Any = None actual_value: Any = None diff --git a/api/core/evaluation/judgment/processor.py b/api/core/evaluation/judgment/processor.py index 6b56f1cd7f0..7a0ce38b75f 100644 --- a/api/core/evaluation/judgment/processor.py +++ b/api/core/evaluation/judgment/processor.py @@ -1,25 +1,22 @@ """Judgment condition processor for evaluation metrics. Evaluates pass/fail judgment conditions against evaluation metric values. 
-Each condition uses: - - ``metric_name`` as the left-hand side lookup key from ``metric_values`` - - ``comparison_operator`` as the operator - - ``condition_value`` as the right-hand side comparison value +Each condition uses ``variable_selector`` (``[node_id, metric_name]``) to +look up the metric value, then delegates the actual comparison to the +workflow condition engine (``graphon.utils.condition.processor``). The processor is intentionally decoupled from evaluation frameworks and -runners. It operates on plain ``dict`` mappings and can be invoked anywhere -that already has per-item metric results. +runners. It operates on plain ``dict`` mappings and can be invoked +anywhere that already has per-item metric results. """ import logging from collections.abc import Sequence -from datetime import datetime from typing import Any, cast from core.evaluation.entities.judgment_entity import ( JudgmentCondition, JudgmentConditionResult, - JudgmentConditionType, JudgmentConfig, JudgmentResult, ) @@ -28,22 +25,22 @@ from graphon.utils.condition.processor import _evaluate_condition # pyright: ig logger = logging.getLogger(__name__) -# Operators that do not need a comparison value (unary operators). _UNARY_OPERATORS = frozenset({"null", "not null", "empty", "not empty"}) class JudgmentProcessor: @staticmethod def evaluate( - metric_values: dict[str, Any], + metric_values: dict[tuple[str, str], Any], config: JudgmentConfig, ) -> JudgmentResult: """Evaluate all judgment conditions against the given metric values. Args: - metric_values: Mapping of metric name → metric value - (e.g. ``{"faithfulness": 0.85, "status": "success"}``). - config: The judgment configuration with logical_operator and conditions. + metric_values: Mapping of ``(node_id, metric_name)`` → metric + value (e.g. ``{("node_abc", "faithfulness"): 0.85}``). + config: The judgment configuration with logical_operator and + conditions. Returns: JudgmentResult with overall pass/fail and per-condition details. @@ -74,7 +71,6 @@ class JudgmentProcessor: condition_results=condition_results, ) - # All conditions evaluated if config.logical_operator == "and": final_passed = all(r.passed for r in condition_results) else: @@ -88,207 +84,77 @@ class JudgmentProcessor: @staticmethod def _evaluate_single_condition( - metric_values: dict[str, Any], + metric_values: dict[tuple[str, str], Any], condition: JudgmentCondition, ) -> JudgmentConditionResult: """Evaluate a single judgment condition. Steps: - 1. Look up the metric value (left side) by ``metric_name``. - 2. Read ``condition_value`` as the comparison value (right side). - 3. Dispatch to the correct type handler (string / number / datetime). + 1. Extract ``(node_id, metric_name)`` from ``variable_selector``. + 2. Look up the metric value from ``metric_values``. + 3. Delegate comparison to the workflow condition engine. 
""" - metric_name = condition.metric_name - actual_value = metric_values.get(metric_name) - - # Handle metric not found — skip for unary operators that work on None - if actual_value is None and condition.comparison_operator not in _UNARY_OPERATORS: + selector = condition.variable_selector + if len(selector) < 2: return JudgmentConditionResult( - metric_name=metric_name, + variable_selector=selector, comparison_operator=condition.comparison_operator, - expected_value=condition.condition_value, + expected_value=condition.value, actual_value=None, passed=False, - error=f"Metric '{metric_name}' not found in evaluation results", + error=f"variable_selector must have at least 2 elements, got {len(selector)}", ) - resolved_value = condition.condition_value + node_id, metric_name = selector[0], selector[1] + actual_value = metric_values.get((node_id, metric_name)) + + if actual_value is None and condition.comparison_operator not in _UNARY_OPERATORS: + return JudgmentConditionResult( + variable_selector=selector, + comparison_operator=condition.comparison_operator, + expected_value=condition.value, + actual_value=None, + passed=False, + error=f"Metric '{metric_name}' on node '{node_id}' not found in evaluation results", + ) - # Dispatch to the appropriate type handler try: - match condition.condition_type: - case JudgmentConditionType.DATETIME: - passed = _evaluate_datetime_condition(actual_value, condition.comparison_operator, resolved_value) - case JudgmentConditionType.NUMBER: - passed = _evaluate_number_condition(actual_value, condition.comparison_operator, resolved_value) - case _: # STRING (default) — delegate to workflow engine - if condition.comparison_operator in {"before", "after"}: - raise ValueError( - f"Operator '{condition.comparison_operator}' is not supported for string conditions" - ) - passed = _evaluate_condition( - operator=cast(SupportedComparisonOperator, condition.comparison_operator), - value=actual_value, - expected=resolved_value, - ) + expected = condition.value + # Numeric operators need the actual value coerced to int/float + # so that the workflow engine's numeric assertions work correctly. 
+ coerced_actual: object = actual_value + if ( + condition.comparison_operator in {"=", "≠", ">", "<", "≥", "≤"} + and actual_value is not None + and not isinstance(actual_value, (int, float, bool)) + ): + coerced_actual = float(actual_value) + + passed = _evaluate_condition( + operator=cast(SupportedComparisonOperator, condition.comparison_operator), + value=coerced_actual, + expected=cast(str | Sequence[str] | bool | Sequence[bool] | None, expected), + ) return JudgmentConditionResult( - metric_name=metric_name, + variable_selector=selector, comparison_operator=condition.comparison_operator, - expected_value=resolved_value, + expected_value=expected, actual_value=actual_value, passed=passed, ) except Exception as e: logger.warning( - "Judgment condition evaluation failed for metric '%s': %s", + "Judgment condition evaluation failed for [%s, %s]: %s", + node_id, metric_name, str(e), ) return JudgmentConditionResult( - metric_name=metric_name, + variable_selector=selector, comparison_operator=condition.comparison_operator, - expected_value=resolved_value, + expected_value=condition.value, actual_value=actual_value, passed=False, error=str(e), ) - - -_DATETIME_FORMATS = [ - "%Y-%m-%dT%H:%M:%S", - "%Y-%m-%dT%H:%M:%S.%f", - "%Y-%m-%dT%H:%M:%SZ", - "%Y-%m-%dT%H:%M:%S.%fZ", - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%d %H:%M:%S", - "%Y-%m-%d", -] - - -def _parse_datetime(value: object) -> datetime: - """Parse a value into a datetime object. - - Accepts datetime instances, numeric timestamps (int/float), and common - ISO 8601 string formats. - - Raises: - ValueError: If the value cannot be parsed as a datetime. - """ - if isinstance(value, datetime): - return value - if isinstance(value, (int, float)): - return datetime.fromtimestamp(value) - if not isinstance(value, str): - raise ValueError(f"Cannot parse '{value}' (type={type(value).__name__}) as datetime") - - for fmt in _DATETIME_FORMATS: - try: - return datetime.strptime(value, fmt) - except ValueError: - continue - - raise ValueError( - f"Cannot parse datetime string '{value}'. " - f"Supported formats: ISO 8601, 'YYYY-MM-DD HH:MM:SS', 'YYYY-MM-DD', or numeric timestamp." - ) - - -def _evaluate_datetime_condition( - actual: object, - operator: str, - expected: object, -) -> bool: - """Evaluate a datetime comparison condition. - - Also supports the universal unary operators (null, not null, empty, not empty) - and the numeric-style operators (=, ≠, >, <, ≥, ≤) for datetime values. - - Args: - actual: The actual metric value (left side). - operator: The comparison operator. - expected: The expected/threshold value (right side). - - Returns: - True if the condition passes. - - Raises: - ValueError: If values cannot be parsed or operator is unsupported. 
- """ - # Handle unary operators first - if operator == "null": - return actual is None - if operator == "not null": - return actual is not None - if operator == "empty": - return not actual - if operator == "not empty": - return bool(actual) - - if actual is None: - return False - - actual_dt = _parse_datetime(actual) - expected_dt = _parse_datetime(expected) if expected is not None else None - - if expected_dt is None: - raise ValueError(f"Expected datetime value is required for operator '{operator}'") - - match operator: - case "before" | "<": - return actual_dt < expected_dt - case "after" | ">": - return actual_dt > expected_dt - case "=" | "is": - return actual_dt == expected_dt - case "≠" | "is not": - return actual_dt != expected_dt - case "≥": - return actual_dt >= expected_dt - case "≤": - return actual_dt <= expected_dt - case _: - raise ValueError(f"Unsupported datetime operator: '{operator}'") - - -def _evaluate_number_condition( - actual: object, - operator: str, - expected: object, -) -> bool: - """Evaluate a numeric comparison condition. - - Ensures proper numeric type coercion before delegating to the workflow - condition engine. This avoids string-vs-number comparison pitfalls - (e.g. comparing float metric 0.85 against string threshold "0.8"). - - For unary operators (null, not null, empty, not empty), delegates directly. - """ - # Unary operators — delegate to workflow engine as-is - if operator in _UNARY_OPERATORS: - return _evaluate_condition( - operator=cast(SupportedComparisonOperator, operator), - value=actual, - expected=cast(str | Sequence[str] | bool | Sequence[bool] | None, expected), - ) - - if actual is None: - return False - - # Coerce actual to numeric - if not isinstance(actual, (int, float)): - try: - actual = float(cast(str | int | float, actual)) - except (TypeError, ValueError) as e: - raise ValueError(f"Cannot convert actual value '{actual}' to number") from e - - # Coerce expected to numeric string for the workflow engine - # (the workflow engine's _normalize_numeric_values handles str → float) - if expected is not None and not isinstance(expected, str): - expected = str(expected) - - return _evaluate_condition( - operator=cast(SupportedComparisonOperator, operator), - value=actual, - expected=expected, - ) diff --git a/api/core/evaluation/runners/agent_evaluation_runner.py b/api/core/evaluation/runners/agent_evaluation_runner.py index 3d99bcf76e6..ef3bbe704c5 100644 --- a/api/core/evaluation/runners/agent_evaluation_runner.py +++ b/api/core/evaluation/runners/agent_evaluation_runner.py @@ -2,77 +2,28 @@ import logging from collections.abc import Mapping from typing import Any -from sqlalchemy.orm import Session - from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, EvaluationItemInput, EvaluationItemResult, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult -from models.model import App logger = logging.getLogger(__name__) class AgentEvaluationRunner(BaseEvaluationRunner): - """Runner for agent evaluation: executes agent-type App, collects tool calls and final output.""" + """Runner for agent evaluation: collects tool calls and final output.""" - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): - super().__init__(evaluation_instance, session) - - def execute_target( - self, - tenant_id: str, - target_id: str, - target_type: 
str, - item: EvaluationItemInput, - ) -> EvaluationItemResult: - """Execute agent app and collect response with tool call information.""" - from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator - from core.app.entities.app_invoke_entities import InvokeFrom - from core.evaluation.runners import get_service_account_for_app - - app = self.session.query(App).filter_by(id=target_id).first() - if not app: - raise ValueError(f"App {target_id} not found") - - service_account = get_service_account_for_app(self.session, target_id) - - query = self._extract_query(item.inputs) - args: dict[str, Any] = { - "inputs": item.inputs, - "query": query, - } - - generator = AgentChatAppGenerator() - # Agent chat requires streaming - collect full response - response_generator = generator.generate( - app_model=app, - user=service_account, - args=args, - invoke_from=InvokeFrom.SERVICE_API, - streaming=True, - ) - - # Consume the stream to get the full response - actual_output, tool_calls = self._consume_agent_stream(response_generator) - - return EvaluationItemResult( - index=item.index, - actual_output=actual_output, - metadata={"tool_calls": tool_calls}, - ) + def __init__(self, evaluation_instance: BaseEvaluationInstance): + super().__init__(evaluation_instance) def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, @@ -80,8 +31,6 @@ class AgentEvaluationRunner(BaseEvaluationRunner): """Compute agent evaluation metrics.""" if not node_run_result_list: return [] - if not default_metric: - raise ValueError("Default metric is required for agent evaluation") merged_items = self._merge_results_into_items(node_run_result_list) return self.evaluation_instance.evaluate_agent( merged_items, [default_metric.metric], model_provider, model_name, tenant_id @@ -102,47 +51,6 @@ class AgentEvaluationRunner(BaseEvaluationRunner): ) return merged - @staticmethod - def _extract_query(inputs: dict[str, Any]) -> str: - for key in ("query", "question", "input", "text"): - if key in inputs: - return str(inputs[key]) - values = list(inputs.values()) - return str(values[0]) if values else "" - - @staticmethod - def _consume_agent_stream(response_generator: Any) -> tuple[str, list[dict]]: - """Consume agent streaming response and extract final answer + tool calls.""" - answer_parts: list[str] = [] - tool_calls: list[dict] = [] - - try: - for chunk in response_generator: - if isinstance(chunk, Mapping): - event = chunk.get("event") - if event == "agent_thought": - thought = chunk.get("thought", "") - if thought: - answer_parts.append(thought) - tool = chunk.get("tool") - if tool: - tool_calls.append( - { - "tool": tool, - "tool_input": chunk.get("tool_input", ""), - } - ) - elif event == "message": - answer = chunk.get("answer", "") - if answer: - answer_parts.append(answer) - elif isinstance(chunk, str): - answer_parts.append(chunk) - except Exception: - logger.exception("Error consuming agent stream") - - return "".join(answer_parts), tool_calls - def _extract_agent_output(outputs: Mapping[str, Any]) -> str: """Extract the primary output text from agent NodeRunResult.outputs.""" diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py index 
715853759d3..9046c2ddad2 100644 --- a/api/core/evaluation/runners/base_evaluation_runner.py +++ b/api/core/evaluation/runners/base_evaluation_runner.py @@ -1,179 +1,51 @@ """Base evaluation runner. -Orchestrates the evaluation lifecycle in four phases: - 1. execute_target — run the target and collect actual outputs (abstract) - 2. evaluate_metrics — compute metrics via framework or customized workflow - 3. apply_judgment — evaluate pass/fail judgment conditions on metrics - 4. persist — save results to the database +Provides the abstract interface for metric computation. Each concrete runner +(LLM, Retrieval, Agent, Workflow, Snippet) implements ``evaluate_metrics`` +to compute scores for a specific node type. -The persisted ``EvaluationRunItem.judgment`` payload must reflect the final -judgment result for each evaluated item, so judgment evaluation happens before -the persistence phase whenever a ``JudgmentConfig`` is supplied. +Orchestration (merging results from multiple runners, applying judgment, and +persisting to the database) is handled by the evaluation task, not the runner. """ -import json import logging from abc import ABC, abstractmethod -from sqlalchemy.orm import Session - from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, - EvaluationDatasetInput, EvaluationItemResult, ) -from core.evaluation.entities.judgment_entity import JudgmentConfig -from core.evaluation.judgment.processor import JudgmentProcessor from graphon.node_events import NodeRunResult -from libs.datetime_utils import naive_utc_now -from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus logger = logging.getLogger(__name__) class BaseEvaluationRunner(ABC): - """Abstract base class for evaluation runners.""" + """Abstract base class for evaluation runners. - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + Runners are stateless metric calculators: they receive node execution + results and a metric specification, then return scored results. They + do **not** touch the database or apply judgment logic. + """ + + def __init__(self, evaluation_instance: BaseEvaluationInstance): self.evaluation_instance = evaluation_instance - self.session = session @abstractmethod def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, ) -> list[EvaluationItemResult]: - """Compute evaluation metrics on the collected results.""" - ... + """Compute evaluation metrics on the collected results. 
- def run( - self, - evaluation_run_id: str, - tenant_id: str, - target_id: str, - target_type: str, - node_run_result_list: list[NodeRunResult] | None = None, - default_metric: DefaultMetric | None = None, - customized_metrics: CustomizedMetrics | None = None, - model_provider: str = "", - model_name: str = "", - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None, - judgment_config: JudgmentConfig | None = None, - input_list: list[EvaluationDatasetInput] | None = None, - ) -> list[EvaluationItemResult]: - """Orchestrate target execution + metric evaluation + judgment for all items.""" - evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first() - if not evaluation_run: - raise ValueError(f"EvaluationRun {evaluation_run_id} not found") - - if not default_metric and not customized_metrics: - raise ValueError("Either default_metric or customized_metrics must be provided") - - # Update status to running - evaluation_run.status = EvaluationRunStatus.RUNNING - evaluation_run.started_at = naive_utc_now() - self.session.commit() - - results_by_index: dict[int, EvaluationItemResult] = {} - - # Phase 1: run evaluation - if default_metric and node_run_result_list: - try: - evaluated_results = self.evaluate_metrics( - node_run_result_mapping_list=node_run_result_mapping_list, - node_run_result_list=node_run_result_list, - default_metric=default_metric, - customized_metrics=customized_metrics, - model_provider=model_provider, - model_name=model_name, - tenant_id=tenant_id, - ) - for r in evaluated_results: - results_by_index[r.index] = r - except Exception: - logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id) - if customized_metrics and node_run_result_mapping_list: - try: - customized_results = self.evaluation_instance.evaluate_with_customized_workflow( - node_run_result_mapping_list=node_run_result_mapping_list, - customized_metrics=customized_metrics, - tenant_id=tenant_id, - ) - for r in customized_results: - existing = results_by_index.get(r.index) - if existing: - # Merge: combine metrics from both sources into one result - results_by_index[r.index] = existing.model_copy( - update={"metrics": existing.metrics + r.metrics} - ) - else: - results_by_index[r.index] = r - except Exception: - logger.exception("Failed to compute customized metrics for evaluation run %s", evaluation_run_id) - - results = list(results_by_index.values()) - - if judgment_config is not None: - results = self._apply_judgment( - results=results, - judgment_config=judgment_config, - node_run_result_mapping_list=node_run_result_mapping_list, - ) - - # Phase 4: Persist individual items - dataset_items = input_list or [] - for result in results: - item_input = next((item for item in dataset_items if item.index == result.index), None) - run_item = EvaluationRunItem( - evaluation_run_id=evaluation_run_id, - item_index=result.index, - inputs=json.dumps(item_input.inputs) if item_input else None, - expected_output=item_input.expected_output if item_input else None, - context=json.dumps(item_input.context) if item_input and getattr(item_input, "context", None) else None, - actual_output=result.actual_output, - metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None, - judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None, - metadata_json=json.dumps(result.metadata) if result.metadata else None, - error=result.error, - overall_score=getattr(result, "overall_score", None), - ) - 
self.session.add(run_item) - - self.session.commit() - - return results - - @staticmethod - def _apply_judgment( - results: list[EvaluationItemResult], - judgment_config: JudgmentConfig, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None, - ) -> list[EvaluationItemResult]: - """Apply judgment conditions to each result's metrics. - - Judgment is computed only from the per-item metric values and the - supplied ``JudgmentConfig``. ``metric_name`` selects the left-hand side - metric, and ``condition_value`` is used as the comparison target. + The returned ``EvaluationItemResult.index`` values are positional + (0-based) and correspond to the order of *node_run_result_list*. + The caller is responsible for mapping them back to the original + dataset indices. """ - - judged_results: list[EvaluationItemResult] = [] - - for result in results: - if result.error is not None or not result.metrics: - judged_results.append(result) - continue - - # Left side: only metrics - metric_values: dict[str, object] = {m.name: m.value for m in result.metrics} - judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config) - - judged_results.append(result.model_copy(update={"judgment": judgment_result})) - return judged_results + ... diff --git a/api/core/evaluation/runners/llm_evaluation_runner.py b/api/core/evaluation/runners/llm_evaluation_runner.py index 0d8bb06d1ea..4b1c2448381 100644 --- a/api/core/evaluation/runners/llm_evaluation_runner.py +++ b/api/core/evaluation/runners/llm_evaluation_runner.py @@ -1,12 +1,9 @@ import logging from collections.abc import Mapping -from typing import Any, Union - -from sqlalchemy.orm import Session +from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, EvaluationItemInput, EvaluationItemResult, @@ -18,59 +15,27 @@ logger = logging.getLogger(__name__) class LLMEvaluationRunner(BaseEvaluationRunner): - """Runner for LLM evaluation: executes App to get responses, then evaluates.""" + """Runner for LLM evaluation: extracts prompts/outputs then evaluates.""" - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): - super().__init__(evaluation_instance, session) + def __init__(self, evaluation_instance: BaseEvaluationInstance): + super().__init__(evaluation_instance) def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, ) -> list[EvaluationItemResult]: """Use the evaluation instance to compute LLM metrics.""" - # Merge actual_output into items for evaluation if not node_run_result_list: return [] - if not default_metric: - raise ValueError("Default metric is required for LLM evaluation") merged_items = self._merge_results_into_items(node_run_result_list) return self.evaluation_instance.evaluate_llm( merged_items, [default_metric.metric], model_provider, model_name, tenant_id ) - @staticmethod - def _extract_query(inputs: dict[str, Any]) -> str: - """Extract query from inputs.""" - for key in ("query", "question", "input", "text"): - if key in inputs: - return str(inputs[key]) - values = list(inputs.values()) - return str(values[0]) if values else "" - - 
@staticmethod - def _extract_output(response: Union[Mapping[str, Any], Any]) -> str: - """Extract text output from app response.""" - if isinstance(response, Mapping): - # Workflow response - if "data" in response and isinstance(response["data"], Mapping): - outputs = response["data"].get("outputs", {}) - if isinstance(outputs, Mapping): - values = list(outputs.values()) - return str(values[0]) if values else "" - return str(outputs) - # Completion response - if "answer" in response: - return str(response["answer"]) - if "text" in response: - return str(response["text"]) - return str(response) - @staticmethod def _merge_results_into_items( items: list[NodeRunResult], @@ -114,6 +79,5 @@ def _extract_llm_output(outputs: Mapping[str, Any]) -> str: return str(outputs["text"]) if "answer" in outputs: return str(outputs["answer"]) - # Fallback: first value values = list(outputs.values()) return str(values[0]) if values else "" diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py index 28c0968824d..66b8ab7360c 100644 --- a/api/core/evaluation/runners/retrieval_evaluation_runner.py +++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py @@ -1,11 +1,8 @@ import logging from typing import Any -from sqlalchemy.orm import Session - from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, EvaluationItemInput, EvaluationItemResult, @@ -19,15 +16,13 @@ logger = logging.getLogger(__name__) class RetrievalEvaluationRunner(BaseEvaluationRunner): """Runner for retrieval evaluation: performs knowledge base retrieval, then evaluates.""" - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): - super().__init__(evaluation_instance, session) + def __init__(self, evaluation_instance: BaseEvaluationInstance): + super().__init__(evaluation_instance) def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, @@ -38,10 +33,8 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner): merged_items = [] for i, node_result in enumerate(node_run_result_list): - # Extract retrieved contexts from outputs outputs = node_result.outputs query = self._extract_query(dict(node_result.inputs)) - # Extract retrieved content from result list result_list = outputs.get("result", []) contexts = [item.get("content", "") for item in result_list if item.get("content")] output = "\n---\n".join(contexts) @@ -56,7 +49,7 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner): ) return self.evaluation_instance.evaluate_retrieval( - merged_items, [default_metric.metric]if default_metric else [], model_provider, model_name, tenant_id + merged_items, [default_metric.metric], model_provider, model_name, tenant_id ) @staticmethod diff --git a/api/core/evaluation/runners/snippet_evaluation_runner.py b/api/core/evaluation/runners/snippet_evaluation_runner.py index 7893a3e796b..bc516f9ee8e 100644 --- a/api/core/evaluation/runners/snippet_evaluation_runner.py +++ b/api/core/evaluation/runners/snippet_evaluation_runner.py @@ -1,118 +1,42 @@ """Runner for Snippet evaluation. 
-Executes a published Snippet workflow in non-streaming mode, collects the -actual outputs and per-node execution records, then delegates to the -evaluation instance for metric computation. - +Snippets are essentially workflows, so we reuse ``evaluate_workflow`` from +the evaluation instance for metric computation. """ -import json import logging -from collections.abc import Mapping, Sequence +from collections.abc import Mapping from typing import Any -from sqlalchemy import asc, select -from sqlalchemy.orm import Session - from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, EvaluationItemInput, EvaluationItemResult, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult -from models.snippet import CustomizedSnippet -from models.workflow import WorkflowNodeExecutionModel logger = logging.getLogger(__name__) class SnippetEvaluationRunner(BaseEvaluationRunner): - """Runner for snippet evaluation: executes a published Snippet workflow.""" + """Runner for snippet evaluation: evaluates a published Snippet workflow.""" - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): - super().__init__(evaluation_instance, session) - - def execute_target( - self, - tenant_id: str, - target_id: str, - target_type: str, - item: EvaluationItemInput, - ) -> EvaluationItemResult: - """Execute a published Snippet workflow and collect outputs. - - Steps: - 1. Delegate execution to ``SnippetGenerateService.run_published``. - 2. Extract ``workflow_run_id`` from the blocking response. - 3. Query ``workflow_node_executions`` by ``workflow_run_id`` to get - each node's inputs, outputs, status, elapsed_time, etc. - 4. Return result with actual_output and node_executions metadata. 
- """ - from core.app.entities.app_invoke_entities import InvokeFrom - from core.evaluation.runners import get_service_account_for_snippet - from services.snippet_generate_service import SnippetGenerateService - - snippet = self.session.query(CustomizedSnippet).filter_by(id=target_id).first() - if not snippet: - raise ValueError(f"Snippet {target_id} not found") - - if not snippet.is_published: - raise ValueError(f"Snippet {target_id} is not published") - - service_account = get_service_account_for_snippet(self.session, target_id) - - response = SnippetGenerateService.run_published( - snippet=snippet, - user=service_account, - args={"inputs": item.inputs}, - invoke_from=InvokeFrom.SERVICE_API, - ) - - actual_output = self._extract_output(response) - - # Retrieve per-node execution records from DB - workflow_run_id = self._extract_workflow_run_id(response) - node_executions = ( - self._query_node_executions( - tenant_id=tenant_id, - app_id=target_id, - workflow_run_id=workflow_run_id, - ) - if workflow_run_id - else [] - ) - - return EvaluationItemResult( - index=item.index, - actual_output=actual_output, - metadata={ - "workflow_run_id": workflow_run_id or "", - "node_executions": node_executions, - }, - ) + def __init__(self, evaluation_instance: BaseEvaluationInstance): + super().__init__(evaluation_instance) def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, ) -> list[EvaluationItemResult]: - """Compute evaluation metrics for snippet outputs. - - Snippets are essentially workflows, so we reuse evaluate_workflow from - the evaluation instance. - """ + """Compute evaluation metrics for snippet outputs.""" if not node_run_result_list: return [] - if not default_metric: - raise ValueError("Default metric is required for snippet evaluation") merged_items = self._merge_results_into_items(node_run_result_list) return self.evaluation_instance.evaluate_workflow( merged_items, [default_metric.metric], model_provider, model_name, tenant_id @@ -133,94 +57,6 @@ class SnippetEvaluationRunner(BaseEvaluationRunner): ) return merged - @staticmethod - def _extract_output(response: Mapping[str, Any]) -> str: - """Extract text output from the blocking workflow response. - - The blocking response ``data.outputs`` is a dict of output variables. - We take the first value as the primary output text. - """ - if "data" in response and isinstance(response["data"], Mapping): - outputs = response["data"].get("outputs", {}) - if isinstance(outputs, Mapping): - values = list(outputs.values()) - return str(values[0]) if values else "" - return str(outputs) - return str(response) - - @staticmethod - def _extract_workflow_run_id(response: Mapping[str, Any]) -> str | None: - """Extract workflow_run_id from the blocking response. - - The blocking response has ``workflow_run_id`` at the top level and - also ``data.id`` (same value). 
- """ - wf_run_id = response.get("workflow_run_id") - if wf_run_id: - return str(wf_run_id) - # Fallback to data.id - data = response.get("data") - if isinstance(data, Mapping) and data.get("id"): - return str(data["id"]) - return None - - def _query_node_executions( - self, - tenant_id: str, - app_id: str, - workflow_run_id: str, - ) -> list[dict[str, Any]]: - """Query per-node execution records from the DB after workflow completes. - - Node executions are persisted during workflow execution. We read them - back via the ``workflow_run_id`` to get each node's inputs, outputs, - status, elapsed_time, etc. - - Returns a list of serialisable dicts for storage in ``metadata``. - """ - stmt = ( - WorkflowNodeExecutionModel.preload_offload_data(select(WorkflowNodeExecutionModel)) - .where( - WorkflowNodeExecutionModel.tenant_id == tenant_id, - WorkflowNodeExecutionModel.app_id == app_id, - WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id, - ) - .order_by(asc(WorkflowNodeExecutionModel.created_at)) - ) - - node_models: Sequence[WorkflowNodeExecutionModel] = self.session.execute(stmt).scalars().all() - - return [self._serialize_node_execution(node) for node in node_models] - - @staticmethod - def _serialize_node_execution(node: WorkflowNodeExecutionModel) -> dict[str, Any]: - """Convert a WorkflowNodeExecutionModel to a serialisable dict. - - Includes the node's id, type, title, inputs/outputs (parsed from JSON), - status, error, and elapsed_time. The virtual Start node injected by - SnippetGenerateService is filtered out by the caller if needed. - """ - - def _safe_parse_json(value: str | None) -> Any: - if not value: - return None - try: - return json.loads(value) - except (json.JSONDecodeError, TypeError): - return value - - return { - "id": node.id, - "node_id": node.node_id, - "node_type": node.node_type, - "title": node.title, - "inputs": _safe_parse_json(node.inputs), - "outputs": _safe_parse_json(node.outputs), - "status": node.status, - "error": node.error, - "elapsed_time": node.elapsed_time, - } - def _extract_snippet_output(outputs: Mapping[str, Any]) -> str: """Extract the primary output text from snippet NodeRunResult.outputs.""" diff --git a/api/core/evaluation/runners/workflow_evaluation_runner.py b/api/core/evaluation/runners/workflow_evaluation_runner.py index 4a4ec68828e..e1cc9defdb8 100644 --- a/api/core/evaluation/runners/workflow_evaluation_runner.py +++ b/api/core/evaluation/runners/workflow_evaluation_runner.py @@ -2,11 +2,8 @@ import logging from collections.abc import Mapping from typing import Any -from sqlalchemy.orm import Session - from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( - CustomizedMetrics, DefaultMetric, EvaluationItemInput, EvaluationItemResult, @@ -20,15 +17,13 @@ logger = logging.getLogger(__name__) class WorkflowEvaluationRunner(BaseEvaluationRunner): """Runner for workflow evaluation: executes workflow App in non-streaming mode.""" - def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): - super().__init__(evaluation_instance, session) + def __init__(self, evaluation_instance: BaseEvaluationInstance): + super().__init__(evaluation_instance) def evaluate_metrics( self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None, - node_run_result_list: list[NodeRunResult] | None, - default_metric: DefaultMetric | None, - customized_metrics: CustomizedMetrics | None, + node_run_result_list: list[NodeRunResult], + 
default_metric: DefaultMetric, model_provider: str, model_name: str, tenant_id: str, @@ -36,8 +31,6 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner): """Compute workflow evaluation metrics (end-to-end).""" if not node_run_result_list: return [] - if not default_metric: - raise ValueError("Default metric is required for workflow evaluation") merged_items = self._merge_results_into_items(node_run_result_list) return self.evaluation_instance.evaluate_workflow( merged_items, [default_metric.metric], model_provider, model_name, tenant_id @@ -58,25 +51,6 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner): ) return merged - @staticmethod - def _extract_output(response: Mapping[str, Any]) -> str: - """Extract text output from workflow response.""" - if "data" in response and isinstance(response["data"], Mapping): - outputs = response["data"].get("outputs", {}) - if isinstance(outputs, Mapping): - values = list(outputs.values()) - return str(values[0]) if values else "" - return str(outputs) - return str(response) - - @staticmethod - def _extract_node_executions(response: Mapping[str, Any]) -> list[dict]: - """Extract node execution trace from workflow response.""" - data = response.get("data", {}) - if isinstance(data, Mapping): - return data.get("node_executions", []) - return [] - def _extract_workflow_output(outputs: Mapping[str, Any]) -> str: """Extract the primary output text from workflow NodeRunResult.outputs.""" diff --git a/api/models/evaluation.py b/api/models/evaluation.py index 6932d6a346e..42445462719 100644 --- a/api/models/evaluation.py +++ b/api/models/evaluation.py @@ -36,6 +36,7 @@ class EvaluationConfiguration(Base): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"), sa.Index("evaluation_configuration_target_idx", "tenant_id", "target_type", "target_id"), + sa.Index("evaluation_configuration_workflow_idx", "customized_workflow_id"), sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"), ) @@ -48,6 +49,7 @@ class EvaluationConfiguration(Base): evaluation_model: Mapped[str | None] = mapped_column(String(255), nullable=True) metrics_config: Mapped[str | None] = mapped_column(LongText, nullable=True) judgement_conditions: Mapped[str | None] = mapped_column(LongText, nullable=True) + customized_workflow_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False) diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py index 13ffc9e3b27..196af2a617a 100644 --- a/api/services/evaluation_service.py +++ b/api/services/evaluation_service.py @@ -12,6 +12,7 @@ from sqlalchemy.orm import Session from configs import dify_config from core.evaluation.entities.evaluation_entity import ( METRIC_NODE_TYPE_MAPPING, + METRIC_VALUE_TYPE_MAPPING, DefaultMetric, EvaluationCategory, EvaluationConfigData, @@ -32,6 +33,7 @@ from models.evaluation import ( ) from models.model import App, AppMode from models.snippet import CustomizedSnippet +from models.workflow import Workflow from services.errors.evaluation import ( EvaluationDatasetInvalidError, EvaluationFrameworkNotConfiguredError, @@ -306,11 +308,33 @@ class EvaluationService: } ) config.judgement_conditions = json.dumps(data.judgment_config.model_dump() if data.judgment_config else {}) + config.customized_workflow_id = ( + data.customized_metrics.evaluation_workflow_id if 
data.customized_metrics else None + ) config.updated_by = account_id session.commit() session.refresh(config) return config + @classmethod + def list_targets_by_customized_workflow( + cls, + session: Session, + tenant_id: str, + customized_workflow_id: str, + ) -> list[EvaluationConfiguration]: + """Return all evaluation configs that reference the given workflow as customized metrics.""" + from sqlalchemy import select + + return list( + session.scalars( + select(EvaluationConfiguration).where( + EvaluationConfiguration.tenant_id == tenant_id, + EvaluationConfiguration.customized_workflow_id == customized_workflow_id, + ) + ).all() + ) + # ---- Evaluation Run Management ---- @classmethod @@ -482,6 +506,71 @@ class EvaluationService: """Return the centrally-defined list of evaluation metrics.""" return [m.value for m in EvaluationMetricName] + @classmethod + def _nodes_for_metrics_from_workflow( + cls, + workflow: Workflow, + metrics: list[str], + ) -> dict[str, list[dict[str, str]]]: + node_type_to_nodes: dict[str, list[dict[str, str]]] = {} + for node_id, node_data in workflow.walk_nodes(): + ntype = node_data.get("type", "") + node_type_to_nodes.setdefault(ntype, []).append( + NodeInfo(node_id=node_id, type=ntype, title=node_data.get("title", "")).model_dump() + ) + + result: dict[str, list[dict[str, str]]] = {} + for metric in metrics: + required_node_type = METRIC_NODE_TYPE_MAPPING.get(metric) + if required_node_type is None: + result[metric] = [] + continue + result[metric] = node_type_to_nodes.get(required_node_type, []) + return result + + @classmethod + def _union_supported_metric_names(cls) -> list[str]: + """Metric names the current evaluation framework supports for any :class:`EvaluationCategory`.""" + ordered: list[str] = [] + seen: set[str] = set() + for category in EvaluationCategory: + for name in cls.get_supported_metrics(category): + if name not in seen: + seen.add(name) + ordered.append(name) + return ordered + + @classmethod + def get_default_metrics_with_nodes_for_published_target( + cls, + target: Union[App, CustomizedSnippet], + target_type: str, + ) -> list[DefaultMetric]: + """List default metrics and matching nodes using only the *published* workflow graph. + + Metrics are those supported by the configured evaluation framework and present in + :data:`METRIC_NODE_TYPE_MAPPING`. Node lists are derived from the published workflow only + (no draft fallback). 
+ """ + workflow = cls._resolve_published_workflow(target, target_type) + if not workflow: + return [] + + supported = cls._union_supported_metric_names() + metric_names = sorted(m for m in supported if m in METRIC_NODE_TYPE_MAPPING) + if not metric_names: + return [] + + nodes_by_metric = cls._nodes_for_metrics_from_workflow(workflow, metric_names) + return [ + DefaultMetric( + metric=m, + value_type=METRIC_VALUE_TYPE_MAPPING.get(m, "number"), + node_info_list=[NodeInfo.model_validate(n) for n in nodes_by_metric.get(m, [])], + ) + for m in metric_names + ] + @classmethod def get_nodes_for_metrics( cls, @@ -509,28 +598,27 @@ class EvaluationService: ] return {"all": all_nodes} - node_type_to_nodes: dict[str, list[dict[str, str]]] = {} - for node_id, node_data in workflow.walk_nodes(): - ntype = node_data.get("type", "") - node_type_to_nodes.setdefault(ntype, []).append( - NodeInfo(node_id=node_id, type=ntype, title=node_data.get("title", "")).model_dump() - ) + return cls._nodes_for_metrics_from_workflow(workflow, metrics) - result: dict[str, list[dict[str, str]]] = {} - for metric in metrics: - required_node_type = METRIC_NODE_TYPE_MAPPING.get(metric) - if required_node_type is None: - result[metric] = [] - continue - result[metric] = node_type_to_nodes.get(required_node_type, []) - return result + @classmethod + def _resolve_published_workflow( + cls, + target: Union[App, CustomizedSnippet], + target_type: str, + ) -> Workflow | None: + """Resolve only the published workflow for the target (no draft fallback).""" + if target_type == "snippets" and isinstance(target, CustomizedSnippet): + return SnippetService().get_published_workflow(snippet=target) + if target_type == "app" and isinstance(target, App): + return WorkflowService().get_published_workflow(app_model=target) + return None @classmethod def _resolve_workflow( cls, target: Union[App, CustomizedSnippet], target_type: str, - ) -> "Workflow | None": + ) -> Workflow | None: """Resolve the *published* (preferred) or *draft* workflow for the target.""" if target_type == "snippets" and isinstance(target, CustomizedSnippet): snippet_service = SnippetService() diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py index bbfa8e62c14..4e3f7acb2e2 100644 --- a/api/tasks/evaluation_task.py +++ b/api/tasks/evaluation_task.py @@ -15,9 +15,11 @@ from core.evaluation.entities.evaluation_entity import ( EvaluationDatasetInput, EvaluationItemResult, EvaluationRunData, + NodeInfo, ) from core.evaluation.entities.judgment_entity import JudgmentConfig from core.evaluation.evaluation_manager import EvaluationManager +from core.evaluation.judgment.processor import JudgmentProcessor from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner @@ -28,7 +30,7 @@ from extensions.ext_database import db from graphon.node_events import NodeRunResult from libs.datetime_utils import naive_utc_now from models.enums import CreatorUserRole -from models.evaluation import EvaluationRun, EvaluationRunStatus +from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus from models.model import UploadFile from services.evaluation_service import EvaluationService @@ -41,11 +43,12 @@ def run_evaluation(run_data_dict: dict[str, Any]) -> None: Workflow: 1. Deserialize EvaluationRunData - 2. Update status to RUNNING - 3. 
Select appropriate Runner based on evaluation_category - 4. Execute runner.run() which handles target execution + metric computation - 5. Generate result XLSX - 6. Update EvaluationRun status to COMPLETED + 2. Execute target and collect node results + 3. Evaluate metrics via runners (one per metric-node pair) + 4. Merge results per test-data row (1 item = 1 EvaluationRunItem) + 5. Apply judgment conditions + 6. Persist results + generate result XLSX + 7. Update EvaluationRun status to COMPLETED """ run_data = EvaluationRunData.model_validate(run_data_dict) @@ -70,16 +73,19 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: logger.error("EvaluationRun %s not found", run_data.evaluation_run_id) return - # Check if cancelled if evaluation_run.status == EvaluationRunStatus.CANCELLED: logger.info("EvaluationRun %s was cancelled", run_data.evaluation_run_id) return - # Get evaluation instance evaluation_instance = EvaluationManager.get_evaluation_instance() if evaluation_instance is None: raise ValueError("Evaluation framework not configured") + # Mark as running + evaluation_run.status = EvaluationRunStatus.RUNNING + evaluation_run.started_at = naive_utc_now() + session.commit() + if run_data.target_type == "dataset": results: list[EvaluationItemResult] = _execute_retrieval_test( session=session, @@ -95,18 +101,19 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: target_id=run_data.target_id, input_list=run_data.input_list, ) + + workflow_run_id_map = { + item.index: wf_run_id + for item, wf_run_id in zip(run_data.input_list, workflow_run_ids) + if wf_run_id + } + results = _execute_evaluation_runner( session=session, run_data=run_data, evaluation_instance=evaluation_instance, node_run_result_mapping_list=node_run_result_mapping_list, - ) - - _backfill_workflow_run_ids( - session=session, - evaluation_run_id=run_data.evaluation_run_id, - input_list=run_data.input_list, - workflow_run_ids=workflow_run_ids, + workflow_run_id_map=workflow_run_id_map, ) # Compute summary metrics @@ -119,7 +126,7 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: result_file_id = _store_result_file(run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session) # Update run to completed - evaluation_run: EvaluationRun = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first() + evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first() if evaluation_run: evaluation_run.status = EvaluationRunStatus.COMPLETED evaluation_run.completed_at = naive_utc_now() @@ -131,80 +138,82 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id) +# --------------------------------------------------------------------------- +# Evaluation orchestration — merge + judgment + persist +# --------------------------------------------------------------------------- + + def _execute_evaluation_runner( session: Any, run_data: EvaluationRunData, evaluation_instance: BaseEvaluationInstance, node_run_result_mapping_list: list[dict[str, NodeRunResult]], + workflow_run_id_map: dict[int, str] | None = None, ) -> list[EvaluationItemResult]: - """Execute the evaluation runner.""" - default_metrics = run_data.default_metrics - customized_metrics = run_data.customized_metrics - results: list[EvaluationItemResult] = [] - for default_metric in default_metrics: + """Evaluate all metrics, merge per-item, apply 
judgment, persist once. + + Ensures 1 test-data row = 1 EvaluationRunItem with all metrics combined. + """ + results_by_index: dict[int, EvaluationItemResult] = {} + + # Phase 1: Default metrics — one batch per (metric, node) pair + for default_metric in run_data.default_metrics: for node_info in default_metric.node_info_list: node_run_result_list: list[NodeRunResult] = [] - for node_run_result_mapping in node_run_result_mapping_list: - node_run_result = node_run_result_mapping.get(node_info.node_id) - if node_run_result is not None: - node_run_result_list.append(node_run_result) - if node_run_result_list: - runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session) - results.extend( - runner.run( - evaluation_run_id=run_data.evaluation_run_id, - tenant_id=run_data.tenant_id, - target_id=run_data.target_id, - target_type=run_data.target_type, - default_metric=default_metric, - customized_metrics=None, - model_provider=run_data.evaluation_model_provider, - model_name=run_data.evaluation_model, - node_run_result_list=node_run_result_list, - judgment_config=run_data.judgment_config, - input_list=run_data.input_list, - ) + item_indices: list[int] = [] + for i, mapping in enumerate(node_run_result_mapping_list): + node_result = mapping.get(node_info.node_id) + if node_result is not None: + node_run_result_list.append(node_result) + item_indices.append(i) + + if not node_run_result_list: + continue + + runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance) + try: + evaluated = runner.evaluate_metrics( + node_run_result_list=node_run_result_list, + default_metric=default_metric, + model_provider=run_data.evaluation_model_provider, + model_name=run_data.evaluation_model, + tenant_id=run_data.tenant_id, ) - if customized_metrics: - runner = _create_runner(EvaluationCategory.WORKFLOW, evaluation_instance, session) - results.extend( - runner.run( - evaluation_run_id=run_data.evaluation_run_id, - tenant_id=run_data.tenant_id, - target_id=run_data.target_id, - target_type=run_data.target_type, - default_metric=None, - customized_metrics=customized_metrics, - node_run_result_list=None, + except Exception: + logger.exception( + "Failed metrics for %s on node %s", default_metric.metric, node_info.node_id + ) + continue + + _stamp_and_merge(evaluated, item_indices, node_info, results_by_index) + + # Phase 2: Customized metrics + if run_data.customized_metrics: + try: + customized_results = evaluation_instance.evaluate_with_customized_workflow( node_run_result_mapping_list=node_run_result_mapping_list, - judgment_config=run_data.judgment_config, - input_list=run_data.input_list, + customized_metrics=run_data.customized_metrics, + tenant_id=run_data.tenant_id, ) - ) + for result in customized_results: + _merge_result(results_by_index, result.index, result) + except Exception: + logger.exception("Failed customized metrics for run %s", run_data.evaluation_run_id) + + results = list(results_by_index.values()) + + # Phase 3: Judgment + if run_data.judgment_config: + results = _apply_judgment(results, run_data.judgment_config) + + # Phase 4: Persist — one EvaluationRunItem per test-data row + _persist_results( + session, run_data.evaluation_run_id, results, run_data.input_list, workflow_run_id_map + ) + return results -def _create_runner( - category: EvaluationCategory, - evaluation_instance: BaseEvaluationInstance, - session: Any, -) -> BaseEvaluationRunner: - """Create the appropriate runner for the evaluation category.""" - match category: - case 
EvaluationCategory.LLM: - return LLMEvaluationRunner(evaluation_instance, session) - case EvaluationCategory.RETRIEVAL | EvaluationCategory.KNOWLEDGE_BASE: - return RetrievalEvaluationRunner(evaluation_instance, session) - case EvaluationCategory.AGENT: - return AgentEvaluationRunner(evaluation_instance, session) - case EvaluationCategory.WORKFLOW: - return WorkflowEvaluationRunner(evaluation_instance, session) - case EvaluationCategory.SNIPPET: - return SnippetEvaluationRunner(evaluation_instance, session) - case _: - raise ValueError(f"Unknown evaluation category: {category}") - - def _execute_retrieval_test( session: Any, evaluation_run: EvaluationRun, @@ -223,54 +232,151 @@ def _execute_retrieval_test( input_list=run_data.input_list, ) - results: list[EvaluationItemResult] = [] - runner = RetrievalEvaluationRunner(evaluation_instance, session) - results.extend( - runner.run( - evaluation_run_id=run_data.evaluation_run_id, - tenant_id=run_data.tenant_id, - target_id=run_data.target_id, - target_type=run_data.target_type, - default_metric=None, - model_provider=run_data.evaluation_model_provider, - model_name=run_data.evaluation_model, - node_run_result_list=node_run_result_list, - judgment_config=run_data.judgment_config, - input_list=run_data.input_list, - ) - ) + results_by_index: dict[int, EvaluationItemResult] = {} + runner = RetrievalEvaluationRunner(evaluation_instance) + + for default_metric in run_data.default_metrics: + try: + evaluated = runner.evaluate_metrics( + node_run_result_list=node_run_result_list, + default_metric=default_metric, + model_provider=run_data.evaluation_model_provider, + model_name=run_data.evaluation_model, + tenant_id=run_data.tenant_id, + ) + item_indices = list(range(len(node_run_result_list))) + _stamp_and_merge(evaluated, item_indices, None, results_by_index) + except Exception: + logger.exception("Failed retrieval metrics for run %s", run_data.evaluation_run_id) + + results = list(results_by_index.values()) + + if run_data.judgment_config: + results = _apply_judgment(results, run_data.judgment_config) + + _persist_results(session, run_data.evaluation_run_id, results, run_data.input_list) + return results -def _backfill_workflow_run_ids( +# --------------------------------------------------------------------------- +# Helpers — merge, judgment, persist +# --------------------------------------------------------------------------- + + +def _stamp_and_merge( + evaluated: list[EvaluationItemResult], + item_indices: list[int], + node_info: NodeInfo | None, + results_by_index: dict[int, EvaluationItemResult], +) -> None: + """Attach node_info to each metric and merge into results_by_index.""" + for result in evaluated: + original_index = item_indices[result.index] + if node_info is not None: + for metric in result.metrics: + metric.node_info = node_info + _merge_result(results_by_index, original_index, result) + + +def _merge_result( + results_by_index: dict[int, EvaluationItemResult], + index: int, + new_result: EvaluationItemResult, +) -> None: + """Merge new metrics into an existing result for the same index.""" + existing = results_by_index.get(index) + if existing: + merged_metrics = existing.metrics + new_result.metrics + actual = existing.actual_output or new_result.actual_output + results_by_index[index] = existing.model_copy( + update={"metrics": merged_metrics, "actual_output": actual} + ) + else: + results_by_index[index] = new_result.model_copy(update={"index": index}) + + +def _apply_judgment( + results: list[EvaluationItemResult], + 
judgment_config: JudgmentConfig, +) -> list[EvaluationItemResult]: + """Evaluate pass/fail judgment conditions on each result's metrics.""" + judged: list[EvaluationItemResult] = [] + for result in results: + if result.error is not None or not result.metrics: + judged.append(result) + continue + metric_values: dict[tuple[str, str], object] = { + (m.node_info.node_id, m.name): m.value for m in result.metrics if m.node_info + } + judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config) + judged.append(result.model_copy(update={"judgment": judgment_result})) + return judged + + +def _persist_results( session: Any, evaluation_run_id: str, + results: list[EvaluationItemResult], input_list: list[EvaluationDatasetInput], - workflow_run_ids: list[str | None], + workflow_run_id_map: dict[int, str] | None = None, ) -> None: - """Set ``workflow_run_id`` on items that were created by the runner.""" - from models.evaluation import EvaluationRunItem + """Persist evaluation results — one EvaluationRunItem per test-data row.""" + dataset_map = {item.index: item for item in input_list} + wf_map = workflow_run_id_map or {} - for item, wf_run_id in zip(input_list, workflow_run_ids): - if not wf_run_id: - continue - run_item = ( - session.query(EvaluationRunItem) - .filter_by(evaluation_run_id=evaluation_run_id, item_index=item.index) - .first() + for result in results: + item_input = dataset_map.get(result.index) + run_item = EvaluationRunItem( + evaluation_run_id=evaluation_run_id, + workflow_run_id=wf_map.get(result.index), + item_index=result.index, + inputs=json.dumps(item_input.inputs) if item_input else None, + expected_output=item_input.expected_output if item_input else None, + actual_output=result.actual_output, + metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None, + judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None, + metadata_json=json.dumps(result.metadata) if result.metadata else None, + error=result.error, + overall_score=getattr(result, "overall_score", None), ) - if run_item: - run_item.workflow_run_id = wf_run_id + session.add(run_item) + session.commit() +def _create_runner( + category: EvaluationCategory, + evaluation_instance: BaseEvaluationInstance, +) -> BaseEvaluationRunner: + """Create the appropriate runner for the evaluation category.""" + match category: + case EvaluationCategory.LLM: + return LLMEvaluationRunner(evaluation_instance) + case EvaluationCategory.RETRIEVAL | EvaluationCategory.KNOWLEDGE_BASE: + return RetrievalEvaluationRunner(evaluation_instance) + case EvaluationCategory.AGENT: + return AgentEvaluationRunner(evaluation_instance) + case EvaluationCategory.WORKFLOW: + return WorkflowEvaluationRunner(evaluation_instance) + case EvaluationCategory.SNIPPET: + return SnippetEvaluationRunner(evaluation_instance) + case _: + raise ValueError(f"Unknown evaluation category: {category}") + + +# --------------------------------------------------------------------------- +# Status / summary / XLSX / storage helpers (unchanged logic) +# --------------------------------------------------------------------------- + + def _mark_run_failed(session: Any, run_id: str, error: str) -> None: """Mark an evaluation run as failed.""" try: evaluation_run = session.query(EvaluationRun).filter_by(id=run_id).first() if evaluation_run: evaluation_run.status = EvaluationRunStatus.FAILED - evaluation_run.error = error[:2000] # Truncate error + evaluation_run.error = error[:2000] evaluation_run.completed_at = 
naive_utc_now() session.commit() except Exception: @@ -281,13 +387,7 @@ def _compute_metrics_summary( results: list[EvaluationItemResult], judgment_config: JudgmentConfig | None, ) -> dict[str, Any]: - """Compute aggregate metric and judgment summaries for an evaluation run. - - Metric statistics are calculated from successful item results only. When a - judgment config is present, the summary also reports how many successful - items passed or failed the configured judgment rules. - """ - + """Compute aggregate metric and judgment summaries for an evaluation run.""" summary: dict[str, Any] = {} if judgment_config is not None and judgment_config.conditions: @@ -344,16 +444,13 @@ def _generate_result_xlsx( if key not in input_keys: input_keys.append(key) - # Include judgment column only when at least one result has judgment conditions evaluated has_judgment = any(bool(r.judgment.condition_results) for r in results) - # Build headers judgment_headers = ["judgment"] if has_judgment else [] headers = ( ["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + judgment_headers + ["error"] ) - # Write header row for col_idx, header in enumerate(headers, start=1): cell = ws.cell(row=1, column=col_idx, value=header) cell.font = header_font @@ -361,45 +458,36 @@ def _generate_result_xlsx( cell.alignment = header_alignment cell.border = thin_border - # Set column widths ws.column_dimensions["A"].width = 10 for col_idx in range(2, len(headers) + 1): ws.column_dimensions[get_column_letter(col_idx)].width = 25 - # Build result lookup result_by_index = {r.index: r for r in results} - # Write data rows for row_idx, item in enumerate(input_list, start=2): result = result_by_index.get(item.index) col = 1 - # Index ws.cell(row=row_idx, column=col, value=item.index).border = thin_border col += 1 - # Input values for key in input_keys: val = item.inputs.get(key, "") ws.cell(row=row_idx, column=col, value=str(val)).border = thin_border col += 1 - # Expected output ws.cell(row=row_idx, column=col, value=item.expected_output or "").border = thin_border col += 1 - # Actual output ws.cell(row=row_idx, column=col, value=result.actual_output if result else "").border = thin_border col += 1 - # Metric scores metric_scores = {m.name: m.value for m in result.metrics} if result else {} for metric_name in all_metric_names: score = metric_scores.get(metric_name) ws.cell(row=row_idx, column=col, value=score if score is not None else "").border = thin_border col += 1 - # Judgment result if has_judgment: if result and result.judgment.condition_results: judgment_value = "Pass" if result.judgment.passed else "Fail" @@ -408,7 +496,6 @@ def _generate_result_xlsx( ws.cell(row=row_idx, column=col, value=judgment_value).border = thin_border col += 1 - # Error ws.cell(row=row_idx, column=col, value=result.error if result else "").border = thin_border output = io.BytesIO() diff --git a/api/tests/unit_tests/core/evaluation/judgment/test_processor.py b/api/tests/unit_tests/core/evaluation/judgment/test_processor.py index 2dfeff0b547..6f4cdc6708d 100644 --- a/api/tests/unit_tests/core/evaluation/judgment/test_processor.py +++ b/api/tests/unit_tests/core/evaluation/judgment/test_processor.py @@ -10,24 +10,22 @@ def test_evaluate_uses_and_conditions_against_metric_values() -> None: logical_operator="and", conditions=[ JudgmentCondition( - metric_name="faithfulness", + variable_selector=["llm_node_1", "faithfulness"], comparison_operator=">", - condition_value="0.8", - condition_type="number", + value="0.8", ), 
JudgmentCondition( - metric_name="answer_relevancy", + variable_selector=["llm_node_1", "answer_relevancy"], comparison_operator="≥", - condition_value="0.7", - condition_type="number", + value="0.7", ), ], ) result = JudgmentProcessor.evaluate( { - "faithfulness": 0.9, - "answer_relevancy": 0.75, + ("llm_node_1", "faithfulness"): 0.9, + ("llm_node_1", "answer_relevancy"): 0.75, }, config, ) @@ -43,27 +41,105 @@ def test_evaluate_sets_passed_false_when_any_and_condition_fails() -> None: logical_operator="and", conditions=[ JudgmentCondition( - metric_name="faithfulness", + variable_selector=["llm_node_1", "faithfulness"], comparison_operator=">", - condition_value="0.8", - condition_type="number", + value="0.8", ), JudgmentCondition( - metric_name="answer_relevancy", + variable_selector=["llm_node_1", "answer_relevancy"], comparison_operator="≥", - condition_value="0.7", - condition_type="number", + value="0.7", ), ], ) result = JudgmentProcessor.evaluate( { - "faithfulness": 0.9, - "answer_relevancy": 0.6, + ("llm_node_1", "faithfulness"): 0.9, + ("llm_node_1", "answer_relevancy"): 0.6, }, config, ) assert result.passed is False assert result.condition_results[-1].passed is False + + +def test_evaluate_with_different_nodes_same_metric() -> None: + """Conditions can target different nodes even with the same metric name.""" + config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition( + variable_selector=["llm_node_1", "faithfulness"], + comparison_operator=">", + value="0.8", + ), + JudgmentCondition( + variable_selector=["llm_node_2", "faithfulness"], + comparison_operator=">", + value="0.5", + ), + ], + ) + + result = JudgmentProcessor.evaluate( + { + ("llm_node_1", "faithfulness"): 0.9, + ("llm_node_2", "faithfulness"): 0.6, + }, + config, + ) + + assert result.passed is True + assert len(result.condition_results) == 2 + + +def test_evaluate_or_operator_passes_when_one_condition_met() -> None: + """With ``or`` logical operator, one passing condition should suffice.""" + config = JudgmentConfig( + logical_operator="or", + conditions=[ + JudgmentCondition( + variable_selector=["node_a", "score"], + comparison_operator=">", + value="0.9", + ), + JudgmentCondition( + variable_selector=["node_b", "score"], + comparison_operator=">", + value="0.5", + ), + ], + ) + + result = JudgmentProcessor.evaluate( + { + ("node_a", "score"): 0.3, + ("node_b", "score"): 0.8, + }, + config, + ) + + assert result.passed is True + + +def test_evaluate_string_contains_operator() -> None: + """String operators should work correctly via workflow engine delegation.""" + config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition( + variable_selector=["node_a", "status"], + comparison_operator="contains", + value="success", + ), + ], + ) + + result = JudgmentProcessor.evaluate( + {("node_a", "status"): "evaluation_success_done"}, + config, + ) + + assert result.passed is True diff --git a/api/tests/unit_tests/core/evaluation/runners/test_base_evaluation_runner.py b/api/tests/unit_tests/core/evaluation/runners/test_base_evaluation_runner.py index 477a678baec..e833331e82a 100644 --- a/api/tests/unit_tests/core/evaluation/runners/test_base_evaluation_runner.py +++ b/api/tests/unit_tests/core/evaluation/runners/test_base_evaluation_runner.py @@ -1,80 +1,78 @@ -"""Tests for judgment application in the base evaluation runner.""" +"""Tests for judgment application logic (moved from BaseEvaluationRunner to evaluation_task).""" -from unittest.mock import Mock - -from 
core.evaluation.entities.evaluation_entity import DefaultMetric, EvaluationItemResult, EvaluationMetric +from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric, NodeInfo from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig -from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner +from tasks.evaluation_task import _apply_judgment + +_NODE_INFO = NodeInfo(node_id="llm_1", type="llm", title="LLM Node") -class _FakeItemInput: - def __init__(self, index: int) -> None: - self.index = index - self.inputs = {"query": "hello"} - self.expected_output = "world" - self.context = None - - -class _FakeEvaluationRun: - def __init__(self) -> None: - self.status = None - self.started_at = None - self.input_list = [_FakeItemInput(index=0)] - - -class _FakeRunner(BaseEvaluationRunner): - def evaluate_metrics( - self, - node_run_result_mapping_list, - node_run_result_list, - default_metric, - customized_metrics, - model_provider, - model_name, - tenant_id, - ) -> list[EvaluationItemResult]: - return [ - EvaluationItemResult( - index=0, - actual_output="result", - metrics=[EvaluationMetric(name="faithfulness", value=0.91)], - ) - ] - - -def test_run_applies_judgment_before_persisting_results() -> None: - """Runner should evaluate judgment rules before persisting item rows.""" - # Arrange - session = Mock() - evaluation_run = _FakeEvaluationRun() - session.query.return_value.filter_by.return_value.first.return_value = evaluation_run - - runner = _FakeRunner(evaluation_instance=Mock(), session=session) +def test_apply_judgment_marks_passing_result() -> None: + """Items whose metrics satisfy the judgment conditions should be marked as passed.""" + results = [ + EvaluationItemResult( + index=0, + actual_output="result", + metrics=[EvaluationMetric(name="faithfulness", value=0.91, node_info=_NODE_INFO)], + ) + ] judgment_config = JudgmentConfig( logical_operator="and", conditions=[ JudgmentCondition( - metric_name="faithfulness", + variable_selector=["llm_1", "faithfulness"], comparison_operator=">", - condition_value="0.8", - condition_type="number", + value="0.8", ) ], ) - # Act - results = runner.run( - evaluation_run_id="run-id", - tenant_id="tenant-id", - target_id="target-id", - target_type="app", - node_run_result_list=[Mock()], - default_metric=DefaultMetric(metric="faithfulness", node_info_list=[]), - judgment_config=judgment_config, + judged = _apply_judgment(results, judgment_config) + + assert judged[0].judgment.passed is True + + +def test_apply_judgment_marks_failing_result() -> None: + """Items whose metrics do NOT satisfy the conditions should be marked as failed.""" + results = [ + EvaluationItemResult( + index=0, + metrics=[EvaluationMetric(name="faithfulness", value=0.5, node_info=_NODE_INFO)], + ) + ] + judgment_config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition( + variable_selector=["llm_1", "faithfulness"], + comparison_operator=">", + value="0.8", + ) + ], ) - # Assert - assert results[0].judgment.passed is True - persisted_item = session.add.call_args.args[0] - assert persisted_item.judgment is not None - assert '"passed": true' in persisted_item.judgment + judged = _apply_judgment(results, judgment_config) + + assert judged[0].judgment.passed is False + + +def test_apply_judgment_skips_errored_items() -> None: + """Items with errors should be passed through without judgment evaluation.""" + results = [ + EvaluationItemResult(index=0, error="timeout"), + ] + 
judgment_config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition( + variable_selector=["llm_1", "faithfulness"], + comparison_operator=">", + value="0.8", + ) + ], + ) + + judged = _apply_judgment(results, judgment_config) + + assert judged[0].error == "timeout" + assert judged[0].judgment.passed is False diff --git a/api/tests/unit_tests/tasks/test_evaluation_task.py b/api/tests/unit_tests/tasks/test_evaluation_task.py index 34d2849d2fc..6922ec522e7 100644 --- a/api/tests/unit_tests/tasks/test_evaluation_task.py +++ b/api/tests/unit_tests/tasks/test_evaluation_task.py @@ -1,52 +1,44 @@ -"""Unit tests for evaluation task judgment aggregation helpers.""" +"""Unit tests for evaluation task helpers.""" -from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric +from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric, NodeInfo from core.evaluation.entities.judgment_entity import ( JudgmentCondition, JudgmentConfig, JudgmentResult, ) -from tasks.evaluation_task import _compute_metrics_summary +from tasks.evaluation_task import _compute_metrics_summary, _merge_result, _stamp_and_merge + +_NODE_INFO = NodeInfo(node_id="llm_1", type="llm", title="LLM Node") def test_compute_metrics_summary_includes_judgment_counts() -> None: """Summary should expose pass/fail counts when judgment rules are configured.""" - # Arrange judgment_config = JudgmentConfig( logical_operator="and", conditions=[ JudgmentCondition( - metric_name="faithfulness", + variable_selector=["llm_1", "faithfulness"], comparison_operator=">", - condition_value="0.8", - condition_type="number", + value="0.8", ) ], ) results = [ EvaluationItemResult( index=0, - metrics=[EvaluationMetric(name="faithfulness", value=0.9)], + metrics=[EvaluationMetric(name="faithfulness", value=0.9, node_info=_NODE_INFO)], judgment=JudgmentResult(passed=True, logical_operator="and", condition_results=[]), ), EvaluationItemResult( index=1, - metrics=[EvaluationMetric(name="faithfulness", value=0.4)], + metrics=[EvaluationMetric(name="faithfulness", value=0.4, node_info=_NODE_INFO)], judgment=JudgmentResult(passed=False, logical_operator="and", condition_results=[]), ), EvaluationItemResult(index=2, error="timeout"), ] - # Act summary = _compute_metrics_summary(results, judgment_config) - # Assert - assert summary["faithfulness"] == { - "average": 0.65, - "min": 0.4, - "max": 0.9, - "count": 2, - } assert summary["_judgment"] == { "enabled": True, "logical_operator": "and", @@ -56,3 +48,50 @@ def test_compute_metrics_summary_includes_judgment_counts() -> None: "failed_items": 1, "pass_rate": 0.5, } + + +def test_merge_result_combines_metrics_for_same_index() -> None: + """Merging two results with the same index should concatenate their metrics.""" + results_by_index: dict[int, EvaluationItemResult] = {} + + first = EvaluationItemResult( + index=0, + actual_output="output_1", + metrics=[EvaluationMetric(name="faithfulness", value=0.9)], + ) + _merge_result(results_by_index, 0, first) + + second = EvaluationItemResult( + index=0, + actual_output="output_2", + metrics=[EvaluationMetric(name="context_precision", value=0.7)], + ) + _merge_result(results_by_index, 0, second) + + merged = results_by_index[0] + assert len(merged.metrics) == 2 + assert merged.metrics[0].name == "faithfulness" + assert merged.metrics[1].name == "context_precision" + assert merged.actual_output == "output_1" + + +def test_stamp_and_merge_attaches_node_info() -> None: + 
"""_stamp_and_merge should set node_info on every metric and remap indices.""" + results_by_index: dict[int, EvaluationItemResult] = {} + node_info = NodeInfo(node_id="llm_1", type="llm", title="GPT-4") + + evaluated = [ + EvaluationItemResult( + index=0, + metrics=[EvaluationMetric(name="faithfulness", value=0.85)], + ) + ] + item_indices = [3] + + _stamp_and_merge(evaluated, item_indices, node_info, results_by_index) + + assert 3 in results_by_index + metric = results_by_index[3].metrics[0] + assert metric.node_info is not None + assert metric.node_info.node_id == "llm_1" + assert metric.node_info.type == "llm"