mirror of
https://github.com/langgenius/dify.git
synced 2026-04-05 20:22:39 +08:00
542 lines
20 KiB
Python
542 lines
20 KiB
Python
import io
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
from celery import shared_task
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
from configs import dify_config
|
|
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
|
from core.evaluation.entities.evaluation_entity import (
|
|
EvaluationCategory,
|
|
EvaluationDatasetInput,
|
|
EvaluationItemResult,
|
|
EvaluationRunData,
|
|
NodeInfo,
|
|
)
|
|
from core.evaluation.entities.judgment_entity import JudgmentConfig
|
|
from core.evaluation.evaluation_manager import EvaluationManager
|
|
from core.evaluation.judgment.processor import JudgmentProcessor
|
|
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
|
|
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
|
|
from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner
|
|
from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner
|
|
from core.evaluation.runners.snippet_evaluation_runner import SnippetEvaluationRunner
|
|
from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner
|
|
from extensions.ext_database import db
|
|
from graphon.node_events import NodeRunResult
|
|
from libs.datetime_utils import naive_utc_now
|
|
from models.enums import CreatorUserRole
|
|
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
|
|
from models.model import UploadFile
|
|
from services.evaluation_service import EvaluationService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@shared_task(queue="evaluation")
def run_evaluation(run_data_dict: dict[str, Any]) -> None:
    """Celery entry point that executes an evaluation run asynchronously.

    Workflow:
    1. Deserialize EvaluationRunData
    2. Execute target and collect node results
    3. Evaluate metrics via runners (one per metric-node pair)
    4. Merge results per test-data row (1 item = 1 EvaluationRunItem)
    5. Apply judgment conditions
    6. Persist results + generate result XLSX
    7. Update EvaluationRun status to COMPLETED

    Any exception escaping the core logic is logged and the run is
    transitioned to FAILED instead of being re-raised to the worker.
    """
    # Imported lazily to keep worker import time down, mirroring the
    # module's convention for optional heavy dependencies.
    from sqlalchemy.orm import Session

    run_data = EvaluationRunData.model_validate(run_data_dict)

    with db.engine.connect() as conn:
        session = Session(bind=conn)
        try:
            _execute_evaluation(session, run_data)
        except Exception as exc:
            logger.exception("Evaluation run %s failed", run_data.evaluation_run_id)
            _mark_run_failed(session, run_data.evaluation_run_id, str(exc))
        finally:
            session.close()
