Feat/add status filter to workflow runs (#26850)

Co-authored-by: Jacky Su <jacky_su@trendmicro.com>
This commit is contained in:
Jacky Su
2025-10-18 12:15:29 +08:00
committed by GitHub
parent 1a37989769
commit ac79691d69
10 changed files with 851 additions and 19 deletions

View File

@@ -59,6 +59,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
triggered_from: str,
limit: int = 20,
last_id: str | None = None,
status: str | None = None,
) -> InfiniteScrollPagination:
"""
Get paginated workflow runs with filtering.
@@ -73,6 +74,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
triggered_from: Filter by trigger source (e.g., "debugging", "app-run")
limit: Maximum number of records to return (default: 20)
last_id: Cursor for pagination - ID of the last record from previous page
status: Optional filter by status (e.g., "running", "succeeded", "failed")
Returns:
InfiniteScrollPagination object containing:
@@ -107,6 +109,43 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
"""
...
def get_workflow_runs_count(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
status: str | None = None,
time_range: str | None = None,
) -> dict[str, int]:
"""
Get workflow runs count statistics.
Retrieves total count and count by status for workflow runs
matching the specified filters.
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "debugging", "app-run")
status: Optional filter by specific status
time_range: Optional time range filter (e.g., "7d", "4h", "30m", "30s")
Filters records based on created_at field
Returns:
Dictionary containing:
- total: Total count of all workflow runs (or filtered by status)
- running: Count of workflow runs with status "running"
- succeeded: Count of workflow runs with status "succeeded"
- failed: Count of workflow runs with status "failed"
- stopped: Count of workflow runs with status "stopped"
- partial_succeeded: Count of workflow runs with status "partial-succeeded"
Note: If a status is provided, 'total' will be the count for that status,
and the specific status count will also be set to this value, with all
other status counts being 0.
"""
...
def get_expired_runs_batch(
self,
tenant_id: str,

View File

@@ -24,11 +24,12 @@ from collections.abc import Sequence
from datetime import datetime
from typing import cast
from sqlalchemy import delete, select
from sqlalchemy import delete, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.time_parser import get_time_threshold
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
@@ -63,6 +64,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
triggered_from: str,
limit: int = 20,
last_id: str | None = None,
status: str | None = None,
) -> InfiniteScrollPagination:
"""
Get paginated workflow runs with filtering.
@@ -79,6 +81,10 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
WorkflowRun.triggered_from == triggered_from,
)
# Add optional status filter
if status:
base_stmt = base_stmt.where(WorkflowRun.status == status)
if last_id:
# Get the last workflow run for cursor-based pagination
last_run_stmt = base_stmt.where(WorkflowRun.id == last_id)
@@ -120,6 +126,73 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
)
return session.scalar(stmt)
def get_workflow_runs_count(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
status: str | None = None,
time_range: str | None = None,
) -> dict[str, int]:
"""
Get workflow runs count statistics grouped by status.
"""
_initial_status_counts = {
"running": 0,
"succeeded": 0,
"failed": 0,
"stopped": 0,
"partial-succeeded": 0,
}
with self._session_maker() as session:
# Build base where conditions
base_conditions = [
WorkflowRun.tenant_id == tenant_id,
WorkflowRun.app_id == app_id,
WorkflowRun.triggered_from == triggered_from,
]
# Add time range filter if provided
if time_range:
time_threshold = get_time_threshold(time_range)
if time_threshold:
base_conditions.append(WorkflowRun.created_at >= time_threshold)
# If status filter is provided, return simple count
if status:
count_stmt = select(func.count(WorkflowRun.id)).where(*base_conditions, WorkflowRun.status == status)
total = session.scalar(count_stmt) or 0
result = {"total": total} | _initial_status_counts
# Set the count for the filtered status
if status in result:
result[status] = total
return result
# No status filter - get counts grouped by status
base_stmt = (
select(WorkflowRun.status, func.count(WorkflowRun.id).label("count"))
.where(*base_conditions)
.group_by(WorkflowRun.status)
)
# Execute query
results = session.execute(base_stmt).all()
# Build response dictionary
status_counts = _initial_status_counts.copy()
total = 0
for status_val, count in results:
total += count
if status_val in status_counts:
status_counts[status_val] = count
return {"total": total} | status_counts
def get_expired_runs_batch(
self,
tenant_id: str,