Files
dify/api/tests/unit_tests/services/test_schedule_service.py
Poojan 5bafb163cc test: add unit tests for services and tasks part-4 (#33223)
Co-authored-by: akashseth-ifp <akash.seth@infocusp.com>
Co-authored-by: rajatagarwal-oss <rajat.agarwal@infocusp.com>
Co-authored-by: Dev Sharma <50591491+cryptus-neoxys@users.noreply.github.com>
Co-authored-by: sahil-infocusp <73810410+sahil-infocusp@users.noreply.github.com>
2026-04-02 08:35:46 +00:00

938 lines
36 KiB
Python

import unittest
from datetime import UTC, datetime
from types import SimpleNamespace
from typing import Any, cast
from unittest.mock import MagicMock, Mock, patch
import pytest
from sqlalchemy.orm import Session
from core.trigger.constants import TRIGGER_SCHEDULE_NODE_TYPE
from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, SchedulePlanUpdate, VisualConfig
from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError
from events.event_handlers.sync_workflow_schedule_when_app_published import (
sync_schedule_from_workflow,
)
from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h
from models.account import Account, TenantAccountJoin
from models.trigger import WorkflowSchedulePlan
from models.workflow import Workflow
from services.errors.account import AccountNotFoundError
from services.trigger import schedule_service as service_module
from services.trigger.schedule_service import ScheduleService
class TestScheduleService(unittest.TestCase):
"""Test cases for ScheduleService class."""
def test_calculate_next_run_at_valid_cron(self):
"""Test calculating next run time with valid cron expression."""
# Test daily cron at 10:30 AM
cron_expr = "30 10 * * *"
timezone = "UTC"
base_time = datetime(2025, 8, 29, 9, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
assert next_run.hour == 10
assert next_run.minute == 30
assert next_run.day == 29
def test_calculate_next_run_at_with_timezone(self):
"""Test calculating next run time with different timezone."""
cron_expr = "0 9 * * *" # 9:00 AM
timezone = "America/New_York"
base_time = datetime(2025, 8, 29, 12, 0, 0, tzinfo=UTC) # 8:00 AM EDT
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
# 9:00 AM EDT = 13:00 UTC (during EDT)
expected_utc_hour = 13
assert next_run.hour == expected_utc_hour
def test_calculate_next_run_at_with_last_day_of_month(self):
"""Test calculating next run time with 'L' (last day) syntax."""
cron_expr = "0 10 L * *" # 10:00 AM on last day of month
timezone = "UTC"
base_time = datetime(2025, 2, 15, 9, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, timezone, base_time)
assert next_run is not None
# February 2025 has 28 days
assert next_run.day == 28
assert next_run.month == 2
def test_calculate_next_run_at_invalid_cron(self):
"""Test calculating next run time with invalid cron expression."""
cron_expr = "invalid cron"
timezone = "UTC"
with pytest.raises(ValueError):
calculate_next_run_at(cron_expr, timezone)
def test_calculate_next_run_at_invalid_timezone(self):
"""Test calculating next run time with invalid timezone."""
from pytz import UnknownTimeZoneError
cron_expr = "30 10 * * *"
timezone = "Invalid/Timezone"
with pytest.raises(UnknownTimeZoneError):
calculate_next_run_at(cron_expr, timezone)
@patch("libs.schedule_utils.calculate_next_run_at")
def test_create_schedule(self, mock_calculate_next_run):
"""Test creating a new schedule."""
mock_session = MagicMock(spec=Session)
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 10, 30, 0, tzinfo=UTC)
config = ScheduleConfig(
node_id="start",
cron_expression="30 10 * * *",
timezone="UTC",
)
schedule = ScheduleService.create_schedule(
session=mock_session,
tenant_id="test-tenant",
app_id="test-app",
config=config,
)
assert schedule is not None
assert schedule.tenant_id == "test-tenant"
assert schedule.app_id == "test-app"
assert schedule.node_id == "start"
assert schedule.cron_expression == "30 10 * * *"
assert schedule.timezone == "UTC"
assert schedule.next_run_at is not None
mock_session.add.assert_called_once()
mock_session.flush.assert_called_once()
@patch("services.trigger.schedule_service.calculate_next_run_at")
def test_update_schedule(self, mock_calculate_next_run):
"""Test updating an existing schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "0 12 * * *"
mock_schedule.timezone = "America/New_York"
mock_session.get.return_value = mock_schedule
mock_calculate_next_run.return_value = datetime(2025, 8, 30, 12, 0, 0, tzinfo=UTC)
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
timezone="America/New_York",
)
result = ScheduleService.update_schedule(
session=mock_session,
schedule_id="test-schedule-id",
updates=updates,
)
assert result is not None
assert result.cron_expression == "0 12 * * *"
assert result.timezone == "America/New_York"
mock_calculate_next_run.assert_called_once()
mock_session.flush.assert_called_once()
def test_update_schedule_not_found(self):
"""Test updating a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
updates = SchedulePlanUpdate(
cron_expression="0 12 * * *",
)
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.update_schedule(
session=mock_session,
schedule_id="non-existent-id",
updates=updates,
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.flush.assert_not_called()
def test_delete_schedule(self):
"""Test deleting a schedule."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_session.get.return_value = mock_schedule
# Should not raise exception and complete successfully
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="test-schedule-id",
)
mock_session.delete.assert_called_once_with(mock_schedule)
mock_session.flush.assert_called_once()
def test_delete_schedule_not_found(self):
"""Test deleting a non-existent schedule raises exception."""
from core.workflow.nodes.trigger_schedule.exc import ScheduleNotFoundError
mock_session = MagicMock(spec=Session)
mock_session.get.return_value = None
# Should raise ScheduleNotFoundError
with pytest.raises(ScheduleNotFoundError) as context:
ScheduleService.delete_schedule(
session=mock_session,
schedule_id="non-existent-id",
)
assert "Schedule not found: non-existent-id" in str(context.value)
mock_session.delete.assert_not_called()
@patch("services.trigger.schedule_service.select")
def test_get_tenant_owner(self, mock_select):
"""Test getting tenant owner account."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "owner-account-id"
# Mock owner query
mock_owner_result = Mock(spec=TenantAccountJoin)
mock_owner_result.account_id = "owner-account-id"
mock_session.execute.return_value.scalar_one_or_none.return_value = mock_owner_result
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "owner-account-id"
@patch("services.trigger.schedule_service.select")
def test_get_tenant_owner_fallback_to_admin(self, mock_select):
"""Test getting tenant owner falls back to admin if no owner."""
mock_session = MagicMock(spec=Session)
mock_account = Mock(spec=Account)
mock_account.id = "admin-account-id"
# Mock admin query (owner returns None)
mock_admin_result = Mock(spec=TenantAccountJoin)
mock_admin_result.account_id = "admin-account-id"
mock_session.execute.return_value.scalar_one_or_none.side_effect = [None, mock_admin_result]
mock_session.get.return_value = mock_account
result = ScheduleService.get_tenant_owner(
session=mock_session,
tenant_id="test-tenant",
)
assert result is not None
assert result.id == "admin-account-id"
@patch("services.trigger.schedule_service.calculate_next_run_at")
def test_update_next_run_at(self, mock_calculate_next_run):
"""Test updating next run time after schedule triggered."""
mock_session = MagicMock(spec=Session)
mock_schedule = Mock(spec=WorkflowSchedulePlan)
mock_schedule.cron_expression = "30 10 * * *"
mock_schedule.timezone = "UTC"
mock_session.get.return_value = mock_schedule
next_time = datetime(2025, 8, 31, 10, 30, 0, tzinfo=UTC)
mock_calculate_next_run.return_value = next_time
result = ScheduleService.update_next_run_at(
session=mock_session,
schedule_id="test-schedule-id",
)
assert result == next_time
assert mock_schedule.next_run_at == next_time
mock_session.flush.assert_called_once()
class TestVisualToCron(unittest.TestCase):
"""Test cases for visual configuration to cron conversion."""
def test_visual_to_cron_hourly(self):
"""Test converting hourly visual config to cron."""
visual_config = VisualConfig(on_minute=15)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "15 * * * *"
def test_visual_to_cron_daily(self):
"""Test converting daily visual config to cron."""
visual_config = VisualConfig(time="2:30 PM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "30 14 * * *"
def test_visual_to_cron_weekly(self):
"""Test converting weekly visual config to cron."""
visual_config = VisualConfig(
time="10:00 AM",
weekdays=["mon", "wed", "fri"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "0 10 * * 1,3,5"
def test_visual_to_cron_monthly_with_specific_days(self):
"""Test converting monthly visual config with specific days."""
visual_config = VisualConfig(
time="11:30 AM",
monthly_days=[1, 15],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 11 1,15 * *"
def test_visual_to_cron_monthly_with_last_day(self):
"""Test converting monthly visual config with last day using 'L' syntax."""
visual_config = VisualConfig(
time="11:30 AM",
monthly_days=[1, "last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 11 1,L * *"
def test_visual_to_cron_monthly_only_last_day(self):
"""Test converting monthly visual config with only last day."""
visual_config = VisualConfig(
time="9:00 PM",
monthly_days=["last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "0 21 L * *"
def test_visual_to_cron_monthly_with_end_days_and_last(self):
"""Test converting monthly visual config with days 29, 30, 31 and 'last'."""
visual_config = VisualConfig(
time="3:45 PM",
monthly_days=[29, 30, 31, "last"],
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
# Should have 29,30,31,L - the L handles all possible last days
assert result == "45 15 29,30,31,L * *"
def test_visual_to_cron_invalid_frequency(self):
"""Test converting with invalid frequency."""
with pytest.raises(ScheduleConfigError, match="Unsupported frequency: invalid"):
ScheduleService.visual_to_cron("invalid", VisualConfig())
def test_visual_to_cron_weekly_no_weekdays(self):
"""Test converting weekly with no weekdays specified."""
visual_config = VisualConfig(time="10:00 AM")
with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
def test_visual_to_cron_hourly_no_minute(self):
"""Test converting hourly with no on_minute specified."""
visual_config = VisualConfig() # on_minute defaults to 0
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "0 * * * *" # Should use default value 0
def test_visual_to_cron_daily_no_time(self):
"""Test converting daily with no time specified."""
visual_config = VisualConfig(time=None)
with pytest.raises(ScheduleConfigError, match="time is required for daily schedules"):
ScheduleService.visual_to_cron("daily", visual_config)
def test_visual_to_cron_weekly_no_time(self):
"""Test converting weekly with no time specified."""
visual_config = VisualConfig(weekdays=["mon"])
visual_config.time = None # Override default
with pytest.raises(ScheduleConfigError, match="time is required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
def test_visual_to_cron_monthly_no_time(self):
"""Test converting monthly with no time specified."""
visual_config = VisualConfig(monthly_days=[1])
visual_config.time = None # Override default
with pytest.raises(ScheduleConfigError, match="time is required for monthly schedules"):
ScheduleService.visual_to_cron("monthly", visual_config)
def test_visual_to_cron_monthly_duplicate_days(self):
"""Test monthly with duplicate days should be deduplicated."""
visual_config = VisualConfig(
time="10:00 AM",
monthly_days=[1, 15, 1, 15, 31], # Duplicates
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "0 10 1,15,31 * *" # Should be deduplicated
def test_visual_to_cron_monthly_unsorted_days(self):
"""Test monthly with unsorted days should be sorted."""
visual_config = VisualConfig(
time="2:30 PM",
monthly_days=[20, 5, 15, 1, 10], # Unsorted
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "30 14 1,5,10,15,20 * *" # Should be sorted
def test_visual_to_cron_weekly_all_weekdays(self):
"""Test weekly with all weekdays."""
visual_config = VisualConfig(
time="8:00 AM",
weekdays=["sun", "mon", "tue", "wed", "thu", "fri", "sat"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "0 8 * * 0,1,2,3,4,5,6"
def test_visual_to_cron_hourly_boundary_values(self):
"""Test hourly with boundary minute values."""
# Minimum value
visual_config = VisualConfig(on_minute=0)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "0 * * * *"
# Maximum value
visual_config = VisualConfig(on_minute=59)
result = ScheduleService.visual_to_cron("hourly", visual_config)
assert result == "59 * * * *"
def test_visual_to_cron_daily_midnight_noon(self):
"""Test daily at special times (midnight and noon)."""
# Midnight
visual_config = VisualConfig(time="12:00 AM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "0 0 * * *"
# Noon
visual_config = VisualConfig(time="12:00 PM")
result = ScheduleService.visual_to_cron("daily", visual_config)
assert result == "0 12 * * *"
def test_visual_to_cron_monthly_mixed_with_last_and_duplicates(self):
"""Test monthly with mixed days, 'last', and duplicates."""
visual_config = VisualConfig(
time="11:45 PM",
monthly_days=[15, 1, "last", 15, 30, 1, "last"], # Mixed with duplicates
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
assert result == "45 23 1,15,30,L * *" # Deduplicated and sorted with L at end
def test_visual_to_cron_weekly_single_day(self):
"""Test weekly with single weekday."""
visual_config = VisualConfig(
time="6:30 PM",
weekdays=["sun"],
)
result = ScheduleService.visual_to_cron("weekly", visual_config)
assert result == "30 18 * * 0"
def test_visual_to_cron_monthly_all_possible_days(self):
"""Test monthly with all 31 days plus 'last'."""
all_days = list(range(1, 32)) + ["last"]
visual_config = VisualConfig(
time="12:01 AM",
monthly_days=all_days,
)
result = ScheduleService.visual_to_cron("monthly", visual_config)
expected_days = ",".join([str(i) for i in range(1, 32)]) + ",L"
assert result == f"1 0 {expected_days} * *"
def test_visual_to_cron_monthly_no_days(self):
"""Test monthly without any days specified should raise error."""
visual_config = VisualConfig(time="10:00 AM", monthly_days=[])
with pytest.raises(ScheduleConfigError, match="Monthly days are required for monthly schedules"):
ScheduleService.visual_to_cron("monthly", visual_config)
def test_visual_to_cron_weekly_empty_weekdays_list(self):
"""Test weekly with empty weekdays list should raise error."""
visual_config = VisualConfig(time="10:00 AM", weekdays=[])
with pytest.raises(ScheduleConfigError, match="Weekdays are required for weekly schedules"):
ScheduleService.visual_to_cron("weekly", visual_config)
class TestParseTime(unittest.TestCase):
"""Test cases for time parsing function."""
def test_parse_time_am(self):
"""Test parsing AM time."""
hour, minute = convert_12h_to_24h("9:30 AM")
assert hour == 9
assert minute == 30
def test_parse_time_pm(self):
"""Test parsing PM time."""
hour, minute = convert_12h_to_24h("2:45 PM")
assert hour == 14
assert minute == 45
def test_parse_time_noon(self):
"""Test parsing 12:00 PM (noon)."""
hour, minute = convert_12h_to_24h("12:00 PM")
assert hour == 12
assert minute == 0
def test_parse_time_midnight(self):
"""Test parsing 12:00 AM (midnight)."""
hour, minute = convert_12h_to_24h("12:00 AM")
assert hour == 0
assert minute == 0
def test_parse_time_invalid_format(self):
"""Test parsing invalid time format."""
with pytest.raises(ValueError, match="Invalid time format"):
convert_12h_to_24h("25:00")
def test_parse_time_invalid_hour(self):
"""Test parsing invalid hour."""
with pytest.raises(ValueError, match="Invalid hour: 13"):
convert_12h_to_24h("13:00 PM")
def test_parse_time_invalid_minute(self):
"""Test parsing invalid minute."""
with pytest.raises(ValueError, match="Invalid minute: 60"):
convert_12h_to_24h("10:60 AM")
def test_parse_time_empty_string(self):
"""Test parsing empty string."""
with pytest.raises(ValueError, match="Time string cannot be empty"):
convert_12h_to_24h("")
def test_parse_time_invalid_period(self):
"""Test parsing invalid period."""
with pytest.raises(ValueError, match="Invalid period"):
convert_12h_to_24h("10:30 XM")
class TestExtractScheduleConfig(unittest.TestCase):
"""Test cases for extracting schedule configuration from workflow."""
def test_extract_schedule_config_with_cron_mode(self):
"""Test extracting schedule config in cron mode."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "schedule-node",
"data": {
"type": "trigger-schedule",
"mode": "cron",
"cron_expression": "0 10 * * *",
"timezone": "America/New_York",
},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is not None
assert config.node_id == "schedule-node"
assert config.cron_expression == "0 10 * * *"
assert config.timezone == "America/New_York"
def test_extract_schedule_config_with_visual_mode(self):
"""Test extracting schedule config in visual mode."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "schedule-node",
"data": {
"type": "trigger-schedule",
"mode": "visual",
"frequency": "daily",
"visual_config": {"time": "10:30 AM"},
"timezone": "UTC",
},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is not None
assert config.node_id == "schedule-node"
assert config.cron_expression == "30 10 * * *"
assert config.timezone == "UTC"
def test_extract_schedule_config_no_schedule_node(self):
"""Test extracting config when no schedule node exists."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = {
"nodes": [
{
"id": "other-node",
"data": {"type": "llm"},
}
]
}
config = ScheduleService.extract_schedule_config(workflow)
assert config is None
def test_extract_schedule_config_invalid_graph(self):
"""Test extracting config with invalid graph data."""
workflow = Mock(spec=Workflow)
workflow.graph_dict = None
with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"):
ScheduleService.extract_schedule_config(workflow)
class TestScheduleWithTimezone(unittest.TestCase):
"""Test cases for schedule with timezone handling."""
def test_visual_schedule_with_timezone_integration(self):
"""Test complete flow: visual config → cron → execution in different timezones.
This test verifies that when a user in Shanghai sets a schedule for 10:30 AM,
it runs at 10:30 AM Shanghai time, not 10:30 AM UTC.
"""
# User in Shanghai wants to run a task at 10:30 AM local time
visual_config = VisualConfig(
time="10:30 AM", # This is Shanghai time
monthly_days=[1],
)
# Convert to cron expression
cron_expr = ScheduleService.visual_to_cron("monthly", visual_config)
assert cron_expr is not None
assert cron_expr == "30 10 1 * *" # Direct conversion
# Now test execution with Shanghai timezone
shanghai_tz = "Asia/Shanghai"
# Base time: 2025-01-01 00:00:00 UTC (08:00:00 Shanghai)
base_time = datetime(2025, 1, 1, 0, 0, 0, tzinfo=UTC)
next_run = calculate_next_run_at(cron_expr, shanghai_tz, base_time)
assert next_run is not None
# Should run at 10:30 AM Shanghai time on Jan 1
# 10:30 AM Shanghai = 02:30 AM UTC (Shanghai is UTC+8)
assert next_run.year == 2025
assert next_run.month == 1
assert next_run.day == 1
assert next_run.hour == 2 # 02:30 UTC
assert next_run.minute == 30
def test_visual_schedule_different_timezones_same_local_time(self):
"""Test that same visual config in different timezones runs at different UTC times.
This verifies that a schedule set for "9:00 AM" runs at 9 AM local time
regardless of the timezone.
"""
visual_config = VisualConfig(
time="9:00 AM",
weekdays=["mon"],
)
cron_expr = ScheduleService.visual_to_cron("weekly", visual_config)
assert cron_expr is not None
assert cron_expr == "0 9 * * 1"
# Base time: Sunday 2025-01-05 12:00:00 UTC
base_time = datetime(2025, 1, 5, 12, 0, 0, tzinfo=UTC)
# Test New York (UTC-5 in January)
ny_next = calculate_next_run_at(cron_expr, "America/New_York", base_time)
assert ny_next is not None
# Monday 9 AM EST = Monday 14:00 UTC
assert ny_next.day == 6
assert ny_next.hour == 14 # 9 AM EST = 2 PM UTC
# Test Tokyo (UTC+9)
tokyo_next = calculate_next_run_at(cron_expr, "Asia/Tokyo", base_time)
assert tokyo_next is not None
# Monday 9 AM JST = Monday 00:00 UTC
assert tokyo_next.day == 6
assert tokyo_next.hour == 0 # 9 AM JST = 0 AM UTC
def test_visual_schedule_daily_across_dst_change(self):
"""Test that daily schedules adjust correctly during DST changes.
A schedule set for "10:00 AM" should always run at 10 AM local time,
even when DST changes.
"""
visual_config = VisualConfig(
time="10:00 AM",
)
cron_expr = ScheduleService.visual_to_cron("daily", visual_config)
assert cron_expr is not None
assert cron_expr == "0 10 * * *"
# Test before DST (EST - UTC-5)
winter_base = datetime(2025, 2, 1, 0, 0, 0, tzinfo=UTC)
winter_next = calculate_next_run_at(cron_expr, "America/New_York", winter_base)
assert winter_next is not None
# 10 AM EST = 15:00 UTC
assert winter_next.hour == 15
# Test during DST (EDT - UTC-4)
summer_base = datetime(2025, 6, 1, 0, 0, 0, tzinfo=UTC)
summer_next = calculate_next_run_at(cron_expr, "America/New_York", summer_base)
assert summer_next is not None
# 10 AM EDT = 14:00 UTC
assert summer_next.hour == 14
class TestSyncScheduleFromWorkflow(unittest.TestCase):
"""Test cases for syncing schedule from workflow."""
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_create_new(self, mock_select, mock_service, mock_db):
"""Test creating new schedule when none exists."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_session.scalar.return_value = None # No existing plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "30 10 * * *"
mock_config.timezone = "UTC"
mock_service.extract_schedule_config.return_value = mock_config
mock_new_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.create_schedule.return_value = mock_new_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_new_plan
mock_service.create_schedule.assert_called_once()
mock_session.commit.assert_called_once()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_update_existing(self, mock_select, mock_service, mock_db):
"""Test updating existing schedule."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
# Mock extract_schedule_config to return a ScheduleConfig object
mock_config = Mock(spec=ScheduleConfig)
mock_config.node_id = "start"
mock_config.cron_expression = "0 12 * * *"
mock_config.timezone = "America/New_York"
mock_service.extract_schedule_config.return_value = mock_config
mock_updated_plan = Mock(spec=WorkflowSchedulePlan)
mock_service.update_schedule.return_value = mock_updated_plan
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result == mock_updated_plan
mock_service.update_schedule.assert_called_once()
# Verify the arguments passed to update_schedule
call_args = mock_service.update_schedule.call_args
assert call_args.kwargs["session"] == mock_session
assert call_args.kwargs["schedule_id"] == "existing-plan-id"
updates_obj = call_args.kwargs["updates"]
assert isinstance(updates_obj, SchedulePlanUpdate)
assert updates_obj.node_id == "start"
assert updates_obj.cron_expression == "0 12 * * *"
assert updates_obj.timezone == "America/New_York"
mock_session.commit.assert_called_once()
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.db")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.ScheduleService")
@patch("events.event_handlers.sync_workflow_schedule_when_app_published.select")
def test_sync_schedule_remove_when_no_config(self, mock_select, mock_service, mock_db):
"""Test removing schedule when no schedule config in workflow."""
mock_session = MagicMock()
mock_db.engine = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
Session = MagicMock(return_value=mock_session)
with patch("events.event_handlers.sync_workflow_schedule_when_app_published.Session", Session):
mock_existing_plan = Mock(spec=WorkflowSchedulePlan)
mock_existing_plan.id = "existing-plan-id"
mock_session.scalar.return_value = mock_existing_plan
mock_service.extract_schedule_config.return_value = None # No schedule config
workflow = Mock(spec=Workflow)
result = sync_schedule_from_workflow("tenant-id", "app-id", workflow)
assert result is None
# Now using ScheduleService.delete_schedule instead of session.delete
mock_service.delete_schedule.assert_called_once_with(session=mock_session, schedule_id="existing-plan-id")
mock_session.commit.assert_called_once()
@pytest.fixture
def session_mock() -> MagicMock:
return MagicMock(spec=Session)
def _workflow(**kwargs: Any) -> Workflow:
return cast(Workflow, SimpleNamespace(**kwargs))
def test_update_schedule_should_update_only_node_id_without_recomputing_time(
session_mock: MagicMock,
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Arrange
schedule = MagicMock(spec=WorkflowSchedulePlan)
schedule.cron_expression = "0 10 * * *"
schedule.timezone = "UTC"
session_mock.get.return_value = schedule
next_run_mock = MagicMock(return_value=datetime(2026, 1, 1, 10, 0, tzinfo=UTC))
monkeypatch.setattr(service_module, "calculate_next_run_at", next_run_mock)
# Act
result = ScheduleService.update_schedule(
session=session_mock,
schedule_id="schedule-1",
updates=SchedulePlanUpdate(node_id="node-new"),
)
# Assert
assert result is schedule
assert schedule.node_id == "node-new"
next_run_mock.assert_not_called()
session_mock.flush.assert_called_once()
def test_get_tenant_owner_should_raise_when_account_record_missing(session_mock: MagicMock) -> None:
# Arrange
join = SimpleNamespace(account_id="account-404")
session_mock.execute.return_value.scalar_one_or_none.return_value = join
session_mock.get.return_value = None
# Act / Assert
with pytest.raises(AccountNotFoundError, match="Account not found: account-404"):
ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1")
def test_get_tenant_owner_should_raise_when_no_owner_or_admin_found(session_mock: MagicMock) -> None:
# Arrange
session_mock.execute.return_value.scalar_one_or_none.side_effect = [None, None]
# Act / Assert
with pytest.raises(AccountNotFoundError, match="Account not found for tenant: tenant-1"):
ScheduleService.get_tenant_owner(session=session_mock, tenant_id="tenant-1")
def test_update_next_run_at_should_raise_when_schedule_not_found(session_mock: MagicMock) -> None:
# Arrange
session_mock.get.return_value = None
# Act / Assert
with pytest.raises(ScheduleNotFoundError, match="Schedule not found: schedule-1"):
ScheduleService.update_next_run_at(session=session_mock, schedule_id="schedule-1")
def test_to_schedule_config_should_build_from_cron_mode() -> None:
# Arrange
node_config: dict[str, Any] = {
"id": "node-1",
"data": {
"mode": "cron",
"cron_expression": "0 12 * * *",
"timezone": "Asia/Kolkata",
},
}
# Act
result = ScheduleService.to_schedule_config(node_config=node_config)
# Assert
assert result.node_id == "node-1"
assert result.cron_expression == "0 12 * * *"
assert result.timezone == "Asia/Kolkata"
def test_to_schedule_config_should_raise_for_cron_mode_without_expression() -> None:
# Arrange
node_config = {"id": "node-1", "data": {"mode": "cron", "cron_expression": ""}}
# Act / Assert
with pytest.raises(ScheduleConfigError, match="Cron expression is required for cron mode"):
ScheduleService.to_schedule_config(node_config=node_config)
def test_to_schedule_config_should_build_from_visual_mode(monkeypatch: pytest.MonkeyPatch) -> None:
# Arrange
node_config = {
"id": "node-1",
"data": {
"mode": "visual",
"frequency": "daily",
"visual_config": {"time": "9:30 AM"},
"timezone": "UTC",
},
}
monkeypatch.setattr(ScheduleService, "visual_to_cron", MagicMock(return_value="30 9 * * *"))
# Act
result = ScheduleService.to_schedule_config(node_config=node_config)
# Assert
assert result.cron_expression == "30 9 * * *"
def test_to_schedule_config_should_raise_for_invalid_mode() -> None:
# Arrange
node_config = {"id": "node-1", "data": {"mode": "manual"}}
# Act / Assert
with pytest.raises(ScheduleConfigError, match="Invalid schedule mode: manual"):
ScheduleService.to_schedule_config(node_config=node_config)
def test_extract_schedule_config_should_raise_when_graph_is_empty() -> None:
# Arrange
workflow = _workflow(graph_dict={})
# Act / Assert
with pytest.raises(ScheduleConfigError, match="Workflow graph is empty"):
ScheduleService.extract_schedule_config(workflow=workflow)
def test_extract_schedule_config_should_raise_when_mode_invalid() -> None:
# Arrange
workflow = _workflow(
graph_dict={
"nodes": [
{
"id": "schedule-1",
"data": {
"type": TRIGGER_SCHEDULE_NODE_TYPE,
"mode": "invalid",
},
}
]
}
)
# Act / Assert
with pytest.raises(ScheduleConfigError, match="Invalid schedule mode: invalid"):
ScheduleService.extract_schedule_config(workflow=workflow)
if __name__ == "__main__":
unittest.main()