diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index a884a1c7f9b..48cb270313a 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -33,6 +33,7 @@ from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository from core.workflow.node_factory import get_default_root_node_id +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import ( build_bootstrap_variables, build_system_variables, @@ -188,7 +189,11 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=new_inputs) # init graph - graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time()) + graph_runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.time(), + workflow_id=self._workflow.id, + ) graph = self._init_graph( graph_config=self._workflow.graph_dict, graph_runtime_state=graph_runtime_state, diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index b4d2310da85..34cb3025cfe 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -23,6 +23,7 @@ from core.app.entities.app_invoke_entities import ( from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool from core.workflow.workflow_entry import WorkflowEntry @@ -158,7 +159,11 @@ class PipelineRunner(WorkflowBasedAppRunner): workflow.graph_dict ) add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs) - graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + graph_runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id=workflow.id, + ) # init graph graph = self._init_rag_pipeline_graph( diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 2cb8088971a..edd07dae509 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -16,6 +16,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository from core.workflow.node_factory import get_default_root_node_id +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool from core.workflow.workflow_entry import WorkflowEntry @@ -118,7 +119,11 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): root_node_id = self._root_node_id or get_default_root_node_id(self._workflow.graph_dict) add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=inputs) - graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + graph_runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id=self._workflow.id, + ) graph = self._init_graph( graph_config=self._workflow.graph_dict, graph_runtime_state=graph_runtime_state, diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index f68c8e60b4f..b969a1fd749 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -68,6 +68,7 @@ from core.app.entities.queue_entities import ( ) from core.rag.entities.citation_metadata import RetrievalSourceMetadata from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id, resolve_workflow_node_class +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import ( build_bootstrap_variables, default_system_variables, @@ -191,7 +192,11 @@ class WorkflowBasedAppRunner: environment_variables=workflow.environment_variables, ), ) - graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.time()) + graph_runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.time(), + workflow_id=workflow.id, + ) # Determine which type of single node execution and get graph/variable_pool if single_iteration_run: diff --git a/api/core/workflow/runtime_state.py b/api/core/workflow/runtime_state.py new file mode 100644 index 00000000000..fa984d672a8 --- /dev/null +++ b/api/core/workflow/runtime_state.py @@ -0,0 +1,123 @@ +"""Helpers for explicitly wiring GraphRuntimeState collaborators. + +GraphOn currently supports lazy construction of several runtime-state +collaborators such as the ready queue, graph execution aggregate, and response +coordinator. Dify initializes those collaborators eagerly so repository code +does not depend on that implicit behavior. +""" + +from __future__ import annotations + +from contextlib import AbstractContextManager + +from graphon.graph import Graph +from graphon.graph_engine.domain.graph_execution import GraphExecution +from graphon.graph_engine.ready_queue import InMemoryReadyQueue +from graphon.graph_engine.response_coordinator import ResponseStreamCoordinator +from graphon.model_runtime.entities.llm_entities import LLMUsage +from graphon.runtime import GraphRuntimeState, VariablePool + + +def _require_workflow_id(workflow_id: str) -> str: + """Validate that workflow-scoped runtime collaborators receive a real id.""" + + if not workflow_id: + raise ValueError("workflow_id must be a non-empty string") + return workflow_id + + +def create_graph_runtime_state( + *, + variable_pool: VariablePool, + start_at: float, + workflow_id: str, + total_tokens: int = 0, + llm_usage: LLMUsage | None = None, + outputs: dict[str, object] | None = None, + node_run_steps: int = 0, + execution_context: AbstractContextManager[object] | None = None, +) -> GraphRuntimeState: + """Create a runtime state with explicit non-graph collaborators. + + The graph itself is attached later, once node construction has completed and + the final Graph instance exists. + """ + workflow_id = _require_workflow_id(workflow_id) + + return GraphRuntimeState( + variable_pool=variable_pool, + start_at=start_at, + total_tokens=total_tokens, + llm_usage=llm_usage or LLMUsage.empty_usage(), + outputs=outputs or {}, + node_run_steps=node_run_steps, + ready_queue=InMemoryReadyQueue(), + graph_execution=GraphExecution(workflow_id=workflow_id), + execution_context=execution_context, + ) + + +def ensure_graph_runtime_state_initialized( + graph_runtime_state: GraphRuntimeState, + *, + workflow_id: str, +) -> GraphRuntimeState: + """Materialize non-graph collaborators when loading legacy or sparse state.""" + workflow_id = _require_workflow_id(workflow_id) + + if graph_runtime_state._ready_queue is None: + graph_runtime_state._ready_queue = InMemoryReadyQueue() + + graph_execution = graph_runtime_state._graph_execution + if graph_execution is None: + graph_runtime_state._graph_execution = GraphExecution( + workflow_id=workflow_id, + ) + elif not graph_execution.workflow_id: + graph_execution.workflow_id = workflow_id + elif graph_execution.workflow_id != workflow_id: + raise ValueError("GraphRuntimeState workflow_id does not match graph execution workflow_id") + + return graph_runtime_state + + +def bind_graph_runtime_state_to_graph( + graph_runtime_state: GraphRuntimeState, + graph: Graph, + *, + workflow_id: str, +) -> GraphRuntimeState: + """Attach graph-scoped collaborators without relying on GraphOn lazy setup.""" + + ensure_graph_runtime_state_initialized( + graph_runtime_state, + workflow_id=workflow_id, + ) + + attached_graph = graph_runtime_state._graph + if attached_graph is not None and attached_graph is not graph: + raise ValueError("GraphRuntimeState already attached to a different graph instance") + + if graph_runtime_state._response_coordinator is None: + response_coordinator = ResponseStreamCoordinator( + variable_pool=graph_runtime_state.variable_pool, + graph=graph, + ) + graph_runtime_state._response_coordinator = response_coordinator + + graph_runtime_state.attach_graph(graph) + return graph_runtime_state + + +def snapshot_graph_runtime_state( + graph_runtime_state: GraphRuntimeState, + *, + workflow_id: str, +) -> str: + """Serialize runtime state after explicit collaborator initialization.""" + + ensure_graph_runtime_state_initialized( + graph_runtime_state, + workflow_id=workflow_id, + ) + return graph_runtime_state.dumps() diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index 2346a95d6a8..e5e154b72fc 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -25,6 +25,10 @@ from core.app.file_access import DatabaseFileAccessController from core.app.workflow.layers.llm_quota import LLMQuotaLayer from core.app.workflow.layers.observability import ObservabilityLayer from core.workflow.node_factory import DifyNodeFactory, is_start_node_type, resolve_workflow_node_class +from core.workflow.runtime_state import ( + bind_graph_runtime_state_to_graph, + create_graph_runtime_state, +) from core.workflow.system_variables import ( default_system_variables, get_node_creation_preload_selectors, @@ -72,9 +76,10 @@ class _WorkflowChildEngineBuilder: variable_pool: VariablePool | None = None, ) -> GraphEngine: """Build a child engine with a fresh runtime state and only child-safe layers.""" - child_graph_runtime_state = GraphRuntimeState( + child_graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool, start_at=time.perf_counter(), + workflow_id=workflow_id, execution_context=parent_graph_runtime_state.execution_context, ) node_factory = DifyNodeFactory( @@ -92,6 +97,11 @@ class _WorkflowChildEngineBuilder: node_factory=node_factory, root_node_id=root_node_id, ) + bind_graph_runtime_state_to_graph( + child_graph_runtime_state, + child_graph, + workflow_id=workflow_id, + ) command_channel = InMemoryChannel() config = GraphEngineConfig() @@ -152,6 +162,11 @@ class WorkflowEntry: self.command_channel = command_channel execution_context = capture_current_context() graph_runtime_state.execution_context = execution_context + bind_graph_runtime_state_to_graph( + graph_runtime_state, + graph, + workflow_id=workflow_id, + ) self._child_engine_builder = _WorkflowChildEngineBuilder() self.graph_engine = GraphEngine( workflow_id=workflow_id, @@ -244,9 +259,10 @@ class WorkflowEntry: ), call_depth=0, ) - graph_runtime_state = GraphRuntimeState( + graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool, start_at=time.perf_counter(), + workflow_id=workflow.id, execution_context=capture_current_context(), ) @@ -402,7 +418,7 @@ class WorkflowEntry: ), call_depth=0, ) - graph_runtime_state = GraphRuntimeState( + graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool, start_at=time.perf_counter(), execution_context=capture_current_context(), diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 8f365c7c51f..270f9dd2c4a 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -25,7 +25,7 @@ from graphon.nodes.human_input.entities import HumanInputNodeData, validate_huma from graphon.nodes.human_input.enums import HumanInputFormKind from graphon.nodes.human_input.human_input_node import HumanInputNode from graphon.nodes.start.entities import StartNodeData -from graphon.runtime import GraphRuntimeState, VariablePool +from graphon.runtime import VariablePool from graphon.variable_loader import load_into_variable_pool from graphon.variables import VariableBase from graphon.variables.input_entities import VariableEntityType @@ -49,6 +49,7 @@ from core.workflow.human_input_compat import ( ) from core.workflow.node_factory import LATEST_VERSION, get_node_type_classes_mapping, is_start_node_type from core.workflow.node_runtime import DifyHumanInputNodeRuntime, apply_dify_debug_email_recipient +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import build_bootstrap_variables, build_system_variables, default_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool from core.workflow.workflow_entry import WorkflowEntry @@ -1146,9 +1147,10 @@ class WorkflowService: ), call_depth=0, ) - graph_runtime_state = GraphRuntimeState( + graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool, start_at=time.perf_counter(), + workflow_id=workflow.id, ) node = HumanInputNode( id=node_config["id"], diff --git a/api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py b/api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py index 2b4c1b59abf..ebf2b1befdb 100644 --- a/api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py +++ b/api/tests/test_containers_integration_tests/core/app/layers/test_pause_state_persist_layer.py @@ -28,7 +28,7 @@ from graphon.graph_engine.entities.commands import GraphEngineCommand from graphon.graph_engine.layers.base import GraphEngineLayerNotInitializedError from graphon.graph_events import GraphRunPausedEvent from graphon.model_runtime.entities.llm_entities import LLMUsage -from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool +from graphon.runtime import ReadOnlyGraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool from sqlalchemy import Engine, delete, select from sqlalchemy.orm import Session @@ -38,6 +38,7 @@ from core.app.layers.pause_state_persist_layer import ( PauseStatePersistenceLayer, WorkflowResumptionContext, ) +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import build_system_variables from extensions.ext_storage import storage from libs.datetime_utils import naive_utc_now @@ -203,11 +204,13 @@ class TestPauseStatePersistenceLayerTestContainers: node_run_steps: int = 0, variables: dict[tuple[str, str], object] | None = None, workflow_run_id: str | None = None, + workflow_id: str | None = None, ) -> ReadOnlyGraphRuntimeState: """Create a real GraphRuntimeState for testing.""" start_at = time() execution_id = workflow_run_id or getattr(self, "test_workflow_run_id", None) or str(uuid.uuid4()) + resolved_workflow_id = workflow_id or getattr(self, "test_workflow_id", None) or str(uuid.uuid4()) # Create variable pool variable_pool = VariablePool(system_variables=build_system_variables(workflow_execution_id=execution_id)) @@ -219,9 +222,10 @@ class TestPauseStatePersistenceLayerTestContainers: llm_usage = LLMUsage.empty_usage() # Create graph runtime state - graph_runtime_state = GraphRuntimeState( + graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool, start_at=start_at, + workflow_id=resolved_workflow_id, total_tokens=total_tokens, llm_usage=llm_usage, outputs=outputs or {}, diff --git a/api/tests/test_containers_integration_tests/core/workflow/test_human_input_resume_node_execution.py b/api/tests/test_containers_integration_tests/core/workflow/test_human_input_resume_node_execution.py index 0a9b476afc5..1f9701f5a46 100644 --- a/api/tests/test_containers_integration_tests/core/workflow/test_human_input_resume_node_execution.py +++ b/api/tests/test_containers_integration_tests/core/workflow/test_human_input_resume_node_execution.py @@ -26,6 +26,11 @@ from core.repositories.human_input_repository import HumanInputFormEntity, Human from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.node_runtime import DifyHumanInputNodeRuntime +from core.workflow.runtime_state import ( + bind_graph_runtime_state_to_graph, + create_graph_runtime_state, + snapshot_graph_runtime_state, +) from core.workflow.system_variables import build_system_variables from libs.datetime_utils import naive_utc_now from models import Account @@ -76,7 +81,11 @@ def _build_runtime_state(workflow_execution_id: str, app_id: str, workflow_id: s user_inputs={}, conversation_variables=[], ) - return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + return create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id=workflow_id, + ) def _build_graph( @@ -285,6 +294,11 @@ class TestHumanInputResumeNodeExecutionIntegration: ) def _run_graph(self, graph: Graph, runtime_state: GraphRuntimeState, execution_id: str) -> None: + bind_graph_runtime_state_to_graph( + runtime_state, + graph, + workflow_id=self.workflow.id, + ) engine = GraphEngine( workflow_id=self.workflow.id, graph=graph, @@ -314,7 +328,10 @@ class TestHumanInputResumeNodeExecutionIntegration: ) self._run_graph(paused_graph, runtime_state, execution_id) - snapshot = runtime_state.dumps() + snapshot = snapshot_graph_runtime_state( + runtime_state, + workflow_id=self.workflow.id, + ) resumed_state = GraphRuntimeState.from_snapshot(snapshot) resume_repo = _mock_form_repository_with_submission(action_id="continue") resumed_graph = _build_graph( diff --git a/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py b/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py index a16f3ff773b..8c631cb8d5e 100644 --- a/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py +++ b/api/tests/test_containers_integration_tests/tasks/test_mail_human_input_delivery_task.py @@ -5,7 +5,7 @@ from unittest.mock import patch import pytest from graphon.enums import WorkflowExecutionStatus from graphon.nodes.human_input.entities import HumanInputNodeData -from graphon.runtime import GraphRuntimeState, VariablePool +from graphon.runtime import VariablePool from configs import dify_config from core.app.app_config.entities import WorkflowUIBasedAppConfig @@ -19,6 +19,7 @@ from core.workflow.human_input_compat import ( ExternalRecipient, MemberRecipient, ) +from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state from extensions.ext_storage import storage from models.account import Account, AccountStatus, Tenant, TenantAccountJoin, TenantAccountRole from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom @@ -136,7 +137,11 @@ def _create_workflow_pause_state( ) db_session_with_containers.add(workflow_run) - runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0) + runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=0.0, + workflow_id=workflow_id, + ) resumption_context = WorkflowResumptionContext( generate_entity={ "type": AppMode.WORKFLOW, @@ -156,7 +161,10 @@ def _create_workflow_pause_state( workflow_execution_id=workflow_run_id, ), }, - serialized_graph_runtime_state=runtime_state.dumps(), + serialized_graph_runtime_state=snapshot_graph_runtime_state( + runtime_state, + workflow_id=workflow_id, + ), ) state_object_key = f"workflow_pause_states/{workflow_run_id}.json" diff --git a/api/tests/unit_tests/core/app/apps/test_pause_resume.py b/api/tests/unit_tests/core/app/apps/test_pause_resume.py index a126bc85f75..d9e7fdd0006 100644 --- a/api/tests/unit_tests/core/app/apps/test_pause_resume.py +++ b/api/tests/unit_tests/core/app/apps/test_pause_resume.py @@ -30,6 +30,11 @@ from core.app.apps.advanced_chat import app_generator as adv_app_gen_module from core.app.apps.workflow import app_generator as wf_app_gen_module from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.node_factory import DifyNodeFactory +from core.workflow.runtime_state import ( + bind_graph_runtime_state_to_graph, + create_graph_runtime_state, + snapshot_graph_runtime_state, +) from core.workflow.system_variables import build_system_variables from tests.workflow_test_utils import build_test_graph_init_params @@ -168,12 +173,21 @@ def _build_runtime_state(run_id: str) -> GraphRuntimeState: conversation_variables=[], ) variable_pool.add(("sys", "workflow_run_id"), run_id) - return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + return create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id="workflow", + ) def _run_with_optional_pause(runtime_state: GraphRuntimeState, *, pause_on: str | None) -> list[GraphEngineEvent]: command_channel = InMemoryChannel() graph = _build_graph(runtime_state, pause_on=pause_on) + bind_graph_runtime_state_to_graph( + runtime_state, + graph, + workflow_id="workflow", + ) engine = GraphEngine( workflow_id="workflow", graph=graph, @@ -204,7 +218,10 @@ def test_workflow_app_pause_resume_matches_baseline(mocker): paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a") assert isinstance(paused_events[-1], GraphRunPausedEvent) paused_nodes = _node_successes(paused_events) - snapshot = paused_state.dumps() + snapshot = snapshot_graph_runtime_state( + paused_state, + workflow_id="workflow", + ) resumed_state = GraphRuntimeState.from_snapshot(snapshot) @@ -244,7 +261,10 @@ def test_advanced_chat_pause_resume_matches_baseline(mocker): paused_events = _run_with_optional_pause(paused_state, pause_on="tool_a") assert isinstance(paused_events[-1], GraphRunPausedEvent) paused_nodes = _node_successes(paused_events) - snapshot = paused_state.dumps() + snapshot = snapshot_graph_runtime_state( + paused_state, + workflow_id="workflow", + ) resumed_state = GraphRuntimeState.from_snapshot(snapshot) @@ -281,7 +301,12 @@ def test_resume_emits_resumption_start_reason(mocker) -> None: initial_start = next(event for event in paused_events if isinstance(event, GraphRunStartedEvent)) assert initial_start.reason == WorkflowStartReason.INITIAL - resumed_state = GraphRuntimeState.from_snapshot(paused_state.dumps()) + resumed_state = GraphRuntimeState.from_snapshot( + snapshot_graph_runtime_state( + paused_state, + workflow_id="workflow", + ) + ) resumed_events = _run_with_optional_pause(resumed_state, pause_on=None) resume_start = next(event for event in resumed_events if isinstance(event, GraphRunStartedEvent)) assert resume_start.reason == WorkflowStartReason.RESUMPTION diff --git a/api/tests/unit_tests/core/app/workflow/test_persistence_layer.py b/api/tests/unit_tests/core/app/workflow/test_persistence_layer.py index d8a68f6d000..a6f7127571a 100644 --- a/api/tests/unit_tests/core/app/workflow/test_persistence_layer.py +++ b/api/tests/unit_tests/core/app/workflow/test_persistence_layer.py @@ -27,10 +27,11 @@ from graphon.graph_events import ( NodeRunSucceededEvent, ) from graphon.node_events import NodeRunResult -from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool +from graphon.runtime import ReadOnlyGraphRuntimeStateWrapper, VariablePool from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer +from core.workflow.runtime_state import create_graph_runtime_state from core.workflow.system_variables import SystemVariableKey, build_system_variables @@ -60,7 +61,11 @@ def _make_layer( workflow_execution_id="run-id", conversation_id="conv-id", ) - runtime_state = GraphRuntimeState(variable_pool=VariablePool(system_variables=system_variables), start_at=0.0) + runtime_state = create_graph_runtime_state( + variable_pool=VariablePool(system_variables=system_variables), + start_at=0.0, + workflow_id="workflow-id", + ) read_only_state = ReadOnlyGraphRuntimeStateWrapper(runtime_state) application_generate_entity = WorkflowAppGenerateEntity.model_construct( diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py index 8b7fbd1b303..440672ce9d5 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_mock_nodes.py @@ -622,7 +622,8 @@ class MockIterationNode(MockNodeMixin, IterationNode): from graphon.graph import Graph from graphon.graph_engine import GraphEngine, GraphEngineConfig from graphon.graph_engine.command_channels import InMemoryChannel - from graphon.runtime import GraphRuntimeState + + from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state # Import our MockNodeFactory instead of DifyNodeFactory from .test_mock_factory import MockNodeFactory @@ -643,9 +644,10 @@ class MockIterationNode(MockNodeMixin, IterationNode): variable_pool_copy.add([self._node_id, "item"], item) # Create a new GraphRuntimeState for this iteration - graph_runtime_state_copy = GraphRuntimeState( + graph_runtime_state_copy = create_graph_runtime_state( variable_pool=variable_pool_copy, start_at=self.graph_runtime_state.start_at, + workflow_id=self.workflow_id, total_tokens=0, node_run_steps=0, ) @@ -666,6 +668,11 @@ class MockIterationNode(MockNodeMixin, IterationNode): from graphon.nodes.iteration.exc import IterationGraphNotFoundError raise IterationGraphNotFoundError("iteration graph not found") + bind_graph_runtime_state_to_graph( + graph_runtime_state_copy, + iteration_graph, + workflow_id=self.workflow_id, + ) # Create a new GraphEngine for this iteration graph_engine = GraphEngine( @@ -694,7 +701,8 @@ class MockLoopNode(MockNodeMixin, LoopNode): from graphon.graph import Graph from graphon.graph_engine import GraphEngine, GraphEngineConfig from graphon.graph_engine.command_channels import InMemoryChannel - from graphon.runtime import GraphRuntimeState + + from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state # Import our MockNodeFactory instead of DifyNodeFactory from .test_mock_factory import MockNodeFactory @@ -708,9 +716,10 @@ class MockLoopNode(MockNodeMixin, LoopNode): ) # Create a new GraphRuntimeState for this iteration - graph_runtime_state_copy = GraphRuntimeState( + graph_runtime_state_copy = create_graph_runtime_state( variable_pool=self.graph_runtime_state.variable_pool, start_at=start_at.timestamp(), + workflow_id=self.workflow_id, ) # Create a MockNodeFactory with the same mock_config @@ -725,6 +734,11 @@ class MockLoopNode(MockNodeMixin, LoopNode): if not loop_graph: raise ValueError("loop graph not found") + bind_graph_runtime_state_to_graph( + graph_runtime_state_copy, + loop_graph, + workflow_id=self.workflow_id, + ) # Create a new GraphEngine for this iteration graph_engine = GraphEngine( diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py index 8311a1e847a..481c7e524d2 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_parallel_human_input_join_resume.py @@ -30,6 +30,11 @@ from core.repositories.human_input_repository import ( HumanInputFormRepository, ) from core.workflow.node_runtime import DifyHumanInputNodeRuntime +from core.workflow.runtime_state import ( + bind_graph_runtime_state_to_graph, + create_graph_runtime_state, + snapshot_graph_runtime_state, +) from core.workflow.system_variables import build_system_variables from libs.datetime_utils import naive_utc_now from tests.workflow_test_utils import build_test_graph_init_params @@ -46,7 +51,10 @@ class InMemoryPauseStore: self._snapshot: str | None = None def save(self, runtime_state: GraphRuntimeState) -> None: - self._snapshot = runtime_state.dumps() + self._snapshot = snapshot_graph_runtime_state( + runtime_state, + workflow_id="workflow", + ) def load(self) -> GraphRuntimeState: assert self._snapshot is not None @@ -122,7 +130,11 @@ def _build_runtime_state() -> GraphRuntimeState: user_inputs={}, conversation_variables=[], ) - return GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + return create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id="workflow", + ) def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepository) -> Graph: @@ -200,6 +212,11 @@ def _build_graph(runtime_state: GraphRuntimeState, repo: HumanInputFormRepositor def _run_graph(graph: Graph, runtime_state: GraphRuntimeState) -> list[object]: + bind_graph_runtime_state_to_graph( + runtime_state, + graph, + workflow_id="workflow", + ) engine = GraphEngine( workflow_id="workflow", graph=graph, diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py index b11f9576777..0a82cda92e4 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py @@ -42,6 +42,7 @@ from graphon.variables import ( from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, InvokeFrom, UserFrom from core.tools.utils.yaml_utils import _load_yaml_file from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id +from core.workflow.runtime_state import bind_graph_runtime_state_to_graph, create_graph_runtime_state from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool @@ -65,9 +66,10 @@ class _TableTestChildEngineBuilder: root_node_id: str, variable_pool: VariablePool | None = None, ) -> GraphEngine: - child_graph_runtime_state = GraphRuntimeState( + child_graph_runtime_state = create_graph_runtime_state( variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool, start_at=time.perf_counter(), + workflow_id=workflow_id, execution_context=parent_graph_runtime_state.execution_context, ) if self._use_mock_factory: @@ -86,6 +88,11 @@ class _TableTestChildEngineBuilder: child_graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=root_node_id) if not child_graph: raise ValueError("child graph not found") + bind_graph_runtime_state_to_graph( + child_graph_runtime_state, + child_graph, + workflow_id=workflow_id, + ) child_engine = GraphEngine( workflow_id=workflow_id, @@ -261,7 +268,11 @@ class WorkflowRunner: ) add_node_inputs_to_pool(variable_pool, node_id=root_node_id, inputs=root_node_inputs) - graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + graph_runtime_state = create_graph_runtime_state( + variable_pool=variable_pool, + start_at=time.perf_counter(), + workflow_id=graph_init_params.workflow_id, + ) if use_mock_factory: node_factory = MockNodeFactory( @@ -275,6 +286,11 @@ class WorkflowRunner: node_factory=node_factory, root_node_id=root_node_id, ) + bind_graph_runtime_state_to_graph( + graph_runtime_state, + graph, + workflow_id=graph_init_params.workflow_id, + ) return graph, graph_runtime_state @@ -366,6 +382,11 @@ class TableTestRunner: try: graph, graph_runtime_state = self._create_graph_runtime_state(test_case) + bind_graph_runtime_state_to_graph( + graph_runtime_state, + graph, + workflow_id="test_workflow", + ) # Create and run the engine with configured worker settings engine = GraphEngine( diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py b/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py index 12aec6edf24..49c7976f71c 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py @@ -5,6 +5,8 @@ from graphon.graph_events import ( NodeRunStreamChunkEvent, ) +from core.workflow.runtime_state import bind_graph_runtime_state_to_graph + from .test_table_runner import TableTestRunner @@ -20,6 +22,11 @@ def test_tool_in_chatflow(): query="1", use_mock_factory=True, ) + bind_graph_runtime_state_to_graph( + graph_runtime_state, + graph, + workflow_id="test_workflow", + ) # Create and run the engine engine = GraphEngine( diff --git a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py index b8b073f75c0..795a0c775a5 100644 --- a/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py +++ b/api/tests/unit_tests/services/workflow/test_workflow_event_snapshot_service.py @@ -9,11 +9,12 @@ from threading import Event import pytest from graphon.entities.pause_reason import HumanInputRequired from graphon.enums import WorkflowExecutionStatus, WorkflowNodeExecutionStatus -from graphon.runtime import GraphRuntimeState, VariablePool +from graphon.runtime import VariablePool from core.app.app_config.entities import WorkflowUIBasedAppConfig from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext, _WorkflowGenerateEntityWrapper +from core.workflow.runtime_state import create_graph_runtime_state, snapshot_graph_runtime_state from models.enums import CreatorUserRole from models.model import AppMode from models.workflow import WorkflowRun @@ -116,13 +117,20 @@ def _build_resumption_context(task_id: str) -> WorkflowResumptionContext: call_depth=0, workflow_execution_id="run-1", ) - runtime_state = GraphRuntimeState(variable_pool=VariablePool(), start_at=0.0) + runtime_state = create_graph_runtime_state( + variable_pool=VariablePool(), + start_at=0.0, + workflow_id="workflow-1", + ) runtime_state.register_paused_node("node-1") runtime_state.outputs = {"result": "value"} wrapper = _WorkflowGenerateEntityWrapper(entity=generate_entity) return WorkflowResumptionContext( generate_entity=wrapper, - serialized_graph_runtime_state=runtime_state.dumps(), + serialized_graph_runtime_state=snapshot_graph_runtime_state( + runtime_state, + workflow_id="workflow-1", + ), )