fix: adding a restore API for version control on workflow draft (#33582)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
盐粒 Yanli
2026-03-20 14:54:23 +08:00
committed by GitHub
parent 4d538c3727
commit c8ed584c0e
31 changed files with 1452 additions and 179 deletions

View File

@@ -79,10 +79,11 @@ from services.entities.knowledge_entities.rag_pipeline_entities import (
KnowledgeConfiguration,
PipelineTemplateInfoEntity,
)
from services.errors.app import WorkflowHashNotEqualError
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError, WorkflowNotFoundError
from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
from services.workflow_draft_variable_service import DraftVariableSaver, DraftVarLoader
from services.workflow_restore import apply_published_workflow_snapshot_to_draft
logger = logging.getLogger(__name__)
@@ -234,6 +235,21 @@ class RagPipelineService:
return workflow
def get_published_workflow_by_id(self, pipeline: Pipeline, workflow_id: str) -> Workflow | None:
"""Fetch a published workflow snapshot by ID for restore operations."""
workflow = (
db.session.query(Workflow)
.where(
Workflow.tenant_id == pipeline.tenant_id,
Workflow.app_id == pipeline.id,
Workflow.id == workflow_id,
)
.first()
)
if workflow and workflow.version == Workflow.VERSION_DRAFT:
raise IsDraftWorkflowError("source workflow must be published")
return workflow
def get_all_published_workflow(
self,
*,
@@ -327,6 +343,42 @@ class RagPipelineService:
# return draft workflow
return workflow
def restore_published_workflow_to_draft(
self,
*,
pipeline: Pipeline,
workflow_id: str,
account: Account,
) -> Workflow:
"""Restore a published pipeline workflow snapshot into the draft workflow.
Pipelines reuse the shared draft-restore field copy helper, but still own
the pipeline-specific flush/link step that wires a newly created draft
back onto ``pipeline.workflow_id``.
"""
source_workflow = self.get_published_workflow_by_id(pipeline=pipeline, workflow_id=workflow_id)
if not source_workflow:
raise WorkflowNotFoundError("Workflow not found.")
draft_workflow = self.get_draft_workflow(pipeline=pipeline)
draft_workflow, is_new_draft = apply_published_workflow_snapshot_to_draft(
tenant_id=pipeline.tenant_id,
app_id=pipeline.id,
source_workflow=source_workflow,
draft_workflow=draft_workflow,
account=account,
updated_at_factory=lambda: datetime.now(UTC).replace(tzinfo=None),
)
if is_new_draft:
db.session.add(draft_workflow)
db.session.flush()
pipeline.workflow_id = draft_workflow.id
db.session.commit()
return draft_workflow
def publish_workflow(
self,
*,

View File

@@ -0,0 +1,58 @@
"""Shared helpers for restoring published workflow snapshots into drafts.
Both app workflows and RAG pipeline workflows restore the same workflow fields
from a published snapshot into a draft. Keeping that field-copy logic in one
place prevents the two restore paths from drifting when we add or adjust draft
state in the future. Restore stays within a tenant, so we can safely reuse the
serialized workflow storage blobs without decrypting and re-encrypting secrets.
"""
from collections.abc import Callable
from datetime import datetime
from models import Account
from models.workflow import Workflow, WorkflowType
UpdatedAtFactory = Callable[[], datetime]
def apply_published_workflow_snapshot_to_draft(
*,
tenant_id: str,
app_id: str,
source_workflow: Workflow,
draft_workflow: Workflow | None,
account: Account,
updated_at_factory: UpdatedAtFactory,
) -> tuple[Workflow, bool]:
"""Copy a published workflow snapshot into a draft workflow record.
The caller remains responsible for source lookup, validation, flushing, and
post-commit side effects. This helper only centralizes the shared draft
creation/update semantics used by both restore entry points. Features are
copied from the stored JSON payload so restore does not normalize and dirty
the published source row before the caller commits.
"""
if not draft_workflow:
workflow_type = (
source_workflow.type.value if isinstance(source_workflow.type, WorkflowType) else source_workflow.type
)
draft_workflow = Workflow(
tenant_id=tenant_id,
app_id=app_id,
type=workflow_type,
version=Workflow.VERSION_DRAFT,
graph=source_workflow.graph,
features=source_workflow.serialized_features,
created_by=account.id,
)
draft_workflow.copy_serialized_variable_storage_from(source_workflow)
return draft_workflow, True
draft_workflow.graph = source_workflow.graph
draft_workflow.features = source_workflow.serialized_features
draft_workflow.updated_by = account.id
draft_workflow.updated_at = updated_at_factory()
draft_workflow.copy_serialized_variable_storage_from(source_workflow)
return draft_workflow, False

View File

@@ -63,7 +63,12 @@ from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeEx
from repositories.factory import DifyAPIRepositoryFactory
from services.billing_service import BillingService
from services.enterprise.plugin_manager_service import PluginCredentialType
from services.errors.app import IsDraftWorkflowError, TriggerNodeLimitExceededError, WorkflowHashNotEqualError
from services.errors.app import (
IsDraftWorkflowError,
TriggerNodeLimitExceededError,
WorkflowHashNotEqualError,
WorkflowNotFoundError,
)
from services.workflow.workflow_converter import WorkflowConverter
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
@@ -75,6 +80,7 @@ from .human_input_delivery_test_service import (
HumanInputDeliveryTestService,
)
from .workflow_draft_variable_service import DraftVariableSaver, DraftVarLoader, WorkflowDraftVariableService
from .workflow_restore import apply_published_workflow_snapshot_to_draft
class WorkflowService:
@@ -279,6 +285,43 @@ class WorkflowService:
# return draft workflow
return workflow
def restore_published_workflow_to_draft(
self,
*,
app_model: App,
workflow_id: str,
account: Account,
) -> Workflow:
"""Restore a published workflow snapshot into the draft workflow.
Secret environment variables are copied server-side from the selected
published workflow so the normal draft sync flow stays stateless.
"""
source_workflow = self.get_published_workflow_by_id(app_model=app_model, workflow_id=workflow_id)
if not source_workflow:
raise WorkflowNotFoundError("Workflow not found.")
self.validate_features_structure(app_model=app_model, features=source_workflow.normalized_features_dict)
self.validate_graph_structure(graph=source_workflow.graph_dict)
draft_workflow = self.get_draft_workflow(app_model=app_model)
draft_workflow, is_new_draft = apply_published_workflow_snapshot_to_draft(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
source_workflow=source_workflow,
draft_workflow=draft_workflow,
account=account,
updated_at_factory=naive_utc_now,
)
if is_new_draft:
db.session.add(draft_workflow)
db.session.commit()
app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=draft_workflow)
return draft_workflow
def publish_workflow(
self,
*,