|
|
|
|
|
|
def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
    """Core evaluation execution logic.

    Loads the EvaluationRun row, executes the target (a workflow-style target
    or a dataset hit test), scores metrics, then stores the summary JSON and
    result XLSX before marking the run COMPLETED. Exceptions propagate to the
    Celery wrapper, which marks the run FAILED.

    Args:
        session: active SQLAlchemy session (caller owns its lifecycle).
        run_data: the deserialized run configuration.
    """
    evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
    if not evaluation_run:
        logger.error("EvaluationRun %s not found", run_data.evaluation_run_id)
        return

    # A run cancelled before the worker picked it up is skipped silently.
    if evaluation_run.status == EvaluationRunStatus.CANCELLED:
        logger.info("EvaluationRun %s was cancelled", run_data.evaluation_run_id)
        return

    evaluation_instance = EvaluationManager.get_evaluation_instance()
    if evaluation_instance is None:
        raise ValueError("Evaluation framework not configured")

    # Mark as running
    evaluation_run.status = EvaluationRunStatus.RUNNING
    evaluation_run.started_at = naive_utc_now()
    session.commit()

    if run_data.target_type == "dataset":
        # Knowledge-base path: hit testing runs directly, no workflow nodes.
        results: list[EvaluationItemResult] = _execute_retrieval_test(
            session=session,
            evaluation_run=evaluation_run,
            run_data=run_data,
            evaluation_instance=evaluation_instance,
        )
    else:
        evaluation_service = EvaluationService()
        # One execution per input row: each row yields a node_id -> NodeRunResult
        # mapping plus (optionally) a workflow run id, in input order.
        node_run_result_mapping_list, workflow_run_ids = evaluation_service.execute_targets(
            tenant_id=run_data.tenant_id,
            target_type=run_data.target_type,
            target_id=run_data.target_id,
            input_list=run_data.input_list,
        )

        # Map test-data index -> workflow run id, dropping rows that produced
        # no workflow run (falsy id).
        workflow_run_id_map = {
            item.index: wf_run_id
            for item, wf_run_id in zip(run_data.input_list, workflow_run_ids)
            if wf_run_id
        }

        results = _execute_evaluation_runner(
            session=session,
            run_data=run_data,
            evaluation_instance=evaluation_instance,
            node_run_result_mapping_list=node_run_result_mapping_list,
            workflow_run_id_map=workflow_run_id_map,
        )

    # Compute summary metrics
    metrics_summary = _compute_metrics_summary(results, run_data.judgment_config)

    # Generate result XLSX
    result_xlsx = _generate_result_xlsx(run_data.input_list, results)

    # Store result file
    result_file_id = _store_result_file(run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session)

    # Update run to completed. NOTE(review): the row is re-queried here —
    # presumably to pick up any state changes made while metrics were running;
    # confirm whether a cancellation check is also intended at this point.
    evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
    if evaluation_run:
        evaluation_run.status = EvaluationRunStatus.COMPLETED
        evaluation_run.completed_at = naive_utc_now()
        evaluation_run.metrics_summary = json.dumps(metrics_summary)
        if result_file_id:
            evaluation_run.result_file_id = result_file_id
        session.commit()

    logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Evaluation orchestration — merge + judgment + persist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _execute_evaluation_runner(
    session: Any,
    run_data: EvaluationRunData,
    evaluation_instance: BaseEvaluationInstance,
    node_run_result_mapping_list: list[dict[str, NodeRunResult]],
    workflow_run_id_map: dict[int, str] | None = None,
) -> list[EvaluationItemResult]:
    """Evaluate all metrics, merge per-item, apply judgment, persist once.

    Ensures 1 test-data row = 1 EvaluationRunItem with all metrics combined.

    Args:
        session: active SQLAlchemy session used for persistence.
        run_data: the deserialized run configuration.
        evaluation_instance: the configured evaluation framework backend.
        node_run_result_mapping_list: one node_id -> NodeRunResult mapping per
            test-data row, in input order.
        workflow_run_id_map: optional map of test-data index -> workflow run id.

    Returns:
        The merged (and, when configured, judged) per-item results.
    """
    results_by_index: dict[int, EvaluationItemResult] = {}

    # Phase 1: Default metrics — one batch per (metric, node) pair
    for default_metric in run_data.default_metrics:
        for node_info in default_metric.node_info_list:
            # Collect this node's results across rows. item_indices records
            # which original row each collected result came from, because rows
            # where the node did not run are skipped (the batch is compacted).
            node_run_result_list: list[NodeRunResult] = []
            item_indices: list[int] = []
            for i, mapping in enumerate(node_run_result_mapping_list):
                node_result = mapping.get(node_info.node_id)
                if node_result is not None:
                    node_run_result_list.append(node_result)
                    item_indices.append(i)

            if not node_run_result_list:
                continue

            runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance)
            try:
                evaluated = runner.evaluate_metrics(
                    node_run_result_list=node_run_result_list,
                    default_metric=default_metric,
                    model_provider=run_data.evaluation_model_provider,
                    model_name=run_data.evaluation_model,
                    tenant_id=run_data.tenant_id,
                )
            except Exception:
                # A single failing (metric, node) pair must not abort the run.
                logger.exception(
                    "Failed metrics for %s on node %s", default_metric.metric, node_info.node_id
                )
                continue

            _stamp_and_merge(evaluated, item_indices, node_info, results_by_index)

    # Phase 2: Customized metrics
    if run_data.customized_metrics:
        try:
            customized_results = evaluation_instance.evaluate_with_customized_workflow(
                node_run_result_mapping_list=node_run_result_mapping_list,
                customized_metrics=run_data.customized_metrics,
                tenant_id=run_data.tenant_id,
            )
            # Customized results already carry the original row index, so no
            # item_indices translation is needed here.
            for result in customized_results:
                _merge_result(results_by_index, result.index, result)
        except Exception:
            logger.exception("Failed customized metrics for run %s", run_data.evaluation_run_id)

    results = list(results_by_index.values())

    # Phase 3: Judgment
    if run_data.judgment_config:
        results = _apply_judgment(results, run_data.judgment_config)

    # Phase 4: Persist — one EvaluationRunItem per test-data row
    _persist_results(
        session, run_data.evaluation_run_id, results, run_data.input_list, workflow_run_id_map
    )

    return results
|
|
|
|
|
|
def _execute_retrieval_test(
    session: Any,
    evaluation_run: EvaluationRun,
    run_data: EvaluationRunData,
    evaluation_instance: BaseEvaluationInstance,
) -> list[EvaluationItemResult]:
    """Run dataset hit testing for every item, then score retrieval metrics.

    This is the knowledge-base counterpart to the workflow path: there are no
    workflow nodes to traverse, so hit testing is invoked directly for each
    dataset item and the results are fed straight into
    :class:`RetrievalEvaluationRunner`.
    """
    hit_results = EvaluationService.execute_retrieval_test_targets(
        dataset_id=run_data.target_id,
        account_id=evaluation_run.created_by,
        input_list=run_data.input_list,
    )

    merged: dict[int, EvaluationItemResult] = {}
    runner = RetrievalEvaluationRunner(evaluation_instance)
    # Every row produced exactly one hit-test result, so batch positions map
    # 1:1 onto the original row positions.
    row_positions = list(range(len(hit_results)))

    for default_metric in run_data.default_metrics:
        try:
            scored = runner.evaluate_metrics(
                node_run_result_list=hit_results,
                default_metric=default_metric,
                model_provider=run_data.evaluation_model_provider,
                model_name=run_data.evaluation_model,
                tenant_id=run_data.tenant_id,
            )
            _stamp_and_merge(scored, row_positions, None, merged)
        except Exception:
            # A single failing metric must not abort the whole run.
            logger.exception("Failed retrieval metrics for run %s", run_data.evaluation_run_id)

    results = list(merged.values())

    if run_data.judgment_config:
        results = _apply_judgment(results, run_data.judgment_config)

    _persist_results(session, run_data.evaluation_run_id, results, run_data.input_list)

    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers — merge, judgment, persist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _stamp_and_merge(
    evaluated: list[EvaluationItemResult],
    item_indices: list[int],
    node_info: NodeInfo | None,
    results_by_index: dict[int, EvaluationItemResult],
) -> None:
    """Stamp metrics with their node_info, then fold into results_by_index.

    ``result.index`` on an evaluated item is local to the compacted batch;
    ``item_indices`` translates it back to the original test-data row.
    When ``node_info`` is None (retrieval path) metrics are left unstamped.
    """
    for item in evaluated:
        if node_info is not None:
            for m in item.metrics:
                m.node_info = node_info
        _merge_result(results_by_index, item_indices[item.index], item)
|
|
|
|
|
|
def _merge_result(
    results_by_index: dict[int, EvaluationItemResult],
    index: int,
    new_result: EvaluationItemResult,
) -> None:
    """Fold ``new_result`` into the entry stored under ``index``.

    The first result seen for an index is stored as-is (re-stamped with the
    canonical index); later results contribute their metrics, and the earliest
    non-empty actual_output is kept.
    """
    prior = results_by_index.get(index)
    if not prior:
        results_by_index[index] = new_result.model_copy(update={"index": index})
        return
    results_by_index[index] = prior.model_copy(
        update={
            "metrics": prior.metrics + new_result.metrics,
            "actual_output": prior.actual_output or new_result.actual_output,
        }
    )
|
|
|
|
|
|
def _apply_judgment(
|
|
results: list[EvaluationItemResult],
|
|
judgment_config: JudgmentConfig,
|
|
) -> list[EvaluationItemResult]:
|
|
"""Evaluate pass/fail judgment conditions on each result's metrics."""
|
|
judged: list[EvaluationItemResult] = []
|
|
for result in results:
|
|
if result.error is not None or not result.metrics:
|
|
judged.append(result)
|
|
continue
|
|
metric_values: dict[tuple[str, str], object] = {
|
|
(m.node_info.node_id, m.name): m.value for m in result.metrics if m.node_info
|
|
}
|
|
judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config)
|
|
judged.append(result.model_copy(update={"judgment": judgment_result}))
|
|
return judged
|
|
|
|
|
|
def _persist_results(
|
|
session: Any,
|
|
evaluation_run_id: str,
|
|
results: list[EvaluationItemResult],
|
|
input_list: list[EvaluationDatasetInput],
|
|
workflow_run_id_map: dict[int, str] | None = None,
|
|
) -> None:
|
|
"""Persist evaluation results — one EvaluationRunItem per test-data row."""
|
|
dataset_map = {item.index: item for item in input_list}
|
|
wf_map = workflow_run_id_map or {}
|
|
|
|
for result in results:
|
|
item_input = dataset_map.get(result.index)
|
|
run_item = EvaluationRunItem(
|
|
evaluation_run_id=evaluation_run_id,
|
|
workflow_run_id=wf_map.get(result.index),
|
|
item_index=result.index,
|
|
inputs=json.dumps(item_input.inputs) if item_input else None,
|
|
expected_output=item_input.expected_output if item_input else None,
|
|
actual_output=result.actual_output,
|
|
metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None,
|
|
judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None,
|
|
metadata_json=json.dumps(result.metadata) if result.metadata else None,
|
|
error=result.error,
|
|
overall_score=getattr(result, "overall_score", None),
|
|
)
|
|
session.add(run_item)
|
|
|
|
session.commit()
|
|
|
|
|
|
def _create_runner(
    category: EvaluationCategory,
    evaluation_instance: BaseEvaluationInstance,
) -> BaseEvaluationRunner:
    """Create the appropriate runner for the evaluation category.

    RETRIEVAL and KNOWLEDGE_BASE share the retrieval runner; every other
    category maps to its dedicated runner class.
    """
    runner_cls = {
        EvaluationCategory.LLM: LLMEvaluationRunner,
        EvaluationCategory.RETRIEVAL: RetrievalEvaluationRunner,
        EvaluationCategory.KNOWLEDGE_BASE: RetrievalEvaluationRunner,
        EvaluationCategory.AGENT: AgentEvaluationRunner,
        EvaluationCategory.WORKFLOW: WorkflowEvaluationRunner,
        EvaluationCategory.SNIPPET: SnippetEvaluationRunner,
    }.get(category)
    if runner_cls is None:
        raise ValueError(f"Unknown evaluation category: {category}")
    return runner_cls(evaluation_instance)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status / summary / XLSX / storage helpers (unchanged logic)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _mark_run_failed(session: Any, run_id: str, error: str) -> None:
    """Best-effort transition of an evaluation run to FAILED.

    Any DB error here is logged and swallowed so the caller's own failure
    path is never masked by the bookkeeping.
    """
    try:
        run = session.query(EvaluationRun).filter_by(id=run_id).first()
        if not run:
            return
        run.status = EvaluationRunStatus.FAILED
        # Clip long messages/tracebacks to fit the column.
        run.error = error[:2000]
        run.completed_at = naive_utc_now()
        session.commit()
    except Exception:
        logger.exception("Failed to mark run %s as failed", run_id)
|
|
|
|
|
|
def _compute_metrics_summary(
|
|
results: list[EvaluationItemResult],
|
|
judgment_config: JudgmentConfig | None,
|
|
) -> dict[str, Any]:
|
|
"""Compute aggregate metric and judgment summaries for an evaluation run."""
|
|
summary: dict[str, Any] = {}
|
|
|
|
if judgment_config is not None and judgment_config.conditions:
|
|
evaluated_results: list[EvaluationItemResult] = [
|
|
result for result in results if result.error is None and result.metrics
|
|
]
|
|
passed_items = sum(1 for result in evaluated_results if result.judgment.passed)
|
|
evaluated_items = len(evaluated_results)
|
|
summary["_judgment"] = {
|
|
"enabled": True,
|
|
"logical_operator": judgment_config.logical_operator,
|
|
"configured_conditions": len(judgment_config.conditions),
|
|
"evaluated_items": evaluated_items,
|
|
"passed_items": passed_items,
|
|
"failed_items": evaluated_items - passed_items,
|
|
"pass_rate": passed_items / evaluated_items if evaluated_items else 0.0,
|
|
}
|
|
|
|
return summary
|
|
|
|
|
|
def _generate_result_xlsx(
    input_list: list[EvaluationDatasetInput],
    results: list[EvaluationItemResult],
) -> bytes:
    """Generate result XLSX with input data, actual output, metric scores, and judgment.

    Columns: index, one column per input key, expected/actual output, one
    column per metric name (first-seen order), an optional "judgment" column
    (only when at least one result carries judgment condition results), and
    an error column.

    Args:
        input_list: the original test-data rows (drives row order).
        results: merged per-item results, matched to rows by index.

    Returns:
        The serialized .xlsx workbook as bytes.
    """
    wb = Workbook()
    ws = wb.active
    if ws is None:
        ws = wb.create_sheet("Evaluation Results")
    ws.title = "Evaluation Results"

    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    thin_border = Border(
        left=Side(style="thin"),
        right=Side(style="thin"),
        top=Side(style="thin"),
        bottom=Side(style="thin"),
    )

    # Collect all metric names, preserving first-seen order.
    all_metric_names: list[str] = []
    for result in results:
        for metric in result.metrics:
            if metric.name not in all_metric_names:
                all_metric_names.append(metric.name)

    # Collect all input keys, preserving first-seen order.
    input_keys: list[str] = []
    for item in input_list:
        for key in item.inputs:
            if key not in input_keys:
                input_keys.append(key)

    # BUG FIX: judgment can be None when no judgment config was applied to
    # the run (see _persist_results' `if result.judgment` guard); the old
    # code dereferenced condition_results unconditionally and crashed.
    has_judgment = any(
        r.judgment is not None and bool(r.judgment.condition_results) for r in results
    )

    judgment_headers = ["judgment"] if has_judgment else []
    headers = (
        ["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + judgment_headers + ["error"]
    )

    for col_idx, header in enumerate(headers, start=1):
        cell = ws.cell(row=1, column=col_idx, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment
        cell.border = thin_border

    # Narrow index column, uniform width for everything else.
    ws.column_dimensions["A"].width = 10
    for col_idx in range(2, len(headers) + 1):
        ws.column_dimensions[get_column_letter(col_idx)].width = 25

    result_by_index = {r.index: r for r in results}

    for row_idx, item in enumerate(input_list, start=2):
        result = result_by_index.get(item.index)

        col = 1
        ws.cell(row=row_idx, column=col, value=item.index).border = thin_border
        col += 1

        for key in input_keys:
            val = item.inputs.get(key, "")
            ws.cell(row=row_idx, column=col, value=str(val)).border = thin_border
            col += 1

        ws.cell(row=row_idx, column=col, value=item.expected_output or "").border = thin_border
        col += 1

        ws.cell(row=row_idx, column=col, value=result.actual_output if result else "").border = thin_border
        col += 1

        # NOTE: duplicate metric names within one item collapse to the last
        # value here (dict comprehension last-write-wins).
        metric_scores = {m.name: m.value for m in result.metrics} if result else {}
        for metric_name in all_metric_names:
            score = metric_scores.get(metric_name)
            ws.cell(row=row_idx, column=col, value=score if score is not None else "").border = thin_border
            col += 1

        if has_judgment:
            # BUG FIX: same None guard — rows whose result has no judgment
            # get a blank cell instead of raising AttributeError.
            if result and result.judgment and result.judgment.condition_results:
                judgment_value = "Pass" if result.judgment.passed else "Fail"
            else:
                judgment_value = ""
            ws.cell(row=row_idx, column=col, value=judgment_value).border = thin_border
            col += 1

        ws.cell(row=row_idx, column=col, value=result.error if result else "").border = thin_border

    output = io.BytesIO()
    wb.save(output)
    output.seek(0)
    return output.getvalue()
|
|
|
|
|
|
def _store_result_file(
    tenant_id: str,
    run_id: str,
    xlsx_content: bytes,
    session: Any,
) -> str | None:
    """Persist the result workbook to object storage and register an UploadFile.

    Returns the new UploadFile id, or None if anything fails — storage and
    bookkeeping errors are logged but deliberately never propagate, since a
    missing result file should not fail the whole run.
    """
    try:
        from extensions.ext_storage import storage
        from libs.uuid_utils import uuidv7

        filename = f"evaluation-result-{run_id[:8]}.xlsx"
        storage_key = f"evaluation_results/{tenant_id}/{str(uuidv7())}.xlsx"

        storage.save(storage_key, xlsx_content)

        record = UploadFile(
            tenant_id=tenant_id,
            storage_type=dify_config.STORAGE_TYPE,
            key=storage_key,
            name=filename,
            size=len(xlsx_content),
            extension="xlsx",
            mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            created_by_role=CreatorUserRole.ACCOUNT,
            created_by="system",
            created_at=naive_utc_now(),
            used=False,
        )
        session.add(record)
        session.commit()
        return record.id
    except Exception:
        logger.exception("Failed to store result file for run %s", run_id)
        return None
|