Compare commits

..

78 Commits

Author SHA1 Message Date
Asuka Minato
25c69ac540 one example of Session (#24135)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
2025-09-24 03:32:48 +08:00
QuantumGhost
96a0b9991e fix(api): Fix variable truncation for list[File] value in output mapping (#26133) 2025-09-23 21:30:46 +08:00
QuantumGhost
2913d17fe2 ci: Add hotfix/** branches to build-push workflow triggers (#26129) 2025-09-23 18:48:02 +08:00
Wu Tianwei
d9e45a1abe feat(pipeline): add language support to built-in pipeline templates and update related components (#26124) 2025-09-23 18:18:22 +08:00
longbingljw
24b4289d6c fix:add some explanation for oceanbase parser selection (#26071)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-23 17:06:06 +08:00
GuanMu
fb6ccccc3d chore: refactor component exports for consistency (#26033)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-09-23 17:04:56 +08:00
17hz
8b74ae683a bump nextjs to 15.5 and turbopack for development mode (#24346)
Co-authored-by: crazywoola <427733928@qq.com>
Co-authored-by: 非法操作 <hjlarry@163.com>
2025-09-23 16:59:26 +08:00
Jyong
dd08957381 fix full_text_search name (#26104) 2025-09-23 16:40:26 +08:00
quicksand
407323f817 fix(api): graph engine debug logging NodeRunRetryEvent not effective (#26085) 2025-09-23 13:46:45 +08:00
-LAN-
2e2c87c5a1 fix(graph_engine): error strategy fall. (#26078)
Signed-off-by: -LAN- <laipz8200@outlook.com>
2025-09-23 01:51:43 +08:00
Asuka Minato
f4522fd695 try contextmanager (#26074)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-23 00:35:54 +08:00
夏目猫猫
760a2c656c amend regexp exec (#25986) 2025-09-23 00:47:13 +09:00
Asuka Minato
8940decd1b more httpx (#25651)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-09-22 23:07:09 +08:00
Jyong
0c4193bd91 fix avatar-url to text (#26068) 2025-09-22 21:28:42 +08:00
Jyong
cd40cde790 fix tenant not exist (#26066) 2025-09-22 20:50:30 +08:00
Jyong
c60c754ac9 fix preview url (#26059) 2025-09-22 19:47:39 +08:00
非法操作
ef80d3b707 fix: Ensure compatibility with old provider name when updating model credentials (#26017) 2025-09-22 19:39:17 +08:00
QuantumGhost
24e8d21b3f chore(api): bump version (#25917) 2025-09-22 19:14:43 +08:00
Novice
d823da18db fix: iteration and loop node single step run (#26036) 2025-09-22 19:14:24 +08:00
QuantumGhost
1e3df09fc6 chore(api): adjust monkey patching in gunicorn.conf.py (#26056) 2025-09-22 18:23:01 +08:00
Stream
75a10c276c chore: remove mistakenly added trash file (#26041) 2025-09-22 16:07:02 +08:00
Hunter
50050527eb fix: Correctly map source_url to preview_url in file fields (#25957) 2025-09-22 14:31:49 +08:00
Wu Tianwei
a39b185627 fix: comment out unused segmentation rule properties in RuleDetail component (#26031) 2025-09-22 14:17:02 +08:00
dependabot[bot]
15270f09af chore(deps): bump boto3-stubs from 1.40.29 to 1.40.35 in /api (#26014)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-22 12:16:05 +08:00
QuantumGhost
f6a5ac0698 chore(api): upgade Gevent to 25.9.1 (#26026) 2025-09-22 12:15:50 +08:00
zyssyz123
2b79da722b fix: workflow (#26030) 2025-09-22 12:08:15 +08:00
-LAN-
71d69e43cd Align dev workflow branch triggers (#26029) 2025-09-22 11:56:28 +08:00
Yongtao Huang
5bc6e8a433 Fix: correct regex for file-preview URL re-signing (#25620)
Fixes #25619

The regex patterns for file-preview and image-preview contained an unescaped `?`, 
which caused incorrect matches such as `file-previe` or `image-previw`. 
This led to malformed URLs being incorrectly re-signed.

Changes:
- Escape `?` in both file-preview and image-preview regex patterns.
- Ensure only valid URLs are re-signed.

Added unit tests to cover:
- Valid file-preview and image-preview URLs (correctly re-signed).
- Misspelled file/image preview URLs (no longer incorrectly matched).

Other:
- Fix a deprecated function `datetime.utcnow()`

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-22 10:58:29 +08:00
dependabot[bot]
68076f2e22 chore(deps): bump abcjs from 6.5.1 to 6.5.2 in /web (#26018)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-22 10:36:40 +08:00
Wu Tianwei
8c38363038 fix: pass operation name to onUpdate callback in StatusItem component (#26019) 2025-09-22 10:19:12 +08:00
Shili Cao
345ac8333c Add Full-Text & Hybrid Search Support to Baidu Vector DB and Update SDK, Closes #25982 (#25983)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-22 10:17:35 +08:00
dependabot[bot]
2375047ef0 chore(deps-dev): bump eslint-plugin-storybook from 0.11.6 to 9.0.7 in /web (#26011)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-22 10:03:02 +08:00
Yongtao Huang
857a48012e Fix: use data.type instead of type when checking datasource node (#25965)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-22 10:01:21 +08:00
longbingljw
208fe3d7de feat:support selecting different ftparser for OceanBase. (#25970)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-22 09:56:33 +08:00
dependabot[bot]
92cddbcc02 chore(deps): bump negotiator from 0.6.4 to 1.0.0 in /web (#26012)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-22 09:55:00 +08:00
dependabot[bot]
599b53c9cb chore(deps): bump authlib from 1.3.1 to 1.6.4 in /api (#26015)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-09-22 09:54:52 +08:00
jiangbo721
062b173c66 fix: Statistics, like workflows, do not include debug data. (#25979) 2025-09-20 10:47:59 +08:00
Yongtao Huang
db690013fd Chore: remove dead code in datasource.utils (#25984) 2025-09-20 10:47:52 +08:00
lyzno1
e93bfe3d41 fix: resolve chat sidebar UI bugs for hover panel and dropdown menu (#25813) 2025-09-19 18:28:49 +08:00
GuanMu
ab910c736c feat(goto-anything): add RAG pipeline node search (#25948) 2025-09-19 18:28:13 +08:00
Yeuoly
4047a6bb12 fix: ensure original response are maintained by yielding text messages in ApiTool (#23456) (#25973) 2025-09-19 18:27:33 +08:00
github-actions[bot]
df2478dc26 chore: translate i18n files and update type definitions (#25964)
Co-authored-by: WTW0313 <30284043+WTW0313@users.noreply.github.com>
2025-09-19 18:27:09 +08:00
-LAN-
4cc3f6045b Run import-linter within make lint (#25933) 2025-09-19 18:26:43 +08:00
Joel
1550316b8d fix: undefined match the wrong output schema (#25971) 2025-09-19 17:03:09 +08:00
Wu Tianwei
87394d2512 fix: enhance model parameter handling with advanced mode support and localization updates (#25963) 2025-09-19 15:47:52 +08:00
Wu Tianwei
bad59c95bc fix: update details display to conditionally show creator information (#25952) 2025-09-19 15:45:45 +08:00
Xiyuan Chen
9f138ef246 Refactor WorkflowService to handle missing default credentials gracef… (#25960) 2025-09-19 00:45:35 -07:00
zxhlyh
6453fc4973 fix: refresh datasource list after install datasource (#25949) 2025-09-19 11:03:45 +08:00
GuanMu
f62f926537 style: update GotoAnything component styling (#25929)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-19 10:36:43 +08:00
Yongtao Huang
b3dafd913b Chore: correct inconsistent logging and typo (#25945) 2025-09-19 10:36:16 +08:00
-LAN-
b2d8a7eaf1 Fix: enforce editor-only access to chat message logs (#25936) 2025-09-18 21:59:51 +08:00
GuanMu
3e54414191 chore: update post_create_command.sh to use dynamic workspace root for aliases (#25913) 2025-09-18 21:09:43 +08:00
-LAN-
a173546c8d Fix: replace stdout prints with debug logging (#25931)
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-09-18 21:03:20 +08:00
-LAN-
aa69d90489 fix(makefile): correct uv project path for lint target (#25818) 2025-09-18 20:36:26 +08:00
-LAN-
4ba1292455 refactor: replace print statements with proper logging (#25773) 2025-09-18 20:35:47 +08:00
Maries
bb01c31f30 fix(api): enhance data handling in RagPipelineDslService to filter credentials (#25926) 2025-09-18 18:36:49 +08:00
Wu Tianwei
cd90b2ca9e refactor: replace useInvalid with useInvalidCustomizedTemplateList (#25924) 2025-09-18 18:17:20 +08:00
heyszt
9a65350cf7 fix: rollback aliyun_trace icon (#25921) 2025-09-18 18:01:08 +08:00
quicksand
680eb7a9f6 fix(datasets): retrieval_model null issue when updating dataset info (#25907) 2025-09-18 17:58:06 +08:00
crazywoola
878420463c fix: Message => str (#25876) 2025-09-18 17:57:57 +08:00
zxhlyh
4692e20daf fix: workflow header style (#25922) 2025-09-18 17:53:40 +08:00
QuantumGhost
13fe2ca8fe fix(api): fix single stepping variable loading (#25908) 2025-09-18 17:30:02 +08:00
zxhlyh
1264e7d4f6 fix: use invalid last run (#25911) 2025-09-18 16:52:27 +08:00
Yunlu Wen
4f45978cd9 fix: remote code execution in email endpoints (#25753)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-18 16:45:34 +08:00
Saurabh Singh
5a0bf8e028 feat: make SQLALCHEMY_POOL_TIMEOUT configurable (#25468)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2025-09-18 16:13:56 +08:00
Wu Tianwei
ffa163a8a8 refactor: simplify portal interactions and manage state in Configure component (#25906)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-09-18 15:57:33 +08:00
Novice
8f86f5749d chore: Update the value of sys.dialogue_count to start from 1. (#25905) 2025-09-18 15:52:52 +08:00
17hz
00d3bf15f3 perf(web): optimize ESLint performance with concurrency flag and remove oxlint (#25899)
Co-authored-by: Claude <noreply@anthropic.com>
2025-09-18 15:50:42 +08:00
17hz
7196c09e9d chore(workflows): remove redundant eslint command from style workflow (#25900) 2025-09-18 15:50:09 +08:00
zxhlyh
fadd9e0bf4 fix: workflow logs list (#25903) 2025-09-18 15:45:37 +08:00
zxhlyh
d8b4bbe067 fix: datasource pinned list (#25896) 2025-09-18 14:52:33 +08:00
GuanMu
24611e375a fix: update Python base image to use bullseye variant (#25895) 2025-09-18 14:38:56 +08:00
lyzno1
ccec582cea chore: add missing template translations in ja-JP (#25892) 2025-09-18 14:37:26 +08:00
Bowen Liang
b2e4107c17 chore: improve opendal storage and ensure closing file after reading files in load_stream method (#25874) 2025-09-18 14:09:19 +08:00
quicksand
87aa070486 feat(api/commands): add migrate-oss to migrate from Local/OpenDAL to … (#25828) 2025-09-18 14:09:00 +08:00
Novice
21230a8eb2 fix: handle None description in MCP tool transformation (#25872)
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
2025-09-18 13:11:38 +08:00
-LAN-
85cda47c70 feat: knowledge pipeline (#25360)
Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: twwu <twwu@dify.ai>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: jyong <718720800@qq.com>
Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com>
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
Co-authored-by: lyzno1 <yuanyouhuilyz@gmail.com>
Co-authored-by: quicksand <quicksandzn@gmail.com>
Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com>
Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com>
Co-authored-by: zxhlyh <jasonapring2015@outlook.com>
Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: nite-knite <nkCoding@gmail.com>
Co-authored-by: Hanqing Zhao <sherry9277@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Harry <xh001x@hotmail.com>
2025-09-18 12:49:10 +08:00
zyssyz123
7dadb33003 fix: remove billing cache when add or delete app or member (#25885) 2025-09-18 12:18:07 +08:00
212 changed files with 3466 additions and 4131 deletions

View File

@@ -1,4 +1,4 @@
FROM mcr.microsoft.com/devcontainers/python:3.12
FROM mcr.microsoft.com/devcontainers/python:3.12-bullseye
RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
&& apt-get -y install libgmp-dev libmpfr-dev libmpc-dev

View File

@@ -1,15 +1,16 @@
#!/bin/bash
WORKSPACE_ROOT=$(pwd)
corepack enable
cd web && pnpm install
pipx install uv
echo 'alias start-api="cd /workspaces/dify/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage"' >> ~/.bashrc
echo 'alias start-web="cd /workspaces/dify/web && pnpm dev"' >> ~/.bashrc
echo 'alias start-web-prod="cd /workspaces/dify/web && pnpm build && pnpm start"' >> ~/.bashrc
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d"' >> ~/.bashrc
echo 'alias stop-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env down"' >> ~/.bashrc
echo "alias start-api=\"cd $WORKSPACE_ROOT/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug\"" >> ~/.bashrc
echo "alias start-worker=\"cd $WORKSPACE_ROOT/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion,plugin,workflow_storage\"" >> ~/.bashrc
echo "alias start-web=\"cd $WORKSPACE_ROOT/web && pnpm dev\"" >> ~/.bashrc
echo "alias start-web-prod=\"cd $WORKSPACE_ROOT/web && pnpm build && pnpm start\"" >> ~/.bashrc
echo "alias start-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d\"" >> ~/.bashrc
echo "alias stop-containers=\"cd $WORKSPACE_ROOT/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env down\"" >> ~/.bashrc
source /home/vscode/.bashrc

View File

@@ -8,8 +8,7 @@ on:
- "deploy/enterprise"
- "build/**"
- "release/e-*"
- "deploy/rag-dev"
- "feat/rag-2"
- "hotfix/**"
tags:
- "*"

View File

@@ -4,7 +4,7 @@ on:
workflow_run:
workflows: ["Build and Push API & Web"]
branches:
- "deploy/rag-dev"
- "deploy/dev"
types:
- completed
@@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/rag-dev'
github.event.workflow_run.head_branch == 'deploy/dev'
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v0.1.8

View File

@@ -102,7 +102,6 @@ jobs:
working-directory: ./web
run: |
pnpm run lint
pnpm run eslint
docker-compose-template:
name: Docker Compose Template

6
.gitignore vendored
View File

@@ -230,4 +230,8 @@ api/.env.backup
# Benchmark
scripts/stress-test/setup/config/
scripts/stress-test/reports/
scripts/stress-test/reports/
# mcp
.playwright-mcp/
.serena/

View File

@@ -61,8 +61,9 @@ check:
@echo "✅ Code check complete"
lint:
@echo "🔧 Running ruff format and check with fixes..."
@uv run --directory api --dev sh -c 'ruff format ./api && ruff check --fix ./api'
@echo "🔧 Running ruff format, check with fixes, and import linter..."
@uv run --project api --dev sh -c 'ruff format ./api && ruff check --fix ./api'
@uv run --directory api --dev lint-imports
@echo "✅ Linting complete"
type-check:

View File

@@ -76,6 +76,7 @@ DB_HOST=localhost
DB_PORT=5432
DB_DATABASE=dify
SQLALCHEMY_POOL_PRE_PING=true
SQLALCHEMY_POOL_TIMEOUT=30
# Storage configuration
# use for store upload files, private keys...
@@ -303,6 +304,8 @@ BAIDU_VECTOR_DB_API_KEY=dify
BAIDU_VECTOR_DB_DATABASE=dify
BAIDU_VECTOR_DB_SHARD=1
BAIDU_VECTOR_DB_REPLICAS=3
BAIDU_VECTOR_DB_INVERTED_INDEX_ANALYZER=DEFAULT_ANALYZER
BAIDU_VECTOR_DB_INVERTED_INDEX_PARSER_MODE=COARSE_MODE
# Upstash configuration
UPSTASH_VECTOR_URL=your-server-url

View File

@@ -30,6 +30,7 @@ select = [
"RUF022", # unsorted-dunder-all
"S506", # unsafe-yaml-load
"SIM", # flake8-simplify rules
"T201", # print-found
"TRY400", # error-instead-of-exception
"TRY401", # verbose-log-message
"UP", # pyupgrade rules
@@ -91,11 +92,18 @@ ignore = [
"configs/*" = [
"N802", # invalid-function-name
]
"core/model_runtime/callbacks/base_callback.py" = [
"T201",
]
"core/workflow/callbacks/workflow_logging_callback.py" = [
"T201",
]
"libs/gmpy2_pkcs10aep_cipher.py" = [
"N803", # invalid-argument-name
]
"tests/*" = [
"F811", # redefined-while-unused
"T201", # allow print in tests
]
[lint.pyflakes]

View File

@@ -1,20 +1,11 @@
import logging
import psycogreen.gevent as pscycogreen_gevent # type: ignore
from grpc.experimental import gevent as grpc_gevent # type: ignore
_logger = logging.getLogger(__name__)
def _log(message: str):
print(message, flush=True)
# grpc gevent
grpc_gevent.init_gevent()
_log("gRPC patched with gevent.")
print("gRPC patched with gevent.", flush=True) # noqa: T201
pscycogreen_gevent.patch_psycopg()
_log("psycopg2 patched with gevent.")
print("psycopg2 patched with gevent.", flush=True) # noqa: T201
from app import app, celery

View File

@@ -10,6 +10,7 @@ from flask import current_app
from pydantic import TypeAdapter
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from constants.languages import languages
@@ -25,13 +26,15 @@ from events.app_event import app_was_created
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from extensions.storage.opendal_storage import OpenDALStorage
from extensions.storage.storage_type import StorageType
from libs.helper import email as email_validate
from libs.password import hash_password, password_pattern, valid_password
from libs.rsa import generate_key_pair
from models import Tenant
from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, DatasetMetadataBinding, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation, UploadFile
from models.oauth import DatasourceOauthParamConfig, DatasourceProvider
from models.provider import Provider, ProviderModel
from models.provider_ids import DatasourceProviderID, ToolProviderID
@@ -59,31 +62,30 @@ def reset_password(email, new_password, password_confirm):
if str(new_password).strip() != str(password_confirm).strip():
click.echo(click.style("Passwords do not match.", fg="red"))
return
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
account = session.query(Account).where(Account.email == email).one_or_none()
account = db.session.query(Account).where(Account.email == email).one_or_none()
if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
try:
valid_password(new_password)
except:
click.echo(click.style(f"Invalid password. Must match {password_pattern}", fg="red"))
return
try:
valid_password(new_password)
except:
click.echo(click.style(f"Invalid password. Must match {password_pattern}", fg="red"))
return
# generate password salt
salt = secrets.token_bytes(16)
base64_salt = base64.b64encode(salt).decode()
# generate password salt
salt = secrets.token_bytes(16)
base64_salt = base64.b64encode(salt).decode()
# encrypt password with salt
password_hashed = hash_password(new_password, salt)
base64_password_hashed = base64.b64encode(password_hashed).decode()
account.password = base64_password_hashed
account.password_salt = base64_salt
db.session.commit()
AccountService.reset_login_error_rate_limit(email)
click.echo(click.style("Password reset successfully.", fg="green"))
# encrypt password with salt
password_hashed = hash_password(new_password, salt)
base64_password_hashed = base64.b64encode(password_hashed).decode()
account.password = base64_password_hashed
account.password_salt = base64_salt
AccountService.reset_login_error_rate_limit(email)
click.echo(click.style("Password reset successfully.", fg="green"))
@click.command("reset-email", help="Reset the account email.")
@@ -98,22 +100,21 @@ def reset_email(email, new_email, email_confirm):
if str(new_email).strip() != str(email_confirm).strip():
click.echo(click.style("New emails do not match.", fg="red"))
return
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
account = session.query(Account).where(Account.email == email).one_or_none()
account = db.session.query(Account).where(Account.email == email).one_or_none()
if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
if not account:
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
try:
email_validate(new_email)
except:
click.echo(click.style(f"Invalid email: {new_email}", fg="red"))
return
try:
email_validate(new_email)
except:
click.echo(click.style(f"Invalid email: {new_email}", fg="red"))
return
account.email = new_email
db.session.commit()
click.echo(click.style("Email updated successfully.", fg="green"))
account.email = new_email
click.echo(click.style("Email updated successfully.", fg="green"))
@click.command(
@@ -137,25 +138,24 @@ def reset_encrypt_key_pair():
if dify_config.EDITION != "SELF_HOSTED":
click.echo(click.style("This command is only for SELF_HOSTED installations.", fg="red"))
return
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
tenants = session.query(Tenant).all()
for tenant in tenants:
if not tenant:
click.echo(click.style("No workspaces found. Run /install first.", fg="red"))
return
tenants = db.session.query(Tenant).all()
for tenant in tenants:
if not tenant:
click.echo(click.style("No workspaces found. Run /install first.", fg="red"))
return
tenant.encrypt_public_key = generate_key_pair(tenant.id)
tenant.encrypt_public_key = generate_key_pair(tenant.id)
session.query(Provider).where(Provider.provider_type == "custom", Provider.tenant_id == tenant.id).delete()
session.query(ProviderModel).where(ProviderModel.tenant_id == tenant.id).delete()
db.session.query(Provider).where(Provider.provider_type == "custom", Provider.tenant_id == tenant.id).delete()
db.session.query(ProviderModel).where(ProviderModel.tenant_id == tenant.id).delete()
db.session.commit()
click.echo(
click.style(
f"Congratulations! The asymmetric key pair of workspace {tenant.id} has been reset.",
fg="green",
click.echo(
click.style(
f"Congratulations! The asymmetric key pair of workspace {tenant.id} has been reset.",
fg="green",
)
)
)
@click.command("vdb-migrate", help="Migrate vector db.")
@@ -180,14 +180,15 @@ def migrate_annotation_vector_database():
try:
# get apps info
per_page = 50
apps = (
db.session.query(App)
.where(App.status == "normal")
.order_by(App.created_at.desc())
.limit(per_page)
.offset((page - 1) * per_page)
.all()
)
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
apps = (
session.query(App)
.where(App.status == "normal")
.order_by(App.created_at.desc())
.limit(per_page)
.offset((page - 1) * per_page)
.all()
)
if not apps:
break
except SQLAlchemyError:
@@ -201,26 +202,27 @@ def migrate_annotation_vector_database():
)
try:
click.echo(f"Creating app annotation index: {app.id}")
app_annotation_setting = (
db.session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first()
)
with sessionmaker(db.engine, expire_on_commit=False).begin() as session:
app_annotation_setting = (
session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first()
)
if not app_annotation_setting:
skipped_count = skipped_count + 1
click.echo(f"App annotation setting disabled: {app.id}")
continue
# get dataset_collection_binding info
dataset_collection_binding = (
db.session.query(DatasetCollectionBinding)
.where(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id)
.first()
)
if not dataset_collection_binding:
click.echo(f"App annotation collection binding not found: {app.id}")
continue
annotations = db.session.scalars(
select(MessageAnnotation).where(MessageAnnotation.app_id == app.id)
).all()
if not app_annotation_setting:
skipped_count = skipped_count + 1
click.echo(f"App annotation setting disabled: {app.id}")
continue
# get dataset_collection_binding info
dataset_collection_binding = (
session.query(DatasetCollectionBinding)
.where(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id)
.first()
)
if not dataset_collection_binding:
click.echo(f"App annotation collection binding not found: {app.id}")
continue
annotations = session.scalars(
select(MessageAnnotation).where(MessageAnnotation.app_id == app.id)
).all()
dataset = Dataset(
id=app.id,
tenant_id=app.tenant_id,
@@ -737,18 +739,18 @@ where sites.id is null limit 1000"""
try:
app = db.session.query(App).where(App.id == app_id).first()
if not app:
print(f"App {app_id} not found")
logger.info("App %s not found", app_id)
continue
tenant = app.tenant
if tenant:
accounts = tenant.get_accounts()
if not accounts:
print(f"Fix failed for app {app.id}")
logger.info("Fix failed for app %s", app.id)
continue
account = accounts[0]
print(f"Fixing missing site for app {app.id}")
logger.info("Fixing missing site for app %s", app.id)
app_was_created.send(app, account=account)
except Exception:
failed_app_ids.append(app_id)
@@ -1446,41 +1448,52 @@ def transform_datasource_credentials():
notion_credentials_tenant_mapping[tenant_id] = []
notion_credentials_tenant_mapping[tenant_id].append(notion_credential)
for tenant_id, notion_tenant_credentials in notion_credentials_tenant_mapping.items():
# check notion plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if notion_plugin_id not in installed_plugins_ids:
if notion_plugin_unique_identifier:
# install notion plugin
PluginService.install_from_marketplace_pkg(tenant_id, [notion_plugin_unique_identifier])
auth_count = 0
for notion_tenant_credential in notion_tenant_credentials:
auth_count += 1
# get credential oauth params
access_token = notion_tenant_credential.access_token
# notion info
notion_info = notion_tenant_credential.source_info
workspace_id = notion_info.get("workspace_id")
workspace_name = notion_info.get("workspace_name")
workspace_icon = notion_info.get("workspace_icon")
new_credentials = {
"integration_secret": encrypter.encrypt_token(tenant_id, access_token),
"workspace_id": workspace_id,
"workspace_name": workspace_name,
"workspace_icon": workspace_icon,
}
datasource_provider = DatasourceProvider(
provider="notion_datasource",
tenant_id=tenant_id,
plugin_id=notion_plugin_id,
auth_type=oauth_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url=workspace_icon or "default",
is_default=False,
tenant = db.session.query(Tenant).filter_by(id=tenant_id).first()
if not tenant:
continue
try:
# check notion plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if notion_plugin_id not in installed_plugins_ids:
if notion_plugin_unique_identifier:
# install notion plugin
PluginService.install_from_marketplace_pkg(tenant_id, [notion_plugin_unique_identifier])
auth_count = 0
for notion_tenant_credential in notion_tenant_credentials:
auth_count += 1
# get credential oauth params
access_token = notion_tenant_credential.access_token
# notion info
notion_info = notion_tenant_credential.source_info
workspace_id = notion_info.get("workspace_id")
workspace_name = notion_info.get("workspace_name")
workspace_icon = notion_info.get("workspace_icon")
new_credentials = {
"integration_secret": encrypter.encrypt_token(tenant_id, access_token),
"workspace_id": workspace_id,
"workspace_name": workspace_name,
"workspace_icon": workspace_icon,
}
datasource_provider = DatasourceProvider(
provider="notion_datasource",
tenant_id=tenant_id,
plugin_id=notion_plugin_id,
auth_type=oauth_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url=workspace_icon or "default",
is_default=False,
)
db.session.add(datasource_provider)
deal_notion_count += 1
except Exception as e:
click.echo(
click.style(
f"Error transforming notion credentials: {str(e)}, tenant_id: {tenant_id}", fg="red"
)
)
db.session.add(datasource_provider)
deal_notion_count += 1
continue
db.session.commit()
# deal firecrawl credentials
deal_firecrawl_count = 0
@@ -1493,37 +1506,48 @@ def transform_datasource_credentials():
firecrawl_credentials_tenant_mapping[tenant_id] = []
firecrawl_credentials_tenant_mapping[tenant_id].append(firecrawl_credential)
for tenant_id, firecrawl_tenant_credentials in firecrawl_credentials_tenant_mapping.items():
# check firecrawl plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if firecrawl_plugin_id not in installed_plugins_ids:
if firecrawl_plugin_unique_identifier:
# install firecrawl plugin
PluginService.install_from_marketplace_pkg(tenant_id, [firecrawl_plugin_unique_identifier])
tenant = db.session.query(Tenant).filter_by(id=tenant_id).first()
if not tenant:
continue
try:
# check firecrawl plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if firecrawl_plugin_id not in installed_plugins_ids:
if firecrawl_plugin_unique_identifier:
# install firecrawl plugin
PluginService.install_from_marketplace_pkg(tenant_id, [firecrawl_plugin_unique_identifier])
auth_count = 0
for firecrawl_tenant_credential in firecrawl_tenant_credentials:
auth_count += 1
# get credential api key
credentials_json = json.loads(firecrawl_tenant_credential.credentials)
api_key = credentials_json.get("config", {}).get("api_key")
base_url = credentials_json.get("config", {}).get("base_url")
new_credentials = {
"firecrawl_api_key": api_key,
"base_url": base_url,
}
datasource_provider = DatasourceProvider(
provider="firecrawl",
tenant_id=tenant_id,
plugin_id=firecrawl_plugin_id,
auth_type=api_key_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url="default",
is_default=False,
auth_count = 0
for firecrawl_tenant_credential in firecrawl_tenant_credentials:
auth_count += 1
# get credential api key
credentials_json = json.loads(firecrawl_tenant_credential.credentials)
api_key = credentials_json.get("config", {}).get("api_key")
base_url = credentials_json.get("config", {}).get("base_url")
new_credentials = {
"firecrawl_api_key": api_key,
"base_url": base_url,
}
datasource_provider = DatasourceProvider(
provider="firecrawl",
tenant_id=tenant_id,
plugin_id=firecrawl_plugin_id,
auth_type=api_key_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url="default",
is_default=False,
)
db.session.add(datasource_provider)
deal_firecrawl_count += 1
except Exception as e:
click.echo(
click.style(
f"Error transforming firecrawl credentials: {str(e)}, tenant_id: {tenant_id}", fg="red"
)
)
db.session.add(datasource_provider)
deal_firecrawl_count += 1
continue
db.session.commit()
# deal jina credentials
deal_jina_count = 0
@@ -1536,36 +1560,45 @@ def transform_datasource_credentials():
jina_credentials_tenant_mapping[tenant_id] = []
jina_credentials_tenant_mapping[tenant_id].append(jina_credential)
for tenant_id, jina_tenant_credentials in jina_credentials_tenant_mapping.items():
# check jina plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if jina_plugin_id not in installed_plugins_ids:
if jina_plugin_unique_identifier:
# install jina plugin
print(jina_plugin_unique_identifier)
PluginService.install_from_marketplace_pkg(tenant_id, [jina_plugin_unique_identifier])
tenant = db.session.query(Tenant).filter_by(id=tenant_id).first()
if not tenant:
continue
try:
# check jina plugin is installed
installed_plugins = installer_manager.list_plugins(tenant_id)
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
if jina_plugin_id not in installed_plugins_ids:
if jina_plugin_unique_identifier:
# install jina plugin
logger.debug("Installing Jina plugin %s", jina_plugin_unique_identifier)
PluginService.install_from_marketplace_pkg(tenant_id, [jina_plugin_unique_identifier])
auth_count = 0
for jina_tenant_credential in jina_tenant_credentials:
auth_count += 1
# get credential api key
credentials_json = json.loads(jina_tenant_credential.credentials)
api_key = credentials_json.get("config", {}).get("api_key")
new_credentials = {
"integration_secret": api_key,
}
datasource_provider = DatasourceProvider(
provider="jina",
tenant_id=tenant_id,
plugin_id=jina_plugin_id,
auth_type=api_key_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url="default",
is_default=False,
auth_count = 0
for jina_tenant_credential in jina_tenant_credentials:
auth_count += 1
# get credential api key
credentials_json = json.loads(jina_tenant_credential.credentials)
api_key = credentials_json.get("config", {}).get("api_key")
new_credentials = {
"integration_secret": api_key,
}
datasource_provider = DatasourceProvider(
provider="jina",
tenant_id=tenant_id,
plugin_id=jina_plugin_id,
auth_type=api_key_credential_type.value,
encrypted_credentials=new_credentials,
name=f"Auth {auth_count}",
avatar_url="default",
is_default=False,
)
db.session.add(datasource_provider)
deal_jina_count += 1
except Exception as e:
click.echo(
click.style(f"Error transforming jina credentials: {str(e)}, tenant_id: {tenant_id}", fg="red")
)
db.session.add(datasource_provider)
deal_jina_count += 1
continue
db.session.commit()
except Exception as e:
click.echo(click.style(f"Error parsing client params: {str(e)}", fg="red"))
@@ -1597,3 +1630,197 @@ def install_rag_pipeline_plugins(input_file, output_file, workers):
workers,
)
click.echo(click.style("Installing rag pipeline plugins successfully", fg="green"))
@click.command(
"migrate-oss",
help="Migrate files from Local or OpenDAL source to a cloud OSS storage (destination must NOT be local/opendal).",
)
@click.option(
"--path",
"paths",
multiple=True,
help="Storage path prefixes to migrate (repeatable). Defaults: privkeys, upload_files, image_files,"
" tools, website_files, keyword_files, ops_trace",
)
@click.option(
"--source",
type=click.Choice(["local", "opendal"], case_sensitive=False),
default="opendal",
show_default=True,
help="Source storage type to read from",
)
@click.option("--overwrite", is_flag=True, default=False, help="Overwrite destination if file already exists")
@click.option("--dry-run", is_flag=True, default=False, help="Show what would be migrated without uploading")
@click.option("-f", "--force", is_flag=True, help="Skip confirmation and run without prompts")
@click.option(
"--update-db/--no-update-db",
default=True,
help="Update upload_files.storage_type from source type to current storage after migration",
)
def migrate_oss(
paths: tuple[str, ...],
source: str,
overwrite: bool,
dry_run: bool,
force: bool,
update_db: bool,
):
"""
Copy all files under selected prefixes from a source storage
(Local filesystem or OpenDAL-backed) into the currently configured
destination storage backend, then optionally update DB records.
Expected usage: set STORAGE_TYPE (and its credentials) to your target backend.
"""
# Ensure target storage is not local/opendal
if dify_config.STORAGE_TYPE in (StorageType.LOCAL, StorageType.OPENDAL):
click.echo(
click.style(
"Target STORAGE_TYPE must be a cloud OSS (not 'local' or 'opendal').\n"
"Please set STORAGE_TYPE to one of: s3, aliyun-oss, azure-blob, google-storage, tencent-cos, \n"
"volcengine-tos, supabase, oci-storage, huawei-obs, baidu-obs, clickzetta-volume.",
fg="red",
)
)
return
# Default paths if none specified
default_paths = ("privkeys", "upload_files", "image_files", "tools", "website_files", "keyword_files", "ops_trace")
path_list = list(paths) if paths else list(default_paths)
is_source_local = source.lower() == "local"
click.echo(click.style("Preparing migration to target storage.", fg="yellow"))
click.echo(click.style(f"Target storage type: {dify_config.STORAGE_TYPE}", fg="white"))
if is_source_local:
src_root = dify_config.STORAGE_LOCAL_PATH
click.echo(click.style(f"Source: local fs, root: {src_root}", fg="white"))
else:
click.echo(click.style(f"Source: opendal scheme={dify_config.OPENDAL_SCHEME}", fg="white"))
click.echo(click.style(f"Paths to migrate: {', '.join(path_list)}", fg="white"))
click.echo("")
if not force:
click.confirm("Proceed with migration?", abort=True)
# Instantiate source storage
try:
if is_source_local:
src_root = dify_config.STORAGE_LOCAL_PATH
source_storage = OpenDALStorage(scheme="fs", root=src_root)
else:
source_storage = OpenDALStorage(scheme=dify_config.OPENDAL_SCHEME)
except Exception as e:
click.echo(click.style(f"Failed to initialize source storage: {str(e)}", fg="red"))
return
total_files = 0
copied_files = 0
skipped_files = 0
errored_files = 0
copied_upload_file_keys: list[str] = []
for prefix in path_list:
click.echo(click.style(f"Scanning source path: {prefix}", fg="white"))
try:
keys = source_storage.scan(path=prefix, files=True, directories=False)
except FileNotFoundError:
click.echo(click.style(f" -> Skipping missing path: {prefix}", fg="yellow"))
continue
except NotImplementedError:
click.echo(click.style(" -> Source storage does not support scanning.", fg="red"))
return
except Exception as e:
click.echo(click.style(f" -> Error scanning '{prefix}': {str(e)}", fg="red"))
continue
click.echo(click.style(f"Found {len(keys)} files under {prefix}", fg="white"))
for key in keys:
total_files += 1
# check destination existence
if not overwrite:
try:
if storage.exists(key):
skipped_files += 1
continue
except Exception as e:
# existence check failures should not block migration attempt
# but should be surfaced to user as a warning for visibility
click.echo(
click.style(
f" -> Warning: failed target existence check for {key}: {str(e)}",
fg="yellow",
)
)
if dry_run:
copied_files += 1
continue
# read from source and write to destination
try:
data = source_storage.load_once(key)
except FileNotFoundError:
errored_files += 1
click.echo(click.style(f" -> Missing on source: {key}", fg="yellow"))
continue
except Exception as e:
errored_files += 1
click.echo(click.style(f" -> Error reading {key}: {str(e)}", fg="red"))
continue
try:
storage.save(key, data)
copied_files += 1
if prefix == "upload_files":
copied_upload_file_keys.append(key)
except Exception as e:
errored_files += 1
click.echo(click.style(f" -> Error writing {key} to target: {str(e)}", fg="red"))
continue
click.echo("")
click.echo(click.style("Migration summary:", fg="yellow"))
click.echo(click.style(f" Total: {total_files}", fg="white"))
click.echo(click.style(f" Copied: {copied_files}", fg="green"))
click.echo(click.style(f" Skipped: {skipped_files}", fg="white"))
if errored_files:
click.echo(click.style(f" Errors: {errored_files}", fg="red"))
if dry_run:
click.echo(click.style("Dry-run complete. No changes were made.", fg="green"))
return
if errored_files:
click.echo(
click.style(
"Some files failed to migrate. Review errors above before updating DB records.",
fg="yellow",
)
)
if update_db and not force:
if not click.confirm("Proceed to update DB storage_type despite errors?", default=False):
update_db = False
# Optionally update DB records for upload_files.storage_type (only for successfully copied upload_files)
if update_db:
if not copied_upload_file_keys:
click.echo(click.style("No upload_files copied. Skipping DB storage_type update.", fg="yellow"))
else:
try:
source_storage_type = StorageType.LOCAL if is_source_local else StorageType.OPENDAL
updated = (
db.session.query(UploadFile)
.where(
UploadFile.storage_type == source_storage_type,
UploadFile.key.in_(copied_upload_file_keys),
)
.update({UploadFile.storage_type: dify_config.STORAGE_TYPE}, synchronize_session=False)
)
db.session.commit()
click.echo(click.style(f"Updated storage_type for {updated} upload_files records.", fg="green"))
except Exception as e:
db.session.rollback()
click.echo(click.style(f"Failed to update DB storage_type: {str(e)}", fg="red"))

View File

@@ -1,3 +1,4 @@
from enum import StrEnum
from typing import Literal
from pydantic import (
@@ -711,11 +712,35 @@ class ToolConfig(BaseSettings):
)
class TemplateMode(StrEnum):
# unsafe mode allows flexible operations in templates, but may cause security vulnerabilities
UNSAFE = "unsafe"
# sandbox mode restricts some unsafe operations like accessing __class__.
# however, it is still not 100% safe, for example, cpu exploitation can happen.
SANDBOX = "sandbox"
# templating is disabled
DISABLED = "disabled"
class MailConfig(BaseSettings):
"""
Configuration for email services
"""
MAIL_TEMPLATING_MODE: TemplateMode = Field(
description="Template mode for email services",
default=TemplateMode.SANDBOX,
)
MAIL_TEMPLATING_TIMEOUT: int = Field(
description="""
Timeout for email templating in seconds. Used to prevent infinite loops in malicious templates.
Only available in sandbox mode.""",
default=3,
)
MAIL_TYPE: str | None = Field(
description="Email service provider type ('smtp' or 'resend' or 'sendGrid), default to None.",
default=None,

View File

@@ -187,6 +187,11 @@ class DatabaseConfig(BaseSettings):
default=False,
)
SQLALCHEMY_POOL_TIMEOUT: NonNegativeInt = Field(
description="Number of seconds to wait for a connection from the pool before raising a timeout error.",
default=30,
)
RETRIEVAL_SERVICE_EXECUTORS: NonNegativeInt = Field(
description="Number of processes for the retrieval service, default to CPU cores.",
default=os.cpu_count() or 1,
@@ -216,6 +221,7 @@ class DatabaseConfig(BaseSettings):
"connect_args": connect_args,
"pool_use_lifo": self.SQLALCHEMY_POOL_USE_LIFO,
"pool_reset_on_return": None,
"pool_timeout": self.SQLALCHEMY_POOL_TIMEOUT,
}

View File

@@ -41,3 +41,13 @@ class BaiduVectorDBConfig(BaseSettings):
description="Number of replicas for the Baidu Vector Database (default is 3)",
default=3,
)
BAIDU_VECTOR_DB_INVERTED_INDEX_ANALYZER: str = Field(
description="Analyzer type for inverted index in Baidu Vector Database (default is DEFAULT_ANALYZER)",
default="DEFAULT_ANALYZER",
)
BAIDU_VECTOR_DB_INVERTED_INDEX_PARSER_MODE: str = Field(
description="Parser mode for inverted index in Baidu Vector Database (default is COARSE_MODE)",
default="COARSE_MODE",
)

View File

@@ -37,3 +37,15 @@ class OceanBaseVectorConfig(BaseSettings):
"with older versions",
default=False,
)
OCEANBASE_FULLTEXT_PARSER: str | None = Field(
description=(
"Fulltext parser to use for text indexing. "
"Built-in options: 'ngram' (N-gram tokenizer for English/numbers), "
"'beng' (Basic English tokenizer), 'space' (Space-based tokenizer), "
"'ngram2' (Improved N-gram tokenizer), 'ik' (Chinese tokenizer). "
"External plugins (require installation): 'japanese_ftparser' (Japanese tokenizer), "
"'thai_ftparser' (Thai tokenizer). Default is 'ik'"
),
default="ik",
)

View File

@@ -5,7 +5,7 @@ import logging
import os
import time
import requests
import httpx
logger = logging.getLogger(__name__)
@@ -30,10 +30,10 @@ class NacosHttpClient:
params = {}
try:
self._inject_auth_info(headers, params)
response = requests.request(method, url="http://" + self.server + url, headers=headers, params=params)
response = httpx.request(method, url="http://" + self.server + url, headers=headers, params=params)
response.raise_for_status()
return response.text
except requests.RequestException as e:
except httpx.RequestError as e:
return f"Request to Nacos failed: {e}"
def _inject_auth_info(self, headers: dict[str, str], params: dict[str, str], module: str = "config") -> None:
@@ -78,7 +78,7 @@ class NacosHttpClient:
params = {"username": self.username, "password": self.password}
url = "http://" + self.server + "/nacos/v1/auth/login"
try:
resp = requests.request("POST", url, headers=None, params=params)
resp = httpx.request("POST", url, headers=None, params=params)
resp.raise_for_status()
response_data = resp.json()
self.token = response_data.get("accessToken")

View File

@@ -1,6 +1,7 @@
from datetime import datetime
import pytz # pip install pytz
import sqlalchemy as sa
from flask_login import current_user
from flask_restx import Resource, marshal_with, reqparse
from flask_restx.inputs import int_range
@@ -70,7 +71,7 @@ class CompletionConversationApi(Resource):
parser.add_argument("limit", type=int_range(1, 100), default=20, location="args")
args = parser.parse_args()
query = db.select(Conversation).where(
query = sa.select(Conversation).where(
Conversation.app_id == app_model.id, Conversation.mode == "completion", Conversation.is_deleted.is_(False)
)
@@ -236,7 +237,7 @@ class ChatConversationApi(Resource):
.subquery()
)
query = db.select(Conversation).where(Conversation.app_id == app_model.id, Conversation.is_deleted.is_(False))
query = sa.select(Conversation).where(Conversation.app_id == app_model.id, Conversation.is_deleted.is_(False))
if args["keyword"]:
keyword_filter = f"%{args['keyword']}%"

View File

@@ -62,6 +62,9 @@ class ChatMessageListApi(Resource):
@account_initialization_required
@marshal_with(message_infinite_scroll_pagination_fields)
def get(self, app_model):
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("conversation_id", required=True, type=uuid_value, location="args")
parser.add_argument("first_id", type=uuid_value, location="args")

View File

@@ -50,8 +50,9 @@ class DailyMessageStatistic(Resource):
FROM
messages
WHERE
app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
app_id = :app_id
AND invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -187,8 +188,9 @@ class DailyTerminalsStatistic(Resource):
FROM
messages
WHERE
app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
app_id = :app_id
AND invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -259,8 +261,9 @@ class DailyTokenCostStatistic(Resource):
FROM
messages
WHERE
app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
app_id = :app_id
AND invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -340,8 +343,9 @@ FROM
messages m
ON c.id = m.conversation_id
WHERE
c.app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
c.app_id = :app_id
AND m.invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -426,8 +430,9 @@ LEFT JOIN
message_feedbacks mf
ON mf.message_id=m.id AND mf.rating='like'
WHERE
m.app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
m.app_id = :app_id
AND m.invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -502,8 +507,9 @@ class AverageResponseTimeStatistic(Resource):
FROM
messages
WHERE
app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
app_id = :app_id
AND invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
@@ -576,8 +582,9 @@ class TokensPerSecondStatistic(Resource):
FROM
messages
WHERE
app_id = :app_id"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id}
app_id = :app_id
AND invoke_from != :invoke_from"""
arg_dict = {"tz": account.timezone, "app_id": app_model.id, "invoke_from": InvokeFrom.DEBUGGER.value}
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc

View File

@@ -1,6 +1,6 @@
import logging
import requests
import httpx
from flask import current_app, redirect, request
from flask_login import current_user
from flask_restx import Resource, fields
@@ -119,7 +119,7 @@ class OAuthDataSourceBinding(Resource):
return {"error": "Invalid code"}, 400
try:
oauth_provider.get_access_token(code)
except requests.HTTPError as e:
except httpx.HTTPStatusError as e:
logger.exception(
"An error occurred during the OAuthCallback process with %s: %s", provider, e.response.text
)
@@ -152,7 +152,7 @@ class OAuthDataSourceSync(Resource):
return {"error": "Invalid provider"}, 400
try:
oauth_provider.sync_data_source(binding_id)
except requests.HTTPError as e:
except httpx.HTTPStatusError as e:
logger.exception(
"An error occurred during the OAuthCallback process with %s: %s", provider, e.response.text
)

View File

@@ -1,6 +1,6 @@
import logging
import requests
import httpx
from flask import current_app, redirect, request
from flask_restx import Resource
from sqlalchemy import select
@@ -101,8 +101,10 @@ class OAuthCallback(Resource):
try:
token = oauth_provider.get_access_token(code)
user_info = oauth_provider.get_user_info(token)
except requests.RequestException as e:
error_text = e.response.text if e.response else str(e)
except httpx.RequestError as e:
error_text = str(e)
if isinstance(e, httpx.HTTPStatusError):
error_text = e.response.text
logger.exception("An error occurred during the OAuth process with %s: %s", provider, error_text)
return {"error": "OAuth process failed"}, 400

View File

@@ -782,7 +782,6 @@ class DatasetRetrievalSettingApi(Resource):
| VectorType.TIDB_VECTOR
| VectorType.CHROMA
| VectorType.PGVECTO_RS
| VectorType.BAIDU
| VectorType.VIKINGDB
| VectorType.UPSTASH
):
@@ -809,6 +808,7 @@ class DatasetRetrievalSettingApi(Resource):
| VectorType.TENCENT
| VectorType.MATRIXONE
| VectorType.CLICKZETTA
| VectorType.BAIDU
):
return {
"retrieval_method": [
@@ -838,7 +838,6 @@ class DatasetRetrievalSettingMockApi(Resource):
| VectorType.TIDB_VECTOR
| VectorType.CHROMA
| VectorType.PGVECTO_RS
| VectorType.BAIDU
| VectorType.VIKINGDB
| VectorType.UPSTASH
):
@@ -863,6 +862,7 @@ class DatasetRetrievalSettingMockApi(Resource):
| VectorType.HUAWEI_CLOUD
| VectorType.MATRIXONE
| VectorType.CLICKZETTA
| VectorType.BAIDU
):
return {
"retrieval_method": [

View File

@@ -4,6 +4,7 @@ from argparse import ArgumentTypeError
from collections.abc import Sequence
from typing import Literal, cast
import sqlalchemy as sa
from flask import request
from flask_login import current_user
from flask_restx import Resource, fields, marshal, marshal_with, reqparse
@@ -211,13 +212,13 @@ class DatasetDocumentListApi(Resource):
if sort == "hit_count":
sub_query = (
db.select(DocumentSegment.document_id, db.func.sum(DocumentSegment.hit_count).label("total_hit_count"))
sa.select(DocumentSegment.document_id, sa.func.sum(DocumentSegment.hit_count).label("total_hit_count"))
.group_by(DocumentSegment.document_id)
.subquery()
)
query = query.outerjoin(sub_query, sub_query.c.document_id == Document.id).order_by(
sort_logic(db.func.coalesce(sub_query.c.total_hit_count, 0)),
sort_logic(sa.func.coalesce(sub_query.c.total_hit_count, 0)),
sort_logic(Document.position),
)
elif sort == "created_at":

View File

@@ -118,12 +118,14 @@ class RagPipelineExportApi(Resource):
# Add include_secret params
parser = reqparse.RequestParser()
parser.add_argument("include_secret", type=bool, default=False, location="args")
parser.add_argument("include_secret", type=str, default="false", location="args")
args = parser.parse_args()
with Session(db.engine) as session:
export_service = RagPipelineDslService(session)
result = export_service.export_rag_pipeline_dsl(pipeline=pipeline, include_secret=args["include_secret"])
result = export_service.export_rag_pipeline_dsl(
pipeline=pipeline, include_secret=args["include_secret"] == "true"
)
return {"data": result}, 200

View File

@@ -1,7 +1,7 @@
import json
import logging
import requests
import httpx
from flask_restx import Resource, fields, reqparse
from packaging import version
@@ -57,7 +57,11 @@ class VersionApi(Resource):
return result
try:
response = requests.get(check_update_url, {"current_version": args["current_version"]}, timeout=(3, 10))
response = httpx.get(
check_update_url,
params={"current_version": args["current_version"]},
timeout=httpx.Timeout(connect=3, read=10),
)
except Exception as error:
logger.warning("Check update version error: %s.", str(error))
result["version"] = args["current_version"]

View File

@@ -261,6 +261,8 @@ class MessageSuggestedQuestionApi(WebApiResource):
questions = MessageService.get_suggested_questions_after_answer(
app_model=app_model, user=end_user, message_id=message_id, invoke_from=InvokeFrom.WEB_APP
)
# questions is a list of strings, not a list of Message objects
# so we can directly return it
except MessageNotExistsError:
raise NotFound("Message not found")
except ConversationNotExistsError:

View File

@@ -420,7 +420,9 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
db.session.refresh(conversation)
# get conversation dialogue count
self._dialogue_count = get_thread_messages_length(conversation.id)
# NOTE: dialogue_count should not start from 0,
# because during the first conversation, dialogue_count should be 1.
self._dialogue_count = get_thread_messages_length(conversation.id) + 1
# init queue manager
queue_manager = MessageBasedAppQueueManager(

View File

@@ -79,29 +79,12 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
if not app_record:
raise ValueError("App not found")
if self.application_generate_entity.single_iteration_run:
# if only single iteration run is requested
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
# Handle single iteration or single loop run
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
workflow=self._workflow,
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=dict(self.application_generate_entity.single_iteration_run.inputs),
graph_runtime_state=graph_runtime_state,
)
elif self.application_generate_entity.single_loop_run:
# if only single loop run is requested
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=self._workflow,
node_id=self.application_generate_entity.single_loop_run.node_id,
user_inputs=dict(self.application_generate_entity.single_loop_run.inputs),
graph_runtime_state=graph_runtime_state,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
)
else:
inputs = self.application_generate_entity.inputs

View File

@@ -427,6 +427,9 @@ class PipelineGenerator(BaseAppGenerator):
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
workflow_execution_id=str(uuid.uuid4()),
single_iteration_run=RagPipelineGenerateEntity.SingleIterationRunEntity(
node_id=node_id, inputs=args["inputs"]
),
)
contexts.plugin_tool_providers.set({})
contexts.plugin_tool_providers_lock.set(threading.Lock())
@@ -465,6 +468,7 @@ class PipelineGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
context=contextvars.copy_context(),
)
def single_loop_generate(
@@ -559,6 +563,7 @@ class PipelineGenerator(BaseAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
context=contextvars.copy_context(),
)
def _generate_worker(

View File

@@ -86,29 +86,12 @@ class PipelineRunner(WorkflowBasedAppRunner):
db.session.close()
# if only single iteration run is requested
if self.application_generate_entity.single_iteration_run:
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
# if only single iteration run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
# Handle single iteration or single loop run
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
workflow=workflow,
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=self.application_generate_entity.single_iteration_run.inputs,
graph_runtime_state=graph_runtime_state,
)
elif self.application_generate_entity.single_loop_run:
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
# if only single loop run is requested
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=workflow,
node_id=self.application_generate_entity.single_loop_run.node_id,
user_inputs=self.application_generate_entity.single_loop_run.inputs,
graph_runtime_state=graph_runtime_state,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
)
else:
inputs = self.application_generate_entity.inputs

View File

@@ -51,30 +51,12 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
app_config = self.application_generate_entity.app_config
app_config = cast(WorkflowAppConfig, app_config)
# if only single iteration run is requested
if self.application_generate_entity.single_iteration_run:
# if only single iteration run is requested
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
# if only single iteration or single loop run is requested
if self.application_generate_entity.single_iteration_run or self.application_generate_entity.single_loop_run:
graph, variable_pool, graph_runtime_state = self._prepare_single_node_execution(
workflow=self._workflow,
node_id=self.application_generate_entity.single_iteration_run.node_id,
user_inputs=self.application_generate_entity.single_iteration_run.inputs,
graph_runtime_state=graph_runtime_state,
)
elif self.application_generate_entity.single_loop_run:
# if only single loop run is requested
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool.empty(),
start_at=time.time(),
)
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=self._workflow,
node_id=self.application_generate_entity.single_loop_run.node_id,
user_inputs=self.application_generate_entity.single_loop_run.inputs,
graph_runtime_state=graph_runtime_state,
single_iteration_run=self.application_generate_entity.single_iteration_run,
single_loop_run=self.application_generate_entity.single_loop_run,
)
else:
inputs = self.application_generate_entity.inputs

View File

@@ -1,3 +1,4 @@
import time
from collections.abc import Mapping
from typing import Any, cast
@@ -119,15 +120,81 @@ class WorkflowBasedAppRunner:
return graph
def _get_graph_and_variable_pool_of_single_iteration(
def _prepare_single_node_execution(
self,
workflow: Workflow,
single_iteration_run: Any | None = None,
single_loop_run: Any | None = None,
) -> tuple[Graph, VariablePool, GraphRuntimeState]:
"""
Prepare graph, variable pool, and runtime state for single node execution
(either single iteration or single loop).
Args:
workflow: The workflow instance
single_iteration_run: SingleIterationRunEntity if running single iteration, None otherwise
single_loop_run: SingleLoopRunEntity if running single loop, None otherwise
Returns:
A tuple containing (graph, variable_pool, graph_runtime_state)
Raises:
ValueError: If neither single_iteration_run nor single_loop_run is specified
"""
# Create initial runtime state with variable pool containing environment variables
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
environment_variables=workflow.environment_variables,
),
start_at=time.time(),
)
# Determine which type of single node execution and get graph/variable_pool
if single_iteration_run:
graph, variable_pool = self._get_graph_and_variable_pool_of_single_iteration(
workflow=workflow,
node_id=single_iteration_run.node_id,
user_inputs=dict(single_iteration_run.inputs),
graph_runtime_state=graph_runtime_state,
)
elif single_loop_run:
graph, variable_pool = self._get_graph_and_variable_pool_of_single_loop(
workflow=workflow,
node_id=single_loop_run.node_id,
user_inputs=dict(single_loop_run.inputs),
graph_runtime_state=graph_runtime_state,
)
else:
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
# Return the graph, variable_pool, and the same graph_runtime_state used during graph creation
# This ensures all nodes in the graph reference the same GraphRuntimeState instance
return graph, variable_pool, graph_runtime_state
def _get_graph_and_variable_pool_for_single_node_run(
self,
workflow: Workflow,
node_id: str,
user_inputs: dict,
user_inputs: dict[str, Any],
graph_runtime_state: GraphRuntimeState,
node_type_filter_key: str, # 'iteration_id' or 'loop_id'
node_type_label: str = "node", # 'iteration' or 'loop' for error messages
) -> tuple[Graph, VariablePool]:
"""
Get variable pool of single iteration
Get graph and variable pool for single node execution (iteration or loop).
Args:
workflow: The workflow instance
node_id: The node ID to execute
user_inputs: User inputs for the node
graph_runtime_state: The graph runtime state
node_type_filter_key: The key to filter nodes ('iteration_id' or 'loop_id')
node_type_label: Label for error messages ('iteration' or 'loop')
Returns:
A tuple containing (graph, variable_pool)
"""
# fetch workflow graph
graph_config = workflow.graph_dict
@@ -145,18 +212,22 @@ class WorkflowBasedAppRunner:
if not isinstance(graph_config.get("edges"), list):
raise ValueError("edges in workflow graph must be a list")
# filter nodes only in iteration
# filter nodes only in the specified node type (iteration or loop)
main_node_config = next((n for n in graph_config.get("nodes", []) if n.get("id") == node_id), None)
start_node_id = main_node_config.get("data", {}).get("start_node_id") if main_node_config else None
node_configs = [
node
for node in graph_config.get("nodes", [])
if node.get("id") == node_id or node.get("data", {}).get("iteration_id", "") == node_id
if node.get("id") == node_id
or node.get("data", {}).get(node_type_filter_key, "") == node_id
or (start_node_id and node.get("id") == start_node_id)
]
graph_config["nodes"] = node_configs
node_ids = [node.get("id") for node in node_configs]
# filter edges only in iteration
# filter edges only in the specified node type
edge_configs = [
edge
for edge in graph_config.get("edges", [])
@@ -190,30 +261,26 @@ class WorkflowBasedAppRunner:
raise ValueError("graph not found in workflow")
# fetch node config from node id
iteration_node_config = None
target_node_config = None
for node in node_configs:
if node.get("id") == node_id:
iteration_node_config = node
target_node_config = node
break
if not iteration_node_config:
raise ValueError("iteration node id not found in workflow graph")
if not target_node_config:
raise ValueError(f"{node_type_label} node id not found in workflow graph")
# Get node class
node_type = NodeType(iteration_node_config.get("data", {}).get("type"))
node_version = iteration_node_config.get("data", {}).get("version", "1")
node_type = NodeType(target_node_config.get("data", {}).get("type"))
node_version = target_node_config.get("data", {}).get("version", "1")
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
# init variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
environment_variables=workflow.environment_variables,
)
# Use the variable pool from graph_runtime_state instead of creating a new one
variable_pool = graph_runtime_state.variable_pool
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
graph_config=workflow.graph_dict, config=iteration_node_config
graph_config=workflow.graph_dict, config=target_node_config
)
except NotImplementedError:
variable_mapping = {}
@@ -234,120 +301,44 @@ class WorkflowBasedAppRunner:
return graph, variable_pool
def _get_graph_and_variable_pool_of_single_iteration(
self,
workflow: Workflow,
node_id: str,
user_inputs: dict[str, Any],
graph_runtime_state: GraphRuntimeState,
) -> tuple[Graph, VariablePool]:
"""
Get variable pool of single iteration
"""
return self._get_graph_and_variable_pool_for_single_node_run(
workflow=workflow,
node_id=node_id,
user_inputs=user_inputs,
graph_runtime_state=graph_runtime_state,
node_type_filter_key="iteration_id",
node_type_label="iteration",
)
def _get_graph_and_variable_pool_of_single_loop(
self,
workflow: Workflow,
node_id: str,
user_inputs: dict,
user_inputs: dict[str, Any],
graph_runtime_state: GraphRuntimeState,
) -> tuple[Graph, VariablePool]:
"""
Get variable pool of single loop
"""
# fetch workflow graph
graph_config = workflow.graph_dict
if not graph_config:
raise ValueError("workflow graph not found")
graph_config = cast(dict[str, Any], graph_config)
if "nodes" not in graph_config or "edges" not in graph_config:
raise ValueError("nodes or edges not found in workflow graph")
if not isinstance(graph_config.get("nodes"), list):
raise ValueError("nodes in workflow graph must be a list")
if not isinstance(graph_config.get("edges"), list):
raise ValueError("edges in workflow graph must be a list")
# filter nodes only in loop
node_configs = [
node
for node in graph_config.get("nodes", [])
if node.get("id") == node_id or node.get("data", {}).get("loop_id", "") == node_id
]
graph_config["nodes"] = node_configs
node_ids = [node.get("id") for node in node_configs]
# filter edges only in loop
edge_configs = [
edge
for edge in graph_config.get("edges", [])
if (edge.get("source") is None or edge.get("source") in node_ids)
and (edge.get("target") is None or edge.get("target") in node_ids)
]
graph_config["edges"] = edge_configs
# Create required parameters for Graph.init
graph_init_params = GraphInitParams(
tenant_id=workflow.tenant_id,
app_id=self._app_id,
workflow_id=workflow.id,
graph_config=graph_config,
user_id="",
user_from=UserFrom.ACCOUNT.value,
invoke_from=InvokeFrom.SERVICE_API.value,
call_depth=0,
)
node_factory = DifyNodeFactory(
graph_init_params=graph_init_params,
return self._get_graph_and_variable_pool_for_single_node_run(
workflow=workflow,
node_id=node_id,
user_inputs=user_inputs,
graph_runtime_state=graph_runtime_state,
node_type_filter_key="loop_id",
node_type_label="loop",
)
# init graph
graph = Graph.init(graph_config=graph_config, node_factory=node_factory, root_node_id=node_id)
if not graph:
raise ValueError("graph not found in workflow")
# fetch node config from node id
loop_node_config = None
for node in node_configs:
if node.get("id") == node_id:
loop_node_config = node
break
if not loop_node_config:
raise ValueError("loop node id not found in workflow graph")
# Get node class
node_type = NodeType(loop_node_config.get("data", {}).get("type"))
node_version = loop_node_config.get("data", {}).get("version", "1")
node_cls = NODE_TYPE_CLASSES_MAPPING[node_type][node_version]
# init variable pool
variable_pool = VariablePool(
system_variables=SystemVariable.empty(),
user_inputs={},
environment_variables=workflow.environment_variables,
)
try:
variable_mapping = node_cls.extract_variable_selector_to_variable_mapping(
graph_config=workflow.graph_dict, config=loop_node_config
)
except NotImplementedError:
variable_mapping = {}
load_into_variable_pool(
self._variable_loader,
variable_pool=variable_pool,
variable_mapping=variable_mapping,
user_inputs=user_inputs,
)
WorkflowEntry.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
variable_pool=variable_pool,
tenant_id=workflow.tenant_id,
)
return graph, variable_pool
def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent):
"""
Handle event

View File

@@ -1,388 +0,0 @@
import re
import uuid
from json import dumps as json_dumps
from json import loads as json_loads
from json.decoder import JSONDecodeError
from flask import request
from requests import get
from yaml import YAMLError, safe_load # type: ignore
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import ApiProviderSchemaType, ToolParameter
from core.tools.errors import ToolApiSchemaError, ToolNotSupportedError, ToolProviderNotFoundError
class ApiBasedToolSchemaParser:
@staticmethod
def parse_openapi_to_tool_bundle(
openapi: dict, extra_info: dict | None = None, warning: dict | None = None
) -> list[ApiToolBundle]:
warning = warning if warning is not None else {}
extra_info = extra_info if extra_info is not None else {}
# set description to extra_info
extra_info["description"] = openapi["info"].get("description", "")
if len(openapi["servers"]) == 0:
raise ToolProviderNotFoundError("No server found in the openapi yaml.")
server_url = openapi["servers"][0]["url"]
request_env = request.headers.get("X-Request-Env")
if request_env:
matched_servers = [server["url"] for server in openapi["servers"] if server["env"] == request_env]
server_url = matched_servers[0] if matched_servers else server_url
# list all interfaces
interfaces = []
for path, path_item in openapi["paths"].items():
methods = ["get", "post", "put", "delete", "patch", "head", "options", "trace"]
for method in methods:
if method in path_item:
interfaces.append(
{
"path": path,
"method": method,
"operation": path_item[method],
}
)
# get all parameters
bundles = []
for interface in interfaces:
# convert parameters
parameters = []
if "parameters" in interface["operation"]:
for parameter in interface["operation"]["parameters"]:
tool_parameter = ToolParameter(
name=parameter["name"],
label=I18nObject(en_US=parameter["name"], zh_Hans=parameter["name"]),
human_description=I18nObject(
en_US=parameter.get("description", ""), zh_Hans=parameter.get("description", "")
),
type=ToolParameter.ToolParameterType.STRING,
required=parameter.get("required", False),
form=ToolParameter.ToolParameterForm.LLM,
llm_description=parameter.get("description"),
default=parameter["schema"]["default"]
if "schema" in parameter and "default" in parameter["schema"]
else None,
placeholder=I18nObject(
en_US=parameter.get("description", ""), zh_Hans=parameter.get("description", "")
),
)
# check if there is a type
typ = ApiBasedToolSchemaParser._get_tool_parameter_type(parameter)
if typ:
tool_parameter.type = typ
parameters.append(tool_parameter)
# create tool bundle
# check if there is a request body
if "requestBody" in interface["operation"]:
request_body = interface["operation"]["requestBody"]
if "content" in request_body:
for content_type, content in request_body["content"].items():
# if there is a reference, get the reference and overwrite the content
if "schema" not in content:
continue
if "$ref" in content["schema"]:
# get the reference
root = openapi
reference = content["schema"]["$ref"].split("/")[1:]
for ref in reference:
root = root[ref]
# overwrite the content
interface["operation"]["requestBody"]["content"][content_type]["schema"] = root
# parse body parameters
if "schema" in interface["operation"]["requestBody"]["content"][content_type]: # pyright: ignore[reportIndexIssue, reportPossiblyUnboundVariable]
body_schema = interface["operation"]["requestBody"]["content"][content_type]["schema"] # pyright: ignore[reportIndexIssue, reportPossiblyUnboundVariable]
required = body_schema.get("required", [])
properties = body_schema.get("properties", {})
for name, property in properties.items():
tool = ToolParameter(
name=name,
label=I18nObject(en_US=name, zh_Hans=name),
human_description=I18nObject(
en_US=property.get("description", ""), zh_Hans=property.get("description", "")
),
type=ToolParameter.ToolParameterType.STRING,
required=name in required,
form=ToolParameter.ToolParameterForm.LLM,
llm_description=property.get("description", ""),
default=property.get("default", None),
placeholder=I18nObject(
en_US=property.get("description", ""), zh_Hans=property.get("description", "")
),
)
# check if there is a type
typ = ApiBasedToolSchemaParser._get_tool_parameter_type(property)
if typ:
tool.type = typ
parameters.append(tool)
# check if parameters is duplicated
parameters_count = {}
for parameter in parameters:
if parameter.name not in parameters_count:
parameters_count[parameter.name] = 0
parameters_count[parameter.name] += 1
for name, count in parameters_count.items():
if count > 1:
warning["duplicated_parameter"] = f"Parameter {name} is duplicated."
# check if there is a operation id, use $path_$method as operation id if not
if "operationId" not in interface["operation"]:
# remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
path = interface["path"]
if interface["path"].startswith("/"):
path = interface["path"][1:]
# remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
path = re.sub(r"[^a-zA-Z0-9_-]", "", path)
if not path:
path = str(uuid.uuid4())
interface["operation"]["operationId"] = f"{path}_{interface['method']}"
bundles.append(
ApiToolBundle(
server_url=server_url + interface["path"],
method=interface["method"],
summary=interface["operation"]["description"]
if "description" in interface["operation"]
else interface["operation"].get("summary", None),
operation_id=interface["operation"]["operationId"],
parameters=parameters,
author="",
icon=None,
openapi=interface["operation"],
)
)
return bundles
@staticmethod
def _get_tool_parameter_type(parameter: dict) -> ToolParameter.ToolParameterType | None:
parameter = parameter or {}
typ: str | None = None
if parameter.get("format") == "binary":
return ToolParameter.ToolParameterType.FILE
if "type" in parameter:
typ = parameter["type"]
elif "schema" in parameter and "type" in parameter["schema"]:
typ = parameter["schema"]["type"]
if typ in {"integer", "number"}:
return ToolParameter.ToolParameterType.NUMBER
elif typ == "boolean":
return ToolParameter.ToolParameterType.BOOLEAN
elif typ == "string":
return ToolParameter.ToolParameterType.STRING
elif typ == "array":
items = parameter.get("items") or parameter.get("schema", {}).get("items")
return ToolParameter.ToolParameterType.FILES if items and items.get("format") == "binary" else None
else:
return None
@staticmethod
def parse_openapi_yaml_to_tool_bundle(
yaml: str, extra_info: dict | None = None, warning: dict | None = None
) -> list[ApiToolBundle]:
"""
parse openapi yaml to tool bundle
:param yaml: the yaml string
:param extra_info: the extra info
:param warning: the warning message
:return: the tool bundle
"""
warning = warning if warning is not None else {}
extra_info = extra_info if extra_info is not None else {}
openapi: dict = safe_load(yaml)
if openapi is None:
raise ToolApiSchemaError("Invalid openapi yaml.")
return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(openapi, extra_info=extra_info, warning=warning)
@staticmethod
def parse_swagger_to_openapi(swagger: dict, extra_info: dict | None = None, warning: dict | None = None) -> dict:
warning = warning or {}
"""
parse swagger to openapi
:param swagger: the swagger dict
:return: the openapi dict
"""
# convert swagger to openapi
info = swagger.get("info", {"title": "Swagger", "description": "Swagger", "version": "1.0.0"})
servers = swagger.get("servers", [])
if len(servers) == 0:
raise ToolApiSchemaError("No server found in the swagger yaml.")
openapi = {
"openapi": "3.0.0",
"info": {
"title": info.get("title", "Swagger"),
"description": info.get("description", "Swagger"),
"version": info.get("version", "1.0.0"),
},
"servers": swagger["servers"],
"paths": {},
"components": {"schemas": {}},
}
# check paths
if "paths" not in swagger or len(swagger["paths"]) == 0:
raise ToolApiSchemaError("No paths found in the swagger yaml.")
# convert paths
for path, path_item in swagger["paths"].items():
openapi["paths"][path] = {} # pyright: ignore[reportIndexIssue]
for method, operation in path_item.items():
if "operationId" not in operation:
raise ToolApiSchemaError(f"No operationId found in operation {method} {path}.")
if ("summary" not in operation or len(operation["summary"]) == 0) and (
"description" not in operation or len(operation["description"]) == 0
):
if warning is not None:
warning["missing_summary"] = f"No summary or description found in operation {method} {path}."
openapi["paths"][path][method] = { # pyright: ignore[reportIndexIssue]
"operationId": operation["operationId"],
"summary": operation.get("summary", ""),
"description": operation.get("description", ""),
"parameters": operation.get("parameters", []),
"responses": operation.get("responses", {}),
}
if "requestBody" in operation:
openapi["paths"][path][method]["requestBody"] = operation["requestBody"] # pyright: ignore[reportIndexIssue]
# convert definitions
for name, definition in swagger["definitions"].items():
openapi["components"]["schemas"][name] = definition # pyright: ignore[reportIndexIssue, reportArgumentType]
return openapi
@staticmethod
def parse_openai_plugin_json_to_tool_bundle(
json: str, extra_info: dict | None = None, warning: dict | None = None
) -> list[ApiToolBundle]:
"""
parse openapi plugin yaml to tool bundle
:param json: the json string
:param extra_info: the extra info
:param warning: the warning message
:return: the tool bundle
"""
warning = warning if warning is not None else {}
extra_info = extra_info if extra_info is not None else {}
try:
openai_plugin = json_loads(json)
api = openai_plugin["api"]
api_url = api["url"]
api_type = api["type"]
except JSONDecodeError:
raise ToolProviderNotFoundError("Invalid openai plugin json.")
if api_type != "openapi":
raise ToolNotSupportedError("Only openapi is supported now.")
# get openapi yaml
response = get(api_url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "}, timeout=5)
if response.status_code != 200:
raise ToolProviderNotFoundError("cannot get openapi yaml from url.")
return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle(
response.text, extra_info=extra_info, warning=warning
)
@staticmethod
def auto_parse_to_tool_bundle(
content: str, extra_info: dict | None = None, warning: dict | None = None
) -> tuple[list[ApiToolBundle], str]:
"""
auto parse to tool bundle
:param content: the content
:param extra_info: the extra info
:param warning: the warning message
:return: tools bundle, schema_type
"""
warning = warning if warning is not None else {}
extra_info = extra_info if extra_info is not None else {}
content = content.strip()
loaded_content = None
json_error = None
yaml_error = None
try:
loaded_content = json_loads(content)
except JSONDecodeError as e:
json_error = e
if loaded_content is None:
try:
loaded_content = safe_load(content)
except YAMLError as e:
yaml_error = e
if loaded_content is None:
raise ToolApiSchemaError(
f"Invalid api schema, schema is neither json nor yaml. json error: {str(json_error)},"
f" yaml error: {str(yaml_error)}"
)
swagger_error = None
openapi_error = None
openapi_plugin_error = None
schema_type = None
try:
openapi = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
loaded_content, extra_info=extra_info, warning=warning
)
schema_type = ApiProviderSchemaType.OPENAPI.value
return openapi, schema_type
except ToolApiSchemaError as e:
openapi_error = e
# openai parse error, fallback to swagger
try:
converted_swagger = ApiBasedToolSchemaParser.parse_swagger_to_openapi(
loaded_content, extra_info=extra_info, warning=warning
)
schema_type = ApiProviderSchemaType.SWAGGER.value
return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
converted_swagger, extra_info=extra_info, warning=warning
), schema_type
except ToolApiSchemaError as e:
swagger_error = e
# swagger parse error, fallback to openai plugin
try:
openapi_plugin = ApiBasedToolSchemaParser.parse_openai_plugin_json_to_tool_bundle(
json_dumps(loaded_content), extra_info=extra_info, warning=warning
)
return openapi_plugin, ApiProviderSchemaType.OPENAI_PLUGIN.value
except ToolNotSupportedError as e:
# maybe it's not plugin at all
openapi_plugin_error = e
raise ToolApiSchemaError(
f"Invalid api schema, openapi error: {str(openapi_error)}, swagger error: {str(swagger_error)},"
f" openapi plugin error: {str(openapi_plugin_error)}"
)

View File

@@ -1,17 +0,0 @@
import re
def remove_leading_symbols(text: str) -> str:
"""
Remove leading punctuation or symbols from the given text.
Args:
text (str): The input text to process.
Returns:
str: The text with leading punctuation or symbols removed.
"""
# Match Unicode ranges for punctuation and symbols
# FIXME this pattern is confused quick fix for #11868 maybe refactor it later
pattern = r"^[\u2000-\u206F\u2E00-\u2E7F\u3000-\u303F!\"#$%&'()*+,./:;<=>?@^_`~]+"
return re.sub(pattern, "", text)

View File

@@ -1,9 +0,0 @@
import uuid
def is_valid_uuid(uuid_str: str) -> bool:
try:
uuid.UUID(uuid_str)
return True
except Exception:
return False

View File

@@ -1,43 +0,0 @@
from collections.abc import Mapping, Sequence
from typing import Any
from core.app.app_config.entities import VariableEntity
from core.tools.entities.tool_entities import WorkflowToolParameterConfiguration
class WorkflowToolConfigurationUtils:
@classmethod
def check_parameter_configurations(cls, configurations: list[Mapping[str, Any]]):
for configuration in configurations:
WorkflowToolParameterConfiguration.model_validate(configuration)
@classmethod
def get_workflow_graph_variables(cls, graph: Mapping[str, Any]) -> Sequence[VariableEntity]:
"""
get workflow graph variables
"""
nodes = graph.get("nodes", [])
start_node = next(filter(lambda x: x.get("data", {}).get("type") == "start", nodes), None)
if not start_node:
return []
return [VariableEntity.model_validate(variable) for variable in start_node.get("data", {}).get("variables", [])]
@classmethod
def check_is_synced(
cls, variables: list[VariableEntity], tool_configurations: list[WorkflowToolParameterConfiguration]
):
"""
check is synced
raise ValueError if not synced
"""
variable_names = [variable.variable for variable in variables]
if len(tool_configurations) != len(variables):
raise ValueError("parameter configuration mismatch, please republish the tool to update")
for parameter in tool_configurations:
if parameter.name not in variable_names:
raise ValueError("parameter configuration mismatch, please republish the tool to update")

View File

@@ -1,35 +0,0 @@
import logging
from pathlib import Path
from typing import Any
import yaml # type: ignore
from yaml import YAMLError
logger = logging.getLogger(__name__)
def load_yaml_file(file_path: str, ignore_error: bool = True, default_value: Any = {}) -> Any:
"""
Safe loading a YAML file
:param file_path: the path of the YAML file
:param ignore_error:
if True, return default_value if error occurs and the error will be logged in debug level
if False, raise error if error occurs
:param default_value: the value returned when errors ignored
:return: an object of the YAML content
"""
if not file_path or not Path(file_path).exists():
if ignore_error:
return default_value
else:
raise FileNotFoundError(f"File not found: {file_path}")
with open(file_path, encoding="utf-8") as yaml_file:
try:
yaml_content = yaml.safe_load(yaml_file)
return yaml_content or default_value
except Exception as e:
if ignore_error:
return default_value
else:
raise YAMLError(f"Failed to load YAML file {file_path}: {e}") from e

View File

@@ -205,16 +205,10 @@ class ProviderConfiguration(BaseModel):
"""
Get custom provider record.
"""
# get provider
model_provider_id = ModelProviderID(self.provider.provider)
provider_names = [self.provider.provider]
if model_provider_id.is_langgenius():
provider_names.append(model_provider_id.provider_name)
stmt = select(Provider).where(
Provider.tenant_id == self.tenant_id,
Provider.provider_type == ProviderType.CUSTOM.value,
Provider.provider_name.in_(provider_names),
Provider.provider_name.in_(self._get_provider_names()),
)
return session.execute(stmt).scalar_one_or_none()
@@ -276,7 +270,7 @@ class ProviderConfiguration(BaseModel):
"""
stmt = select(ProviderCredential.id).where(
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
ProviderCredential.credential_name == credential_name,
)
if exclude_id:
@@ -324,7 +318,7 @@ class ProviderConfiguration(BaseModel):
try:
stmt = select(ProviderCredential).where(
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
ProviderCredential.id == credential_id,
)
credential_record = s.execute(stmt).scalar_one_or_none()
@@ -374,7 +368,7 @@ class ProviderConfiguration(BaseModel):
session=session,
query_factory=lambda: select(ProviderCredential).where(
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
),
)
@@ -387,7 +381,7 @@ class ProviderConfiguration(BaseModel):
session=session,
query_factory=lambda: select(ProviderModelCredential).where(
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
),
@@ -423,6 +417,16 @@ class ProviderConfiguration(BaseModel):
logger.warning("Error generating next credential name: %s", str(e))
return "API KEY 1"
def _get_provider_names(self):
"""
The provider name might be stored in the database as either `openai` or `langgenius/openai/openai`.
"""
model_provider_id = ModelProviderID(self.provider.provider)
provider_names = [self.provider.provider]
if model_provider_id.is_langgenius():
provider_names.append(model_provider_id.provider_name)
return provider_names
def create_provider_credential(self, credentials: dict, credential_name: str | None):
"""
Add custom provider credentials.
@@ -501,7 +505,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderCredential).where(
ProviderCredential.id == credential_id,
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
)
# Get the credential record to update
@@ -554,7 +558,7 @@ class ProviderConfiguration(BaseModel):
# Find all load balancing configs that use this credential_id
stmt = select(LoadBalancingModelConfig).where(
LoadBalancingModelConfig.tenant_id == self.tenant_id,
LoadBalancingModelConfig.provider_name == self.provider.provider,
LoadBalancingModelConfig.provider_name.in_(self._get_provider_names()),
LoadBalancingModelConfig.credential_id == credential_id,
LoadBalancingModelConfig.credential_source_type == credential_source,
)
@@ -591,7 +595,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderCredential).where(
ProviderCredential.id == credential_id,
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
)
# Get the credential record to update
@@ -602,7 +606,7 @@ class ProviderConfiguration(BaseModel):
# Check if this credential is used in load balancing configs
lb_stmt = select(LoadBalancingModelConfig).where(
LoadBalancingModelConfig.tenant_id == self.tenant_id,
LoadBalancingModelConfig.provider_name == self.provider.provider,
LoadBalancingModelConfig.provider_name.in_(self._get_provider_names()),
LoadBalancingModelConfig.credential_id == credential_id,
LoadBalancingModelConfig.credential_source_type == "provider",
)
@@ -624,7 +628,7 @@ class ProviderConfiguration(BaseModel):
# if this is the last credential, we need to delete the provider record
count_stmt = select(func.count(ProviderCredential.id)).where(
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
)
available_credentials_count = session.execute(count_stmt).scalar() or 0
session.delete(credential_record)
@@ -668,7 +672,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderCredential).where(
ProviderCredential.id == credential_id,
ProviderCredential.tenant_id == self.tenant_id,
ProviderCredential.provider_name == self.provider.provider,
ProviderCredential.provider_name.in_(self._get_provider_names()),
)
credential_record = session.execute(stmt).scalar_one_or_none()
if not credential_record:
@@ -737,7 +741,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -784,7 +788,7 @@ class ProviderConfiguration(BaseModel):
"""
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
ProviderModelCredential.credential_name == credential_name,
@@ -860,7 +864,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -997,7 +1001,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -1042,7 +1046,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -1052,7 +1056,7 @@ class ProviderConfiguration(BaseModel):
lb_stmt = select(LoadBalancingModelConfig).where(
LoadBalancingModelConfig.tenant_id == self.tenant_id,
LoadBalancingModelConfig.provider_name == self.provider.provider,
LoadBalancingModelConfig.provider_name.in_(self._get_provider_names()),
LoadBalancingModelConfig.credential_id == credential_id,
LoadBalancingModelConfig.credential_source_type == "custom_model",
)
@@ -1075,7 +1079,7 @@ class ProviderConfiguration(BaseModel):
# if this is the last credential, we need to delete the custom model record
count_stmt = select(func.count(ProviderModelCredential.id)).where(
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -1115,7 +1119,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -1157,7 +1161,7 @@ class ProviderConfiguration(BaseModel):
stmt = select(ProviderModelCredential).where(
ProviderModelCredential.id == credential_id,
ProviderModelCredential.tenant_id == self.tenant_id,
ProviderModelCredential.provider_name == self.provider.provider,
ProviderModelCredential.provider_name.in_(self._get_provider_names()),
ProviderModelCredential.model_name == model,
ProviderModelCredential.model_type == model_type.to_origin_model_type(),
)
@@ -1204,15 +1208,9 @@ class ProviderConfiguration(BaseModel):
"""
Get provider model setting.
"""
model_provider_id = ModelProviderID(self.provider.provider)
provider_names = [self.provider.provider]
if model_provider_id.is_langgenius():
provider_names.append(model_provider_id.provider_name)
stmt = select(ProviderModelSetting).where(
ProviderModelSetting.tenant_id == self.tenant_id,
ProviderModelSetting.provider_name.in_(provider_names),
ProviderModelSetting.provider_name.in_(self._get_provider_names()),
ProviderModelSetting.model_type == model_type.to_origin_model_type(),
ProviderModelSetting.model_name == model,
)
@@ -1384,15 +1382,9 @@ class ProviderConfiguration(BaseModel):
return
def _switch(s: Session):
# get preferred provider
model_provider_id = ModelProviderID(self.provider.provider)
provider_names = [self.provider.provider]
if model_provider_id.is_langgenius():
provider_names.append(model_provider_id.provider_name)
stmt = select(TenantPreferredModelProvider).where(
TenantPreferredModelProvider.tenant_id == self.tenant_id,
TenantPreferredModelProvider.provider_name.in_(provider_names),
TenantPreferredModelProvider.provider_name.in_(self._get_provider_names()),
)
preferred_model_provider = s.execute(stmt).scalars().first()

View File

@@ -8,7 +8,7 @@ from collections import deque
from collections.abc import Sequence
from datetime import datetime
import requests
import httpx
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
@@ -65,13 +65,13 @@ class TraceClient:
def api_check(self):
try:
response = requests.head(self.endpoint, timeout=5)
response = httpx.head(self.endpoint, timeout=5)
if response.status_code == 405:
return True
else:
logger.debug("AliyunTrace API check failed: Unexpected status code: %s", response.status_code)
return False
except requests.RequestException as e:
except httpx.RequestError as e:
logger.debug("AliyunTrace API check failed: %s", str(e))
raise ValueError(f"AliyunTrace API check failed: {str(e)}")

View File

@@ -417,7 +417,7 @@ class WeaveDataTrace(BaseTraceInstance):
if not login_status:
raise ValueError("Weave login failed")
else:
print("Weave login successful")
logger.info("Weave login successful")
return True
except Exception as e:
logger.debug("Weave API check failed: %s", str(e))

View File

@@ -178,6 +178,7 @@ class PluginDependency(BaseModel):
class Marketplace(BaseModel):
marketplace_plugin_unique_identifier: str
version: str | None = None
@property
def plugin_unique_identifier(self) -> str:
@@ -185,6 +186,7 @@ class PluginDependency(BaseModel):
class Package(BaseModel):
plugin_unique_identifier: str
version: str | None = None
type: Type
value: Github | Marketplace | Package

View File

@@ -513,6 +513,21 @@ class ProviderManager:
return provider_name_to_provider_load_balancing_model_configs_dict
@staticmethod
def _get_provider_names(provider_name: str) -> list[str]:
"""
provider_name: `openai` or `langgenius/openai/openai`
return: [`openai`, `langgenius/openai/openai`]
"""
provider_names = [provider_name]
model_provider_id = ModelProviderID(provider_name)
if model_provider_id.is_langgenius():
if "/" in provider_name:
provider_names.append(model_provider_id.provider_name)
else:
provider_names.append(str(model_provider_id))
return provider_names
@staticmethod
def get_provider_available_credentials(tenant_id: str, provider_name: str) -> list[CredentialConfiguration]:
"""
@@ -525,7 +540,10 @@ class ProviderManager:
with Session(db.engine, expire_on_commit=False) as session:
stmt = (
select(ProviderCredential)
.where(ProviderCredential.tenant_id == tenant_id, ProviderCredential.provider_name == provider_name)
.where(
ProviderCredential.tenant_id == tenant_id,
ProviderCredential.provider_name.in_(ProviderManager._get_provider_names(provider_name)),
)
.order_by(ProviderCredential.created_at.desc())
)
@@ -554,7 +572,7 @@ class ProviderManager:
select(ProviderModelCredential)
.where(
ProviderModelCredential.tenant_id == tenant_id,
ProviderModelCredential.provider_name == provider_name,
ProviderModelCredential.provider_name.in_(ProviderManager._get_provider_names(provider_name)),
ProviderModelCredential.model_name == model_name,
ProviderModelCredential.model_type == model_type,
)

View File

@@ -1,4 +1,5 @@
import json
import logging
import time
import uuid
from typing import Any
@@ -9,11 +10,24 @@ from pymochow import MochowClient # type: ignore
from pymochow.auth.bce_credentials import BceCredentials # type: ignore
from pymochow.configuration import Configuration # type: ignore
from pymochow.exception import ServerError # type: ignore
from pymochow.model.database import Database
from pymochow.model.enum import FieldType, IndexState, IndexType, MetricType, ServerErrCode, TableState # type: ignore
from pymochow.model.schema import Field, HNSWParams, Schema, VectorIndex # type: ignore
from pymochow.model.table import AnnSearch, HNSWSearchParams, Partition, Row # type: ignore
from pymochow.model.schema import (
Field,
FilteringIndex,
HNSWParams,
InvertedIndex,
InvertedIndexAnalyzer,
InvertedIndexFieldAttribute,
InvertedIndexParams,
InvertedIndexParseMode,
Schema,
VectorIndex,
) # type: ignore
from pymochow.model.table import AnnSearch, BM25SearchRequest, HNSWSearchParams, Partition, Row # type: ignore
from configs import dify_config
from core.rag.datasource.vdb.field import Field as VDBField
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
@@ -22,6 +36,8 @@ from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
class BaiduConfig(BaseModel):
endpoint: str
@@ -30,9 +46,11 @@ class BaiduConfig(BaseModel):
api_key: str
database: str
index_type: str = "HNSW"
metric_type: str = "L2"
metric_type: str = "IP"
shard: int = 1
replicas: int = 3
inverted_index_analyzer: str = "DEFAULT_ANALYZER"
inverted_index_parser_mode: str = "COARSE_MODE"
@model_validator(mode="before")
@classmethod
@@ -49,13 +67,9 @@ class BaiduConfig(BaseModel):
class BaiduVector(BaseVector):
field_id: str = "id"
field_vector: str = "vector"
field_text: str = "text"
field_metadata: str = "metadata"
field_app_id: str = "app_id"
field_annotation_id: str = "annotation_id"
index_vector: str = "vector_idx"
vector_index: str = "vector_idx"
filtering_index: str = "filtering_idx"
inverted_index: str = "content_inverted_idx"
def __init__(self, collection_name: str, config: BaiduConfig):
super().__init__(collection_name)
@@ -74,8 +88,6 @@ class BaiduVector(BaseVector):
self.add_texts(texts, embeddings)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents if doc.metadata is not None]
total_count = len(documents)
batch_size = 1000
@@ -84,29 +96,31 @@ class BaiduVector(BaseVector):
for start in range(0, total_count, batch_size):
end = min(start + batch_size, total_count)
rows = []
assert len(metadatas) == total_count, "metadatas length should be equal to total_count"
for i in range(start, end, 1):
metadata = documents[i].metadata
row = Row(
id=metadatas[i].get("doc_id", str(uuid.uuid4())),
id=metadata.get("doc_id", str(uuid.uuid4())),
page_content=documents[i].page_content,
metadata=metadata,
vector=embeddings[i],
text=texts[i],
metadata=json.dumps(metadatas[i]),
app_id=metadatas[i].get("app_id", ""),
annotation_id=metadatas[i].get("annotation_id", ""),
)
rows.append(row)
table.upsert(rows=rows)
# rebuild vector index after upsert finished
table.rebuild_index(self.index_vector)
table.rebuild_index(self.vector_index)
timeout = 3600 # 1 hour timeout
start_time = time.time()
while True:
time.sleep(1)
index = table.describe_index(self.index_vector)
index = table.describe_index(self.vector_index)
if index.state == IndexState.NORMAL:
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Index rebuild timeout after {timeout} seconds")
def text_exists(self, id: str) -> bool:
res = self._db.table(self._collection_name).query(primary_key={self.field_id: id})
res = self._db.table(self._collection_name).query(primary_key={VDBField.PRIMARY_KEY: id})
if res and res.code == 0:
return True
return False
@@ -115,53 +129,73 @@ class BaiduVector(BaseVector):
if not ids:
return
quoted_ids = [f"'{id}'" for id in ids]
self._db.table(self._collection_name).delete(filter=f"id IN({', '.join(quoted_ids)})")
self._db.table(self._collection_name).delete(filter=f"{VDBField.PRIMARY_KEY} IN({', '.join(quoted_ids)})")
def delete_by_metadata_field(self, key: str, value: str):
self._db.table(self._collection_name).delete(filter=f"{key} = '{value}'")
# Escape double quotes in value to prevent injection
escaped_value = value.replace('"', '\\"')
self._db.table(self._collection_name).delete(filter=f'metadata["{key}"] = "{escaped_value}"')
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
query_vector = [float(val) if isinstance(val, np.float64) else val for val in query_vector]
document_ids_filter = kwargs.get("document_ids_filter")
filter = ""
if document_ids_filter:
document_ids = ", ".join(f"'{id}'" for id in document_ids_filter)
anns = AnnSearch(
vector_field=self.field_vector,
vector_floats=query_vector,
params=HNSWSearchParams(ef=kwargs.get("ef", 10), limit=kwargs.get("top_k", 4)),
filter=f"document_id IN ({document_ids})",
)
else:
anns = AnnSearch(
vector_field=self.field_vector,
vector_floats=query_vector,
params=HNSWSearchParams(ef=kwargs.get("ef", 10), limit=kwargs.get("top_k", 4)),
)
filter = f'metadata["document_id"] IN({document_ids})'
anns = AnnSearch(
vector_field=VDBField.VECTOR,
vector_floats=query_vector,
params=HNSWSearchParams(ef=kwargs.get("ef", 20), limit=kwargs.get("top_k", 4)),
filter=filter,
)
res = self._db.table(self._collection_name).search(
anns=anns,
projections=[self.field_id, self.field_text, self.field_metadata],
retrieve_vector=True,
projections=[VDBField.CONTENT_KEY, VDBField.METADATA_KEY],
retrieve_vector=False,
)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
return self._get_search_res(res, score_threshold)
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
# baidu vector database doesn't support bm25 search on current version
return []
# document ids filter
document_ids_filter = kwargs.get("document_ids_filter")
filter = ""
if document_ids_filter:
document_ids = ", ".join(f"'{id}'" for id in document_ids_filter)
filter = f'metadata["document_id"] IN({document_ids})'
request = BM25SearchRequest(
index_name=self.inverted_index, search_text=query, limit=kwargs.get("top_k", 4), filter=filter
)
res = self._db.table(self._collection_name).bm25_search(
request=request, projections=[VDBField.CONTENT_KEY, VDBField.METADATA_KEY]
)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
return self._get_search_res(res, score_threshold)
def _get_search_res(self, res, score_threshold) -> list[Document]:
docs = []
for row in res.rows:
row_data = row.get("row", {})
meta = row_data.get(self.field_metadata)
if meta is not None:
meta = json.loads(meta)
score = row.get("score", 0.0)
meta = row_data.get(VDBField.METADATA_KEY, {})
# Handle both JSON string and dict formats for backward compatibility
if isinstance(meta, str):
try:
import json
meta = json.loads(meta)
except (json.JSONDecodeError, TypeError):
meta = {}
elif not isinstance(meta, dict):
meta = {}
if score >= score_threshold:
meta["score"] = score
doc = Document(page_content=row_data.get(self.field_text), metadata=meta)
doc = Document(page_content=row_data.get(VDBField.CONTENT_KEY), metadata=meta)
docs.append(doc)
return docs
def delete(self):
@@ -178,7 +212,7 @@ class BaiduVector(BaseVector):
client = MochowClient(config)
return client
def _init_database(self):
def _init_database(self) -> Database:
exists = False
for db in self._client.list_databases():
if db.database_name == self._client_config.database:
@@ -192,10 +226,10 @@ class BaiduVector(BaseVector):
self._client.create_database(database_name=self._client_config.database)
except ServerError as e:
if e.code == ServerErrCode.DB_ALREADY_EXIST:
pass
return self._client.database(self._client_config.database)
else:
raise
return
return self._client.database(self._client_config.database)
def _table_existed(self) -> bool:
tables = self._db.list_table()
@@ -232,7 +266,7 @@ class BaiduVector(BaseVector):
fields = []
fields.append(
Field(
self.field_id,
VDBField.PRIMARY_KEY,
FieldType.STRING,
primary_key=True,
partition_key=True,
@@ -240,24 +274,57 @@ class BaiduVector(BaseVector):
not_null=True,
)
)
fields.append(Field(self.field_metadata, FieldType.STRING, not_null=True))
fields.append(Field(self.field_app_id, FieldType.STRING))
fields.append(Field(self.field_annotation_id, FieldType.STRING))
fields.append(Field(self.field_text, FieldType.TEXT, not_null=True))
fields.append(Field(self.field_vector, FieldType.FLOAT_VECTOR, not_null=True, dimension=dimension))
fields.append(Field(VDBField.CONTENT_KEY, FieldType.TEXT, not_null=False))
fields.append(Field(VDBField.METADATA_KEY, FieldType.JSON, not_null=False))
fields.append(Field(VDBField.VECTOR, FieldType.FLOAT_VECTOR, not_null=True, dimension=dimension))
# Construct vector index params
indexes = []
indexes.append(
VectorIndex(
index_name="vector_idx",
index_name=self.vector_index,
index_type=index_type,
field="vector",
field=VDBField.VECTOR,
metric_type=metric_type,
params=HNSWParams(m=16, efconstruction=200),
)
)
# Filtering index
indexes.append(
FilteringIndex(
index_name=self.filtering_index,
fields=[VDBField.METADATA_KEY],
)
)
# Get analyzer and parse_mode from config
analyzer = getattr(
InvertedIndexAnalyzer,
self._client_config.inverted_index_analyzer,
InvertedIndexAnalyzer.DEFAULT_ANALYZER,
)
parse_mode = getattr(
InvertedIndexParseMode,
self._client_config.inverted_index_parser_mode,
InvertedIndexParseMode.COARSE_MODE,
)
# Inverted index
indexes.append(
InvertedIndex(
index_name=self.inverted_index,
fields=[VDBField.CONTENT_KEY],
params=InvertedIndexParams(
analyzer=analyzer,
parse_mode=parse_mode,
case_sensitive=True,
),
field_attributes=[InvertedIndexFieldAttribute.ANALYZED],
)
)
# Create table
self._db.create_table(
table_name=self._collection_name,
@@ -268,11 +335,15 @@ class BaiduVector(BaseVector):
)
# Wait for table created
timeout = 300 # 5 minutes timeout
start_time = time.time()
while True:
time.sleep(1)
table = self._db.describe_table(self._collection_name)
if table.state == TableState.NORMAL:
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Table creation timeout after {timeout} seconds")
redis_client.set(table_exist_cache_key, 1, ex=3600)
@@ -296,5 +367,7 @@ class BaiduVectorFactory(AbstractVectorFactory):
database=dify_config.BAIDU_VECTOR_DB_DATABASE or "",
shard=dify_config.BAIDU_VECTOR_DB_SHARD,
replicas=dify_config.BAIDU_VECTOR_DB_REPLICAS,
inverted_index_analyzer=dify_config.BAIDU_VECTOR_DB_INVERTED_INDEX_ANALYZER,
inverted_index_parser_mode=dify_config.BAIDU_VECTOR_DB_INVERTED_INDEX_PARSER_MODE,
),
)

View File

@@ -4,7 +4,7 @@ import math
from typing import Any
from pydantic import BaseModel, model_validator
from pyobvector import VECTOR, FtsIndexParam, FtsParser, ObVecClient, l2_distance # type: ignore
from pyobvector import VECTOR, ObVecClient, l2_distance # type: ignore
from sqlalchemy import JSON, Column, String
from sqlalchemy.dialects.mysql import LONGTEXT
@@ -117,22 +117,39 @@ class OceanBaseVector(BaseVector):
columns=cols,
vidxs=vidx_params,
)
try:
if self._hybrid_search_enabled:
self._client.create_fts_idx_with_fts_index_param(
table_name=self._collection_name,
fts_idx_param=FtsIndexParam(
index_name="fulltext_index_for_col_text",
field_names=["text"],
parser_type=FtsParser.IK,
),
logger.debug("DEBUG: Table '%s' created successfully", self._collection_name)
if self._hybrid_search_enabled:
# Get parser from config or use default ik parser
parser_name = dify_config.OCEANBASE_FULLTEXT_PARSER or "ik"
allowed_parsers = ["ngram", "beng", "space", "ngram2", "ik", "japanese_ftparser", "thai_ftparser"]
if parser_name not in allowed_parsers:
raise ValueError(
f"Invalid OceanBase full-text parser: {parser_name}. "
f"Allowed values are: {', '.join(allowed_parsers)}"
)
except Exception as e:
raise Exception(
"Failed to add fulltext index to the target table, your OceanBase version must be 4.3.5.1 or above "
+ "to support fulltext index and vector index in the same table",
e,
logger.debug("Hybrid search is enabled, parser_name='%s'", parser_name)
logger.debug(
"About to create fulltext index for collection '%s' using parser '%s'",
self._collection_name,
parser_name,
)
try:
sql_command = f"""ALTER TABLE {self._collection_name}
ADD FULLTEXT INDEX fulltext_index_for_col_text (text) WITH PARSER {parser_name}"""
logger.debug("DEBUG: Executing SQL: %s", sql_command)
self._client.perform_raw_text_sql(sql_command)
logger.debug("DEBUG: Fulltext index created successfully for '%s'", self._collection_name)
except Exception as e:
logger.exception("Exception occurred while creating fulltext index")
raise Exception(
"Failed to add fulltext index to the target table, your OceanBase version must be "
"4.3.5.1 or above to support fulltext index and vector index in the same table"
) from e
else:
logger.debug("DEBUG: Hybrid search is NOT enabled for '%s'", self._collection_name)
self._client.refresh_metadata([self._collection_name])
redis_client.set(collection_exist_cache_key, 1, ex=3600)
@@ -229,7 +246,7 @@ class OceanBaseVector(BaseVector):
try:
metadata = json.loads(metadata_str)
except json.JSONDecodeError:
print(f"Invalid JSON metadata: {metadata_str}")
logger.warning("Invalid JSON metadata: %s", metadata_str)
metadata = {}
metadata["score"] = score
docs.append(Document(page_content=_text, metadata=metadata))

View File

@@ -1,5 +1,6 @@
import array
import json
import logging
import re
import uuid
from typing import Any
@@ -19,6 +20,8 @@ from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
oracledb.defaults.fetch_lobs = False
@@ -180,8 +183,8 @@ class OracleVector(BaseVector):
value,
)
conn.commit()
except Exception as e:
print(e)
except Exception:
logger.exception("Failed to insert record %s into %s", value[0], self.table_name)
conn.close()
return pks

View File

@@ -1,4 +1,5 @@
import json
import logging
import uuid
from typing import Any
@@ -23,6 +24,8 @@ from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
logger = logging.getLogger(__name__)
Base = declarative_base() # type: Any
@@ -187,8 +190,8 @@ class RelytVector(BaseVector):
delete_condition = chunks_table.c.id.in_(ids)
conn.execute(chunks_table.delete().where(delete_condition))
return True
except Exception as e:
print("Delete operation failed:", str(e))
except Exception:
logger.exception("Delete operation failed for collection %s", self._collection_name)
return False
def delete_by_metadata_field(self, key: str, value: str):

View File

@@ -164,8 +164,8 @@ class TiDBVector(BaseVector):
delete_condition = table.c.id.in_(ids)
conn.execute(table.delete().where(delete_condition))
return True
except Exception as e:
print("Delete operation failed:", str(e))
except Exception:
logger.exception("Delete operation failed for collection %s", self._collection_name)
return False
def get_ids_by_metadata_field(self, key: str, value: str):

View File

@@ -417,12 +417,10 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
if db_model is not None:
offload_data = db_model.offload_data
else:
db_model = self._to_db_model(domain_model)
offload_data = []
offload_data = db_model.offload_data
offload_data = db_model.offload_data
if domain_model.inputs is not None:
result = self._truncate_and_upload(
domain_model.inputs,

View File

@@ -1,4 +1,5 @@
import json
import logging
import threading
from collections.abc import Mapping, MutableMapping
from pathlib import Path
@@ -8,6 +9,8 @@ from typing import Any, ClassVar, Optional
class SchemaRegistry:
"""Schema registry manages JSON schemas with version support"""
logger: ClassVar[logging.Logger] = logging.getLogger(__name__)
_default_instance: ClassVar[Optional["SchemaRegistry"]] = None
_lock: ClassVar[threading.Lock] = threading.Lock()
@@ -83,7 +86,7 @@ class SchemaRegistry:
self.metadata[uri] = metadata
except (OSError, json.JSONDecodeError) as e:
print(f"Warning: failed to load schema {version}/{schema_name}: {e}")
self.logger.warning("Failed to load schema %s/%s: %s", version, schema_name, e)
def get_schema(self, uri: str) -> Any | None:
"""Retrieves a schema by URI with version support"""

View File

@@ -396,6 +396,10 @@ class ApiTool(Tool):
# assemble invoke message based on response type
if parsed_response.is_json and isinstance(parsed_response.content, dict):
yield self.create_json_message(parsed_response.content)
# FIXES: https://github.com/langgenius/dify/pull/23456#issuecomment-3182413088
# We need never break the original flows
yield self.create_text_message(response.text)
else:
# Convert to string if needed and create text message
text_response = (

View File

@@ -41,7 +41,8 @@ class GraphExecutionState(BaseModel):
completed: bool = Field(default=False)
aborted: bool = Field(default=False)
error: GraphExecutionErrorState | None = Field(default=None)
node_executions: list[NodeExecutionState] = Field(default_factory=list)
exceptions_count: int = Field(default=0)
node_executions: list[NodeExecutionState] = Field(default_factory=list[NodeExecutionState])
def _serialize_error(error: Exception | None) -> GraphExecutionErrorState | None:
@@ -103,7 +104,8 @@ class GraphExecution:
completed: bool = False
aborted: bool = False
error: Exception | None = None
node_executions: dict[str, NodeExecution] = field(default_factory=dict)
node_executions: dict[str, NodeExecution] = field(default_factory=dict[str, NodeExecution])
exceptions_count: int = 0
def start(self) -> None:
"""Mark the graph execution as started."""
@@ -172,6 +174,7 @@ class GraphExecution:
completed=self.completed,
aborted=self.aborted,
error=_serialize_error(self.error),
exceptions_count=self.exceptions_count,
node_executions=node_states,
)
@@ -195,6 +198,7 @@ class GraphExecution:
self.completed = state.completed
self.aborted = state.aborted
self.error = _deserialize_error(state.error)
self.exceptions_count = state.exceptions_count
self.node_executions = {
item.node_id: NodeExecution(
node_id=item.node_id,
@@ -205,3 +209,7 @@ class GraphExecution:
)
for item in state.node_executions
}
def record_node_failure(self) -> None:
"""Increment the count of node failures encountered during execution."""
self.exceptions_count += 1

View File

@@ -3,11 +3,12 @@ Event handler implementations for different event types.
"""
import logging
from collections.abc import Mapping
from functools import singledispatchmethod
from typing import TYPE_CHECKING, final
from core.workflow.entities import GraphRuntimeState
from core.workflow.enums import NodeExecutionType
from core.workflow.enums import ErrorStrategy, NodeExecutionType
from core.workflow.graph import Graph
from core.workflow.graph_events import (
GraphNodeEventBase,
@@ -122,13 +123,15 @@ class EventHandler:
"""
# Track execution in domain model
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
is_initial_attempt = node_execution.retry_count == 0
node_execution.mark_started(event.id)
# Track in response coordinator for stream ordering
self._response_coordinator.track_node_execution(event.node_id, event.id)
# Collect the event
self._event_collector.collect(event)
# Collect the event only for the first attempt; retries remain silent
if is_initial_attempt:
self._event_collector.collect(event)
@_dispatch.register
def _(self, event: NodeRunStreamChunkEvent) -> None:
@@ -161,7 +164,7 @@ class EventHandler:
node_execution.mark_taken()
# Store outputs in variable pool
self._store_node_outputs(event)
self._store_node_outputs(event.node_id, event.node_run_result.outputs)
# Forward to response coordinator and emit streaming events
streaming_events = self._response_coordinator.intercept_event(event)
@@ -191,7 +194,7 @@ class EventHandler:
# Handle response node outputs
if node.execution_type == NodeExecutionType.RESPONSE:
self._update_response_outputs(event)
self._update_response_outputs(event.node_run_result.outputs)
# Collect the event
self._event_collector.collect(event)
@@ -207,6 +210,7 @@ class EventHandler:
# Update domain model
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
node_execution.mark_failed(event.error)
self._graph_execution.record_node_failure()
result = self._error_handler.handle_node_failure(event)
@@ -227,10 +231,40 @@ class EventHandler:
Args:
event: The node exception event
"""
# Node continues via fail-branch, so it's technically "succeeded"
# Node continues via fail-branch/default-value, treat as completion
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
node_execution.mark_taken()
# Persist outputs produced by the exception strategy (e.g. default values)
self._store_node_outputs(event.node_id, event.node_run_result.outputs)
node = self._graph.nodes[event.node_id]
if node.error_strategy == ErrorStrategy.DEFAULT_VALUE:
ready_nodes, edge_streaming_events = self._edge_processor.process_node_success(event.node_id)
elif node.error_strategy == ErrorStrategy.FAIL_BRANCH:
ready_nodes, edge_streaming_events = self._edge_processor.handle_branch_completion(
event.node_id, event.node_run_result.edge_source_handle
)
else:
raise NotImplementedError(f"Unsupported error strategy: {node.error_strategy}")
for edge_event in edge_streaming_events:
self._event_collector.collect(edge_event)
for node_id in ready_nodes:
self._state_manager.enqueue_node(node_id)
self._state_manager.start_execution(node_id)
# Update response outputs if applicable
if node.execution_type == NodeExecutionType.RESPONSE:
self._update_response_outputs(event.node_run_result.outputs)
self._state_manager.finish_execution(event.node_id)
# Collect the exception event for observers
self._event_collector.collect(event)
@_dispatch.register
def _(self, event: NodeRunRetryEvent) -> None:
"""
@@ -242,21 +276,31 @@ class EventHandler:
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
node_execution.increment_retry()
def _store_node_outputs(self, event: NodeRunSucceededEvent) -> None:
# Finish the previous attempt before re-queuing the node
self._state_manager.finish_execution(event.node_id)
# Emit retry event for observers
self._event_collector.collect(event)
# Re-queue node for execution
self._state_manager.enqueue_node(event.node_id)
self._state_manager.start_execution(event.node_id)
def _store_node_outputs(self, node_id: str, outputs: Mapping[str, object]) -> None:
"""
Store node outputs in the variable pool.
Args:
event: The node succeeded event containing outputs
"""
for variable_name, variable_value in event.node_run_result.outputs.items():
self._graph_runtime_state.variable_pool.add((event.node_id, variable_name), variable_value)
for variable_name, variable_value in outputs.items():
self._graph_runtime_state.variable_pool.add((node_id, variable_name), variable_value)
def _update_response_outputs(self, event: NodeRunSucceededEvent) -> None:
def _update_response_outputs(self, outputs: Mapping[str, object]) -> None:
"""Update response outputs for response nodes."""
# TODO: Design a mechanism for nodes to notify the engine about how to update outputs
# in runtime state, rather than allowing nodes to directly access runtime state.
for key, value in event.node_run_result.outputs.items():
for key, value in outputs.items():
if key == "answer":
existing = self._graph_runtime_state.get_output("answer", "")
if existing:

View File

@@ -5,6 +5,7 @@ Unified event manager for collecting and emitting events.
import threading
import time
from collections.abc import Generator
from contextlib import contextmanager
from typing import final
from core.workflow.graph_events import GraphEngineEvent
@@ -51,43 +52,23 @@ class ReadWriteLock:
"""Release a write lock."""
self._read_ready.release()
def read_lock(self) -> "ReadLockContext":
@contextmanager
def read_lock(self):
"""Return a context manager for read locking."""
return ReadLockContext(self)
self.acquire_read()
try:
yield
finally:
self.release_read()
def write_lock(self) -> "WriteLockContext":
@contextmanager
def write_lock(self):
"""Return a context manager for write locking."""
return WriteLockContext(self)
@final
class ReadLockContext:
"""Context manager for read locks."""
def __init__(self, lock: ReadWriteLock) -> None:
self._lock = lock
def __enter__(self) -> "ReadLockContext":
self._lock.acquire_read()
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
self._lock.release_read()
@final
class WriteLockContext:
"""Context manager for write locks."""
def __init__(self, lock: ReadWriteLock) -> None:
self._lock = lock
def __enter__(self) -> "WriteLockContext":
self._lock.acquire_write()
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: object) -> None:
self._lock.release_write()
self.acquire_write()
try:
yield
finally:
self.release_write()
@final

View File

@@ -23,6 +23,7 @@ from core.workflow.graph_events import (
GraphNodeEventBase,
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
)
@@ -260,12 +261,23 @@ class GraphEngine:
if self._graph_execution.error:
raise self._graph_execution.error
else:
yield GraphRunSucceededEvent(
outputs=self._graph_runtime_state.outputs,
)
outputs = self._graph_runtime_state.outputs
exceptions_count = self._graph_execution.exceptions_count
if exceptions_count > 0:
yield GraphRunPartialSucceededEvent(
exceptions_count=exceptions_count,
outputs=outputs,
)
else:
yield GraphRunSucceededEvent(
outputs=outputs,
)
except Exception as e:
yield GraphRunFailedEvent(error=str(e))
yield GraphRunFailedEvent(
error=str(e),
exceptions_count=self._graph_execution.exceptions_count,
)
raise
finally:

View File

@@ -15,6 +15,7 @@ from core.workflow.graph_events import (
GraphEngineEvent,
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
NodeRunExceptionEvent,
@@ -127,6 +128,13 @@ class DebugLoggingLayer(GraphEngineLayer):
if self.include_outputs and event.outputs:
self.logger.info(" Final outputs: %s", self._format_dict(event.outputs))
elif isinstance(event, GraphRunPartialSucceededEvent):
self.logger.warning("⚠️ Graph run partially succeeded")
if event.exceptions_count > 0:
self.logger.warning(" Total exceptions: %s", event.exceptions_count)
if self.include_outputs and event.outputs:
self.logger.info(" Final outputs: %s", self._format_dict(event.outputs))
elif isinstance(event, GraphRunFailedEvent):
self.logger.error("❌ Graph run failed: %s", event.error)
if event.exceptions_count > 0:
@@ -138,6 +146,12 @@ class DebugLoggingLayer(GraphEngineLayer):
self.logger.info(" Partial outputs: %s", self._format_dict(event.outputs))
# Node-level events
# Retry before Started because Retry subclasses Started;
elif isinstance(event, NodeRunRetryEvent):
self.retry_count += 1
self.logger.warning("🔄 Node retry: %s (attempt %s)", event.node_id, event.retry_index)
self.logger.warning(" Previous error: %s", event.error)
elif isinstance(event, NodeRunStartedEvent):
self.node_count += 1
self.logger.info('▶️ Node started: %s - "%s" (type: %s)', event.node_id, event.node_title, event.node_type)
@@ -167,11 +181,6 @@ class DebugLoggingLayer(GraphEngineLayer):
self.logger.warning("⚠️ Node exception handled: %s", event.node_id)
self.logger.warning(" Error: %s", event.error)
elif isinstance(event, NodeRunRetryEvent):
self.retry_count += 1
self.logger.warning("🔄 Node retry: %s (attempt %s)", event.node_id, event.retry_index)
self.logger.warning(" Previous error: %s", event.error)
elif isinstance(event, NodeRunStreamChunkEvent):
# Log stream chunks at debug level to avoid spam
final_indicator = " (FINAL)" if event.is_final else ""

View File

@@ -147,4 +147,4 @@ class ExecutionLimitsLayer(GraphEngineLayer):
self.logger.debug("Abort command sent to engine")
except Exception:
self.logger.exception("Failed to send abort command: %s")
self.logger.exception("Failed to send abort command")

View File

@@ -19,6 +19,7 @@ from core.workflow.enums import (
from core.workflow.graph_events import (
GraphNodeEventBase,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunSucceededEvent,
)
from core.workflow.node_events import (
@@ -372,43 +373,16 @@ class IterationNode(Node):
variable_mapping: dict[str, Sequence[str]] = {
f"{node_id}.input_selector": typed_node_data.iterator_selector,
}
iteration_node_ids = set()
# init graph
from core.workflow.entities import GraphInitParams, GraphRuntimeState
from core.workflow.graph import Graph
from core.workflow.nodes.node_factory import DifyNodeFactory
# Create minimal GraphInitParams for static analysis
graph_init_params = GraphInitParams(
tenant_id="",
app_id="",
workflow_id="",
graph_config=graph_config,
user_id="",
user_from="",
invoke_from="",
call_depth=0,
)
# Create minimal GraphRuntimeState for static analysis
from core.workflow.entities import VariablePool
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(),
start_at=0,
)
# Create node factory for static analysis
node_factory = DifyNodeFactory(graph_init_params=graph_init_params, graph_runtime_state=graph_runtime_state)
iteration_graph = Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=typed_node_data.start_node_id,
)
if not iteration_graph:
raise IterationGraphNotFoundError("iteration graph not found")
# Find all nodes that belong to this loop
nodes = graph_config.get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
if node_data.get("iteration_id") == node_id:
in_iteration_node_id = node.get("id")
if in_iteration_node_id:
iteration_node_ids.add(in_iteration_node_id)
# Get node configs from graph_config instead of non-existent node_id_config_mapping
node_configs = {node["id"]: node for node in graph_config.get("nodes", []) if "id" in node}
@@ -444,9 +418,7 @@ class IterationNode(Node):
variable_mapping.update(sub_node_variable_mapping)
# remove variable out from iteration
variable_mapping = {
key: value for key, value in variable_mapping.items() if value[0] not in iteration_graph.node_ids
}
variable_mapping = {key: value for key, value in variable_mapping.items() if value[0] not in iteration_node_ids}
return variable_mapping
@@ -485,7 +457,7 @@ class IterationNode(Node):
if isinstance(event, GraphNodeEventBase):
self._append_iteration_info_to_event(event=event, iter_run_index=current_index)
yield event
elif isinstance(event, GraphRunSucceededEvent):
elif isinstance(event, (GraphRunSucceededEvent, GraphRunPartialSucceededEvent)):
result = variable_pool.get(self._node_data.output_selector)
if result is None:
outputs.append(None)

View File

@@ -63,7 +63,7 @@ class RetrievalSetting(BaseModel):
Retrieval Setting.
"""
search_method: Literal["semantic_search", "keyword_search", "fulltext_search", "hybrid_search"]
search_method: Literal["semantic_search", "keyword_search", "full_text_search", "hybrid_search"]
top_k: int
score_threshold: float | None = 0.5
score_threshold_enabled: bool = False

View File

@@ -219,7 +219,7 @@ class LLMNode(Node):
model_instance=model_instance,
)
query = None
query: str | None = None
if self._node_data.memory:
query = self._node_data.memory.query_prompt_template
if not query and (

View File

@@ -1,3 +1,4 @@
import contextlib
import json
import logging
from collections.abc import Callable, Generator, Mapping, Sequence
@@ -127,11 +128,13 @@ class LoopNode(Node):
try:
reach_break_condition = False
if break_conditions:
_, _, reach_break_condition = condition_processor.process_conditions(
variable_pool=self.graph_runtime_state.variable_pool,
conditions=break_conditions,
operator=logical_operator,
)
with contextlib.suppress(ValueError):
_, _, reach_break_condition = condition_processor.process_conditions(
variable_pool=self.graph_runtime_state.variable_pool,
conditions=break_conditions,
operator=logical_operator,
)
if reach_break_condition:
loop_count = 0
cost_tokens = 0
@@ -295,42 +298,11 @@ class LoopNode(Node):
variable_mapping = {}
# init graph
from core.workflow.entities import GraphInitParams, GraphRuntimeState, VariablePool
from core.workflow.graph import Graph
from core.workflow.nodes.node_factory import DifyNodeFactory
# Extract loop node IDs statically from graph_config
# Create minimal GraphInitParams for static analysis
graph_init_params = GraphInitParams(
tenant_id="",
app_id="",
workflow_id="",
graph_config=graph_config,
user_id="",
user_from="",
invoke_from="",
call_depth=0,
)
loop_node_ids = cls._extract_loop_node_ids_from_config(graph_config, node_id)
# Create minimal GraphRuntimeState for static analysis
graph_runtime_state = GraphRuntimeState(
variable_pool=VariablePool(),
start_at=0,
)
# Create node factory for static analysis
node_factory = DifyNodeFactory(graph_init_params=graph_init_params, graph_runtime_state=graph_runtime_state)
loop_graph = Graph.init(
graph_config=graph_config,
node_factory=node_factory,
root_node_id=typed_node_data.start_node_id,
)
if not loop_graph:
raise ValueError("loop graph not found")
# Get node configs from graph_config instead of non-existent node_id_config_mapping
# Get node configs from graph_config
node_configs = {node["id"]: node for node in graph_config.get("nodes", []) if "id" in node}
for sub_node_id, sub_node_config in node_configs.items():
if sub_node_config.get("data", {}).get("loop_id") != node_id:
@@ -371,12 +343,35 @@ class LoopNode(Node):
variable_mapping[f"{node_id}.{loop_variable.label}"] = selector
# remove variable out from loop
variable_mapping = {
key: value for key, value in variable_mapping.items() if value[0] not in loop_graph.node_ids
}
variable_mapping = {key: value for key, value in variable_mapping.items() if value[0] not in loop_node_ids}
return variable_mapping
@classmethod
def _extract_loop_node_ids_from_config(cls, graph_config: Mapping[str, Any], loop_node_id: str) -> set[str]:
"""
Extract node IDs that belong to a specific loop from graph configuration.
This method statically analyzes the graph configuration to find all nodes
that are part of the specified loop, without creating actual node instances.
:param graph_config: the complete graph configuration
:param loop_node_id: the ID of the loop node
:return: set of node IDs that belong to the loop
"""
loop_node_ids = set()
# Find all nodes that belong to this loop
nodes = graph_config.get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
if node_data.get("loop_id") == loop_node_id:
node_id = node.get("id")
if node_id:
loop_node_ids.add(node_id)
return loop_node_ids
@staticmethod
def _get_segment_for_constant(var_type: SegmentType, original_value: Any) -> Segment:
"""Get the appropriate segment type for a constant value."""

View File

@@ -402,6 +402,8 @@ class WorkflowEntry:
input_value = user_inputs.get(node_variable)
if not input_value:
input_value = user_inputs.get(node_variable_key)
if input_value is None:
continue
if isinstance(input_value, dict) and "type" in input_value and "transfer_method" in input_value:
input_value = file_factory.build_from_mapping(mapping=input_value, tenant_id=tenant_id)

View File

@@ -15,6 +15,7 @@ def init_app(app: DifyApp):
install_plugins,
install_rag_pipeline_plugins,
migrate_data_for_plugin,
migrate_oss,
old_metadata_migration,
remove_orphaned_files_on_storage,
reset_email,
@@ -47,6 +48,7 @@ def init_app(app: DifyApp):
remove_orphaned_files_on_storage,
setup_system_tool_oauth_client,
cleanup_orphaned_draft_variables,
migrate_oss,
setup_datasource_oauth_client,
transform_datasource_credentials,
install_rag_pipeline_plugins,

View File

@@ -3,8 +3,9 @@ import os
from collections.abc import Generator
from pathlib import Path
import opendal # type: ignore[import]
from dotenv import dotenv_values
from opendal import Operator
from opendal.layers import RetryLayer
from extensions.storage.base_storage import BaseStorage
@@ -34,10 +35,9 @@ class OpenDALStorage(BaseStorage):
root = kwargs.get("root", "storage")
Path(root).mkdir(parents=True, exist_ok=True)
self.op = opendal.Operator(scheme=scheme, **kwargs) # type: ignore
retry_layer = RetryLayer(max_times=3, factor=2.0, jitter=True)
self.op = Operator(scheme=scheme, **kwargs).layer(retry_layer)
logger.debug("opendal operator created with scheme %s", scheme)
retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
self.op = self.op.layer(retry_layer)
logger.debug("added retry layer to opendal operator")
def save(self, filename: str, data: bytes):
@@ -57,22 +57,24 @@ class OpenDALStorage(BaseStorage):
raise FileNotFoundError("File not found")
batch_size = 4096
file = self.op.open(path=filename, mode="rb")
while chunk := file.read(batch_size):
yield chunk
with self.op.open(
path=filename,
mode="rb",
chunck=batch_size,
) as file:
while chunk := file.read(batch_size):
yield chunk
logger.debug("file %s loaded as stream", filename)
def download(self, filename: str, target_filepath: str):
if not self.exists(filename):
raise FileNotFoundError("File not found")
with Path(target_filepath).open("wb") as f:
f.write(self.op.read(path=filename))
Path(target_filepath).write_bytes(self.op.read(path=filename))
logger.debug("file %s downloaded to %s", filename, target_filepath)
def exists(self, filename: str) -> bool:
res: bool = self.op.exists(path=filename)
return res
return self.op.exists(path=filename)
def delete(self, filename: str):
if self.exists(filename):
@@ -85,7 +87,7 @@ class OpenDALStorage(BaseStorage):
if not self.exists(path):
raise FileNotFoundError("Path not found")
all_files = self.op.scan(path=path)
all_files = self.op.list(path=path)
if files and directories:
logger.debug("files and directories on %s scanned", path)
return [f.path for f in all_files]

View File

@@ -33,6 +33,7 @@ file_fields = {
"created_by": fields.String,
"created_at": TimestampField,
"preview_url": fields.String,
"source_url": fields.String,
}

View File

@@ -1,10 +1,32 @@
import psycogreen.gevent as pscycogreen_gevent # type: ignore
from gevent import events as gevent_events
from grpc.experimental import gevent as grpc_gevent # type: ignore
# NOTE(QuantumGhost): here we cannot use post_fork to patch gRPC, as
# grpc_gevent.init_gevent must be called after patching stdlib.
# Gunicorn calls `post_init` before applying monkey patch.
# Use `post_init` to setup gRPC gevent support would cause deadlock and
# some other weird issues.
#
# ref:
# - https://github.com/grpc/grpc/blob/62533ea13879d6ee95c6fda11ec0826ca822c9dd/src/python/grpcio/grpc/experimental/gevent.py
# - https://github.com/gevent/gevent/issues/2060#issuecomment-3016768668
# - https://github.com/benoitc/gunicorn/blob/master/gunicorn/arbiter.py#L607-L613
def post_fork(server, worker):
def post_patch(event):
# this function is only called for gevent worker.
# from gevent docs (https://www.gevent.org/api/gevent.monkey.html):
# You can also subscribe to the events to provide additional patching beyond what gevent distributes, either for
# additional standard library modules, or for third-party packages. The suggested time to do this patching is in
# the subscriber for gevent.events.GeventDidPatchBuiltinModulesEvent.
if not isinstance(event, gevent_events.GeventDidPatchBuiltinModulesEvent):
return
# grpc gevent
grpc_gevent.init_gevent()
server.log.info("gRPC patched with gevent.")
print("gRPC patched with gevent.", flush=True) # noqa: T201
pscycogreen_gevent.patch_psycopg()
server.log.info("psycopg2 patched with gevent.")
print("psycopg2 patched with gevent.", flush=True) # noqa: T201
gevent_events.subscribers.append(post_patch)

View File

@@ -1,7 +1,7 @@
import urllib.parse
from dataclasses import dataclass
import requests
import httpx
@dataclass
@@ -58,7 +58,7 @@ class GitHubOAuth(OAuth):
"redirect_uri": self.redirect_uri,
}
headers = {"Accept": "application/json"}
response = requests.post(self._TOKEN_URL, data=data, headers=headers)
response = httpx.post(self._TOKEN_URL, data=data, headers=headers)
response_json = response.json()
access_token = response_json.get("access_token")
@@ -70,11 +70,11 @@ class GitHubOAuth(OAuth):
def get_raw_user_info(self, token: str):
headers = {"Authorization": f"token {token}"}
response = requests.get(self._USER_INFO_URL, headers=headers)
response = httpx.get(self._USER_INFO_URL, headers=headers)
response.raise_for_status()
user_info = response.json()
email_response = requests.get(self._EMAIL_INFO_URL, headers=headers)
email_response = httpx.get(self._EMAIL_INFO_URL, headers=headers)
email_info = email_response.json()
primary_email: dict = next((email for email in email_info if email["primary"] == True), {})
@@ -112,7 +112,7 @@ class GoogleOAuth(OAuth):
"redirect_uri": self.redirect_uri,
}
headers = {"Accept": "application/json"}
response = requests.post(self._TOKEN_URL, data=data, headers=headers)
response = httpx.post(self._TOKEN_URL, data=data, headers=headers)
response_json = response.json()
access_token = response_json.get("access_token")
@@ -124,7 +124,7 @@ class GoogleOAuth(OAuth):
def get_raw_user_info(self, token: str):
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(self._USER_INFO_URL, headers=headers)
response = httpx.get(self._USER_INFO_URL, headers=headers)
response.raise_for_status()
return response.json()

View File

@@ -1,7 +1,7 @@
import urllib.parse
from typing import Any
import requests
import httpx
from flask_login import current_user
from sqlalchemy import select
@@ -43,7 +43,7 @@ class NotionOAuth(OAuthDataSource):
data = {"code": code, "grant_type": "authorization_code", "redirect_uri": self.redirect_uri}
headers = {"Accept": "application/json"}
auth = (self.client_id, self.client_secret)
response = requests.post(self._TOKEN_URL, data=data, auth=auth, headers=headers)
response = httpx.post(self._TOKEN_URL, data=data, auth=auth, headers=headers)
response_json = response.json()
access_token = response_json.get("access_token")
@@ -239,7 +239,7 @@ class NotionOAuth(OAuthDataSource):
"Notion-Version": "2022-06-28",
}
response = requests.post(url=self._NOTION_PAGE_SEARCH, json=data, headers=headers)
response = httpx.post(url=self._NOTION_PAGE_SEARCH, json=data, headers=headers)
response_json = response.json()
results.extend(response_json.get("results", []))
@@ -254,7 +254,7 @@ class NotionOAuth(OAuthDataSource):
"Authorization": f"Bearer {access_token}",
"Notion-Version": "2022-06-28",
}
response = requests.get(url=f"{self._NOTION_BLOCK_SEARCH}/{block_id}", headers=headers)
response = httpx.get(url=f"{self._NOTION_BLOCK_SEARCH}/{block_id}", headers=headers)
response_json = response.json()
if response.status_code != 200:
message = response_json.get("message", "unknown error")
@@ -270,7 +270,7 @@ class NotionOAuth(OAuthDataSource):
"Authorization": f"Bearer {access_token}",
"Notion-Version": "2022-06-28",
}
response = requests.get(url=self._NOTION_BOT_USER, headers=headers)
response = httpx.get(url=self._NOTION_BOT_USER, headers=headers)
response_json = response.json()
if "object" in response_json and response_json["object"] == "user":
user_type = response_json["type"]
@@ -294,7 +294,7 @@ class NotionOAuth(OAuthDataSource):
"Authorization": f"Bearer {access_token}",
"Notion-Version": "2022-06-28",
}
response = requests.post(url=self._NOTION_PAGE_SEARCH, json=data, headers=headers)
response = httpx.post(url=self._NOTION_PAGE_SEARCH, json=data, headers=headers)
response_json = response.json()
results.extend(response_json.get("results", []))

View File

@@ -47,7 +47,7 @@ def upgrade():
sa.Column('plugin_id', sa.String(length=255), nullable=False),
sa.Column('auth_type', sa.String(length=255), nullable=False),
sa.Column('encrypted_credentials', postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.Column('avatar_url', sa.String(length=255), nullable=True),
sa.Column('avatar_url', sa.Text(), nullable=True),
sa.Column('is_default', sa.Boolean(), server_default=sa.text('false'), nullable=False),
sa.Column('expires_at', sa.Integer(), server_default='-1', nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
@@ -156,7 +156,7 @@ def upgrade():
sa.Column('type', sa.String(20), nullable=False),
sa.Column('file_id', models.types.StringUUID(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('workflow_node_execution_offload_pkey')),
sa.UniqueConstraint('node_execution_id', 'type', name=op.f('workflow_node_execution_offload_node_execution_id_key'), postgresql_nulls_not_distinct=False)
sa.UniqueConstraint('node_execution_id', 'type', name=op.f('workflow_node_execution_offload_node_execution_id_key'))
)
with op.batch_alter_table('datasets', schema=None) as batch_op:
batch_op.add_column(sa.Column('keyword_number', sa.Integer(), server_default=sa.text('10'), nullable=True))

View File

@@ -910,7 +910,7 @@ class AppDatasetJoin(Base):
id = mapped_column(StringUUID, primary_key=True, nullable=False, server_default=sa.text("uuid_generate_v4()"))
app_id = mapped_column(StringUUID, nullable=False)
dataset_id = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=db.func.current_timestamp())
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp())
@property
def app(self):
@@ -931,7 +931,7 @@ class DatasetQuery(Base):
source_app_id = mapped_column(StringUUID, nullable=True)
created_by_role = mapped_column(String, nullable=False)
created_by = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=db.func.current_timestamp())
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=sa.func.current_timestamp())
class DatasetKeywordTable(Base):

View File

@@ -1044,7 +1044,7 @@ class Message(Base):
sign_url = sign_tool_file(tool_file_id=tool_file_id, extension=extension)
elif "file-preview" in url:
# get upload file id
upload_file_id_pattern = r"\/files\/([\w-]+)\/file-preview?\?timestamp="
upload_file_id_pattern = r"\/files\/([\w-]+)\/file-preview\?timestamp="
result = re.search(upload_file_id_pattern, url)
if not result:
continue
@@ -1055,7 +1055,7 @@ class Message(Base):
sign_url = file_helpers.get_signed_file_url(upload_file_id)
elif "image-preview" in url:
# image-preview is deprecated, use file-preview instead
upload_file_id_pattern = r"\/files\/([\w-]+)\/image-preview?\?timestamp="
upload_file_id_pattern = r"\/files\/([\w-]+)\/image-preview\?timestamp="
result = re.search(upload_file_id_pattern, url)
if not result:
continue
@@ -1731,7 +1731,7 @@ class MessageChain(Base):
type: Mapped[str] = mapped_column(String(255), nullable=False)
input = mapped_column(sa.Text, nullable=True)
output = mapped_column(sa.Text, nullable=True)
created_at = mapped_column(sa.DateTime, nullable=False, server_default=db.func.current_timestamp())
created_at = mapped_column(sa.DateTime, nullable=False, server_default=sa.func.current_timestamp())
class MessageAgentThought(Base):
@@ -1769,7 +1769,7 @@ class MessageAgentThought(Base):
latency: Mapped[float | None] = mapped_column(sa.Float, nullable=True)
created_by_role = mapped_column(String, nullable=False)
created_by = mapped_column(StringUUID, nullable=False)
created_at = mapped_column(sa.DateTime, nullable=False, server_default=db.func.current_timestamp())
created_at = mapped_column(sa.DateTime, nullable=False, server_default=sa.func.current_timestamp())
@property
def files(self) -> list[Any]:
@@ -1872,7 +1872,7 @@ class DatasetRetrieverResource(Base):
index_node_hash = mapped_column(sa.Text, nullable=True)
retriever_from = mapped_column(sa.Text, nullable=False)
created_by = mapped_column(StringUUID, nullable=False)
created_at = mapped_column(sa.DateTime, nullable=False, server_default=db.func.current_timestamp())
created_at = mapped_column(sa.DateTime, nullable=False, server_default=sa.func.current_timestamp())
class Tag(Base):

View File

@@ -35,7 +35,7 @@ class DatasourceProvider(Base):
plugin_id: Mapped[str] = db.Column(db.String(255), nullable=False)
auth_type: Mapped[str] = db.Column(db.String(255), nullable=False)
encrypted_credentials: Mapped[dict] = db.Column(JSONB, nullable=False)
avatar_url: Mapped[str] = db.Column(db.String(255), nullable=True, default="default")
avatar_url: Mapped[str] = db.Column(db.Text, nullable=True, default="default")
is_default: Mapped[bool] = db.Column(db.Boolean, nullable=False, server_default=db.text("false"))
expires_at: Mapped[int] = db.Column(db.Integer, nullable=False, server_default="-1")

View File

@@ -890,12 +890,18 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
class WorkflowNodeExecutionOffload(Base):
__tablename__ = "workflow_node_execution_offload"
__table_args__ = (
# PostgreSQL 14 treats NULL values as distinct in unique constraints by default,
# allowing multiple records with NULL values for the same column combination.
#
# This behavior allows us to have multiple records with NULL node_execution_id,
# simplifying garbage collection process.
UniqueConstraint(
"node_execution_id",
"type",
# Treat `NULL` as distinct for this unique index, so
# we can have mutitple records with `NULL` node_exeution_id, simplify garbage collection process.
postgresql_nulls_not_distinct=False,
# Note: PostgreSQL 15+ supports explicit `nulls distinct` behavior through
# `postgresql_nulls_not_distinct=False`, which would make our intention clearer.
# We rely on PostgreSQL's default behavior of treating NULLs as distinct values.
# postgresql_nulls_not_distinct=False,
),
)
_HASH_COL_SIZE = 64

View File

@@ -1,11 +1,11 @@
[project]
name = "dify-api"
version = "2.0.0-beta2"
version = "1.9.0"
requires-python = ">=3.11,<3.13"
dependencies = [
"arize-phoenix-otel~=0.9.2",
"authlib==1.3.1",
"authlib==1.6.4",
"azure-identity==1.16.1",
"beautifulsoup4==4.12.2",
"boto3==1.35.99",
@@ -20,7 +20,7 @@ dependencies = [
"flask-migrate~=4.0.7",
"flask-orjson~=2.0.0",
"flask-sqlalchemy~=3.1.1",
"gevent~=24.11.1",
"gevent~=25.9.1",
"gmpy2~=2.2.1",
"google-api-core==2.18.0",
"google-api-python-client==2.90.0",
@@ -169,7 +169,7 @@ dev = [
"types-redis>=4.6.0.20241004",
"celery-types>=0.23.0",
"mypy~=1.17.1",
"locust>=2.40.4",
# "locust>=2.40.4", # Temporarily removed due to compatibility issues. Uncomment when resolved.
"sseclient-py>=1.8.0",
]
@@ -183,7 +183,7 @@ storage = [
"cos-python-sdk-v5==1.9.30",
"esdk-obs-python==3.24.6.1",
"google-cloud-storage==2.16.0",
"opendal~=0.45.16",
"opendal~=0.46.0",
"oss2==2.18.5",
"supabase~=2.18.1",
"tos~=2.7.1",
@@ -211,7 +211,7 @@ vdb = [
"pgvecto-rs[sqlalchemy]~=0.2.1",
"pgvector==0.2.5",
"pymilvus~=2.5.0",
"pymochow==1.3.1",
"pymochow==2.2.9",
"pyobvector~=0.2.15",
"qdrant-client==1.9.0",
"tablestore==6.2.0",

View File

@@ -1041,6 +1041,8 @@ class TenantService:
db.session.add(ta)
db.session.commit()
if dify_config.BILLING_ENABLED:
BillingService.clean_billing_info_cache(tenant.id)
return ta
@staticmethod
@@ -1199,6 +1201,9 @@ class TenantService:
db.session.delete(ta)
db.session.commit()
if dify_config.BILLING_ENABLED:
BillingService.clean_billing_info_cache(tenant.id)
@staticmethod
def update_member_role(tenant: Tenant, member: Account, new_role: str, operator: Account):
"""Update member role"""

View File

@@ -2,6 +2,7 @@ import json
import logging
from typing import TypedDict, cast
import sqlalchemy as sa
from flask_sqlalchemy.pagination import Pagination
from configs import dify_config
@@ -20,6 +21,7 @@ from libs.login import current_user
from models.account import Account
from models.model import App, AppMode, AppModelConfig, Site
from models.tools import ApiToolProvider
from services.billing_service import BillingService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.tag_service import TagService
@@ -64,7 +66,7 @@ class AppService:
return None
app_models = db.paginate(
db.select(App).where(*filters).order_by(App.created_at.desc()),
sa.select(App).where(*filters).order_by(App.created_at.desc()),
page=args["page"],
per_page=args["limit"],
error_out=False,
@@ -162,6 +164,9 @@ class AppService:
# update web app setting as private
EnterpriseService.WebAppAuth.update_app_access_mode(app.id, "private")
if dify_config.BILLING_ENABLED:
BillingService.clean_billing_info_cache(app.tenant_id)
return app
def get_app(self, app: App) -> App:
@@ -337,6 +342,9 @@ class AppService:
if FeatureService.get_system_features().webapp_auth.enabled:
EnterpriseService.WebAppAuth.cleanup_webapp(app.id)
if dify_config.BILLING_ENABLED:
BillingService.clean_billing_info_cache(app.tenant_id)
# Trigger asynchronous deletion of app and related data
remove_app_and_related_data_task.delay(tenant_id=app.tenant_id, app_id=app.id)

View File

@@ -1,6 +1,6 @@
import json
import requests
import httpx
from services.auth.api_key_auth_base import ApiKeyAuthBase
@@ -36,7 +36,7 @@ class FirecrawlAuth(ApiKeyAuthBase):
return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
def _post_request(self, url, data, headers):
return requests.post(url, headers=headers, json=data)
return httpx.post(url, headers=headers, json=data)
def _handle_error(self, response):
if response.status_code in {402, 409, 500}:

View File

@@ -1,6 +1,6 @@
import json
import requests
import httpx
from services.auth.api_key_auth_base import ApiKeyAuthBase
@@ -31,7 +31,7 @@ class JinaAuth(ApiKeyAuthBase):
return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
def _post_request(self, url, data, headers):
return requests.post(url, headers=headers, json=data)
return httpx.post(url, headers=headers, json=data)
def _handle_error(self, response):
if response.status_code in {402, 409, 500}:

View File

@@ -1,6 +1,6 @@
import json
import requests
import httpx
from services.auth.api_key_auth_base import ApiKeyAuthBase
@@ -31,7 +31,7 @@ class JinaAuth(ApiKeyAuthBase):
return {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
def _post_request(self, url, data, headers):
return requests.post(url, headers=headers, json=data)
return httpx.post(url, headers=headers, json=data)
def _handle_error(self, response):
if response.status_code in {402, 409, 500}:

View File

@@ -1,7 +1,7 @@
import json
from urllib.parse import urljoin
import requests
import httpx
from services.auth.api_key_auth_base import ApiKeyAuthBase
@@ -31,7 +31,7 @@ class WatercrawlAuth(ApiKeyAuthBase):
return {"Content-Type": "application/json", "X-API-KEY": self.api_key}
def _get_request(self, url, headers):
return requests.get(url, headers=headers)
return httpx.get(url, headers=headers)
def _handle_error(self, response):
if response.status_code in {402, 409, 500}:

View File

@@ -5,6 +5,7 @@ import httpx
from tenacity import retry, retry_if_exception_type, stop_before_delay, wait_fixed
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.helper import RateLimiter
from models.account import Account, TenantAccountJoin, TenantAccountRole
@@ -173,3 +174,7 @@ class BillingService:
res = cls._send_request("POST", "/compliance/download", json=json)
cls.compliance_download_rate_limiter.increment_rate_limit(limiter_key)
return res
@classmethod
def clean_billing_info_cache(cls, tenant_id: str):
redis_client.delete(f"tenant:{tenant_id}:billing_info")

View File

@@ -115,12 +115,12 @@ class DatasetService:
# Check if permitted_dataset_ids is not empty to avoid WHERE false condition
if permitted_dataset_ids and len(permitted_dataset_ids) > 0:
query = query.where(
db.or_(
sa.or_(
Dataset.permission == DatasetPermissionEnum.ALL_TEAM,
db.and_(
sa.and_(
Dataset.permission == DatasetPermissionEnum.ONLY_ME, Dataset.created_by == user.id
),
db.and_(
sa.and_(
Dataset.permission == DatasetPermissionEnum.PARTIAL_TEAM,
Dataset.id.in_(permitted_dataset_ids),
),
@@ -128,9 +128,9 @@ class DatasetService:
)
else:
query = query.where(
db.or_(
sa.or_(
Dataset.permission == DatasetPermissionEnum.ALL_TEAM,
db.and_(
sa.and_(
Dataset.permission == DatasetPermissionEnum.ONLY_ME, Dataset.created_by == user.id
),
)
@@ -532,7 +532,8 @@ class DatasetService:
filtered_data["updated_by"] = user.id
filtered_data["updated_at"] = naive_utc_now()
# update Retrieval model
filtered_data["retrieval_model"] = data["retrieval_model"]
if data.get("retrieval_model"):
filtered_data["retrieval_model"] = data["retrieval_model"]
# update icon info
if data.get("icon_info"):
filtered_data["icon_info"] = data.get("icon_info")
@@ -1878,7 +1879,7 @@ class DocumentService:
# for notion_info in notion_info_list:
# workspace_id = notion_info.workspace_id
# data_source_binding = DataSourceOauthBinding.query.filter(
# db.and_(
# sa.and_(
# DataSourceOauthBinding.tenant_id == current_user.current_tenant_id,
# DataSourceOauthBinding.provider == "notion",
# DataSourceOauthBinding.disabled == False,

View File

@@ -83,7 +83,7 @@ class RetrievalSetting(BaseModel):
Retrieval Setting.
"""
search_method: Literal["semantic_search", "fulltext_search", "keyword_search", "hybrid_search"]
search_method: Literal["semantic_search", "full_text_search", "keyword_search", "hybrid_search"]
top_k: int
score_threshold: float | None = 0.5
score_threshold_enabled: bool = False

View File

@@ -217,7 +217,7 @@ class MessageService:
@classmethod
def get_suggested_questions_after_answer(
cls, app_model: App, user: Union[Account, EndUser] | None, message_id: str, invoke_from: InvokeFrom
) -> list[Message]:
) -> list[str]:
if not user:
raise ValueError("user cannot be None")
@@ -288,7 +288,7 @@ class MessageService:
)
with measure_time() as timer:
questions: list[Message] = LLMGenerator.generate_suggested_questions_after_answer(
questions: list[str] = LLMGenerator.generate_suggested_questions_after_answer(
tenant_id=app_model.tenant_id, histories=histories
)

View File

@@ -1,6 +1,6 @@
import os
import requests
import httpx
class OperationService:
@@ -12,7 +12,7 @@ class OperationService:
headers = {"Content-Type": "application/json", "Billing-Api-Secret-Key": cls.secret_key}
url = f"{cls.base_url}{endpoint}"
response = requests.request(method, url, json=json, params=params, headers=headers)
response = httpx.request(method, url, json=json, params=params, headers=headers)
return response.json()

View File

@@ -46,7 +46,11 @@ limit 1000"""
record_id = str(i.id)
provider_name = str(i.provider_name)
retrieval_model = i.retrieval_model
print(type(retrieval_model))
logger.debug(
"Processing dataset %s with retrieval model of type %s",
record_id,
type(retrieval_model),
)
if record_id in failed_ids:
continue

View File

@@ -1,9 +1,14 @@
import re
from configs import dify_config
from core.helper import marketplace
from core.plugin.entities.plugin import PluginDependency, PluginInstallationSource
from core.plugin.impl.plugin import PluginInstaller
from models.provider_ids import ModelProviderID, ToolProviderID
# Compile regex pattern for version extraction at module level for better performance
_VERSION_REGEX = re.compile(r":(?P<version>[0-9]+(?:\.[0-9]+){2}(?:[+-][0-9A-Za-z.-]+)?)(?:@|$)")
class DependenciesAnalysisService:
@classmethod
@@ -49,6 +54,13 @@ class DependenciesAnalysisService:
for dependency in dependencies:
unique_identifier = dependency.value.plugin_unique_identifier
if unique_identifier in missing_plugin_unique_identifiers:
# Extract version for Marketplace dependencies
if dependency.type == PluginDependency.Type.Marketplace:
version_match = _VERSION_REGEX.search(unique_identifier)
if version_match:
dependency.value.version = version_match.group("version")
# Create and append the dependency (same for all types)
leaked_dependencies.append(
PluginDependency(
type=dependency.type,

View File

@@ -471,7 +471,7 @@ class PluginMigration:
total_failed_tenant = 0
while True:
# paginate
tenants = db.paginate(db.select(Tenant).order_by(Tenant.created_at.desc()), page=page, per_page=100)
tenants = db.paginate(sa.select(Tenant).order_by(Tenant.created_at.desc()), page=page, per_page=100)
if tenants.items is None or len(tenants.items) == 0:
break

View File

@@ -1327,14 +1327,14 @@ class RagPipelineService:
"""
Retry error document
"""
document_pipeline_excution_log = (
document_pipeline_execution_log = (
db.session.query(DocumentPipelineExecutionLog)
.where(DocumentPipelineExecutionLog.document_id == document.id)
.first()
)
if not document_pipeline_excution_log:
if not document_pipeline_execution_log:
raise ValueError("Document pipeline execution log not found")
pipeline = db.session.query(Pipeline).where(Pipeline.id == document_pipeline_excution_log.pipeline_id).first()
pipeline = db.session.query(Pipeline).where(Pipeline.id == document_pipeline_execution_log.pipeline_id).first()
if not pipeline:
raise ValueError("Pipeline not found")
# convert to app config
@@ -1346,10 +1346,10 @@ class RagPipelineService:
workflow=workflow,
user=user,
args={
"inputs": document_pipeline_excution_log.input_data,
"start_node_id": document_pipeline_excution_log.datasource_node_id,
"datasource_type": document_pipeline_excution_log.datasource_type,
"datasource_info_list": [json.loads(document_pipeline_excution_log.datasource_info)],
"inputs": document_pipeline_execution_log.input_data,
"start_node_id": document_pipeline_execution_log.datasource_node_id,
"datasource_type": document_pipeline_execution_log.datasource_type,
"datasource_info_list": [json.loads(document_pipeline_execution_log.datasource_info)],
"original_document_id": document.id,
},
invoke_from=InvokeFrom.PUBLISHED,
@@ -1381,8 +1381,8 @@ class RagPipelineService:
datasource_nodes = workflow.graph_dict.get("nodes", [])
datasource_plugins = []
for datasource_node in datasource_nodes:
if datasource_node.get("type") == "datasource":
datasource_node_data = datasource_node.get("data", {})
if datasource_node.get("data", {}).get("type") == "datasource":
datasource_node_data = datasource_node["data"]
if not datasource_node_data:
continue

View File

@@ -685,12 +685,24 @@ class RagPipelineDslService:
workflow_dict = workflow.to_dict(include_secret=include_secret)
for node in workflow_dict.get("graph", {}).get("nodes", []):
if node.get("data", {}).get("type", "") == NodeType.KNOWLEDGE_RETRIEVAL.value:
dataset_ids = node["data"].get("dataset_ids", [])
node_data = node.get("data", {})
if not node_data:
continue
data_type = node_data.get("type", "")
if data_type == NodeType.KNOWLEDGE_RETRIEVAL.value:
dataset_ids = node_data.get("dataset_ids", [])
node["data"]["dataset_ids"] = [
self.encrypt_dataset_id(dataset_id=dataset_id, tenant_id=pipeline.tenant_id)
for dataset_id in dataset_ids
]
# filter credential id from tool node
if not include_secret and data_type == NodeType.TOOL.value:
node_data.pop("credential_id", None)
# filter credential id from agent node
if not include_secret and data_type == NodeType.AGENT.value:
for tool in node_data.get("agent_parameters", {}).get("tools", {}).get("value", []):
tool.pop("credential_id", None)
export_data["workflow"] = workflow_dict
dependencies = self._extract_dependencies_from_workflow(workflow)
export_data["dependencies"] = [

View File

@@ -1,4 +1,5 @@
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4
@@ -17,6 +18,8 @@ from services.entities.knowledge_entities.rag_pipeline_entities import Knowledge
from services.plugin.plugin_migration import PluginMigration
from services.plugin.plugin_service import PluginService
logger = logging.getLogger(__name__)
class RagPipelineTransformService:
def transform_dataset(self, dataset_id: str):
@@ -35,11 +38,11 @@ class RagPipelineTransformService:
indexing_technique = dataset.indexing_technique
if not datasource_type and not indexing_technique:
return self._transfrom_to_empty_pipeline(dataset)
return self._transform_to_empty_pipeline(dataset)
doc_form = dataset.doc_form
if not doc_form:
return self._transfrom_to_empty_pipeline(dataset)
return self._transform_to_empty_pipeline(dataset)
retrieval_model = dataset.retrieval_model
pipeline_yaml = self._get_transform_yaml(doc_form, datasource_type, indexing_technique)
# deal dependencies
@@ -257,10 +260,10 @@ class RagPipelineTransformService:
if plugin_unique_identifier:
need_install_plugin_unique_identifiers.append(plugin_unique_identifier)
if need_install_plugin_unique_identifiers:
print(need_install_plugin_unique_identifiers)
logger.debug("Installing missing pipeline plugins %s", need_install_plugin_unique_identifiers)
PluginService.install_from_marketplace_pkg(tenant_id, need_install_plugin_unique_identifiers)
def _transfrom_to_empty_pipeline(self, dataset: Dataset):
def _transform_to_empty_pipeline(self, dataset: Dataset):
pipeline = Pipeline(
tenant_id=dataset.tenant_id,
name=dataset.name,

View File

@@ -1,5 +1,6 @@
import uuid
import sqlalchemy as sa
from flask_login import current_user
from sqlalchemy import func, select
from werkzeug.exceptions import NotFound
@@ -18,7 +19,7 @@ class TagService:
.where(Tag.type == tag_type, Tag.tenant_id == current_tenant_id)
)
if keyword:
query = query.where(db.and_(Tag.name.ilike(f"%{keyword}%")))
query = query.where(sa.and_(Tag.name.ilike(f"%{keyword}%")))
query = query.group_by(Tag.id, Tag.type, Tag.name, Tag.created_at)
results: list = query.order_by(Tag.created_at.desc()).all()
return results

View File

@@ -262,7 +262,7 @@ class ToolTransformService:
author=user.name if user else "Anonymous",
name=tool.name,
label=I18nObject(en_US=tool.name, zh_Hans=tool.name),
description=I18nObject(en_US=tool.description, zh_Hans=tool.description),
description=I18nObject(en_US=tool.description or "", zh_Hans=tool.description or ""),
parameters=ToolTransformService.convert_mcp_schema_to_parameter(tool.inputSchema),
labels=[],
)

View File

@@ -262,6 +262,14 @@ class VariableTruncator:
target_length = self._array_element_limit
for i, item in enumerate(value):
# Dirty fix:
# The output of `Start` node may contain list of `File` elements,
# causing `AssertionError` while invoking `_truncate_json_primitives`.
#
# This check ensures that `list[File]` are handled separately
if isinstance(item, File):
truncated_value.append(item)
continue
if i >= target_length:
return _PartResult(truncated_value, used_size, True)
if i > 0:

View File

@@ -3,7 +3,7 @@ import json
from dataclasses import dataclass
from typing import Any
import requests
import httpx
from flask_login import current_user
from core.helper import encrypter
@@ -216,7 +216,7 @@ class WebsiteService:
@classmethod
def _crawl_with_jinareader(cls, request: CrawlRequest, api_key: str) -> dict[str, Any]:
if not request.options.crawl_sub_pages:
response = requests.get(
response = httpx.get(
f"https://r.jina.ai/{request.url}",
headers={"Accept": "application/json", "Authorization": f"Bearer {api_key}"},
)
@@ -224,7 +224,7 @@ class WebsiteService:
raise ValueError("Failed to crawl:")
return {"status": "active", "data": response.json().get("data")}
else:
response = requests.post(
response = httpx.post(
"https://adaptivecrawl-kir3wx7b3a-uc.a.run.app",
json={
"url": request.url,
@@ -287,7 +287,7 @@ class WebsiteService:
@classmethod
def _get_jinareader_status(cls, job_id: str, api_key: str) -> dict[str, Any]:
response = requests.post(
response = httpx.post(
"https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
json={"taskId": job_id},
@@ -303,7 +303,7 @@ class WebsiteService:
}
if crawl_status_data["status"] == "completed":
response = requests.post(
response = httpx.post(
"https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
json={"taskId": job_id, "urls": list(data.get("processed", {}).keys())},
@@ -362,7 +362,7 @@ class WebsiteService:
@classmethod
def _get_jinareader_url_data(cls, job_id: str, url: str, api_key: str) -> dict[str, Any] | None:
if not job_id:
response = requests.get(
response = httpx.get(
f"https://r.jina.ai/{url}",
headers={"Accept": "application/json", "Authorization": f"Bearer {api_key}"},
)
@@ -371,7 +371,7 @@ class WebsiteService:
return dict(response.json().get("data", {}))
else:
# Get crawl status first
status_response = requests.post(
status_response = httpx.post(
"https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
json={"taskId": job_id},
@@ -381,7 +381,7 @@ class WebsiteService:
raise ValueError("Crawl job is not completed")
# Get processed data
data_response = requests.post(
data_response = httpx.post(
"https://adaptivecrawlstatus-kir3wx7b3a-uc.a.run.app",
headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
json={"taskId": job_id, "urls": list(status_data.get("processed", {}).keys())},

View File

@@ -450,7 +450,8 @@ class WorkflowService:
)
if not default_provider:
raise ValueError("No default credential found")
# plugin does not require credentials, skip
return
# Check credential policy compliance using the default credential ID
from core.helper.credential_utils import check_credential_policy_compliance
@@ -1008,7 +1009,7 @@ def _setup_variable_pool(
if workflow.type != WorkflowType.WORKFLOW.value:
system_variable.query = query
system_variable.conversation_id = conversation_id
system_variable.dialogue_count = 0
system_variable.dialogue_count = 1
else:
system_variable = SystemVariable.empty()

View File

@@ -2,6 +2,7 @@ import logging
import time
import click
import sqlalchemy as sa
from celery import shared_task
from sqlalchemy import select
@@ -51,7 +52,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
data_source_binding = (
db.session.query(DataSourceOauthBinding)
.where(
db.and_(
sa.and_(
DataSourceOauthBinding.tenant_id == document.tenant_id,
DataSourceOauthBinding.provider == "notion",
DataSourceOauthBinding.disabled == False,

View File

@@ -1,17 +1,46 @@
import logging
import time
from collections.abc import Mapping
from typing import Any
import click
from celery import shared_task
from flask import render_template_string
from jinja2.runtime import Context
from jinja2.sandbox import ImmutableSandboxedEnvironment
from configs import dify_config
from configs.feature import TemplateMode
from extensions.ext_mail import mail
from libs.email_i18n import get_email_i18n_service
logger = logging.getLogger(__name__)
class SandboxedEnvironment(ImmutableSandboxedEnvironment):
def __init__(self, timeout: int, *args: Any, **kwargs: Any):
self._timeout_time = time.time() + timeout
super().__init__(*args, **kwargs)
def call(self, context: Context, obj: Any, *args: Any, **kwargs: Any) -> Any:
if time.time() > self._timeout_time:
raise TimeoutError("Template rendering timeout")
return super().call(context, obj, *args, **kwargs)
def _render_template_with_strategy(body: str, substitutions: Mapping[str, str]) -> str:
mode = dify_config.MAIL_TEMPLATING_MODE
timeout = dify_config.MAIL_TEMPLATING_TIMEOUT
if mode == TemplateMode.UNSAFE:
return render_template_string(body, **substitutions)
if mode == TemplateMode.SANDBOX:
tmpl = SandboxedEnvironment(timeout=timeout).from_string(body)
return tmpl.render(substitutions)
if mode == TemplateMode.DISABLED:
return body
raise ValueError(f"Unsupported mail templating mode: {mode}")
@shared_task(queue="mail")
def send_inner_email_task(to: list[str], subject: str, body: str, substitutions: Mapping[str, str]):
if not mail.is_inited():
@@ -21,7 +50,7 @@ def send_inner_email_task(to: list[str], subject: str, body: str, substitutions:
start_at = time.perf_counter()
try:
html_content = render_template_string(body, **substitutions)
html_content = _render_template_with_strategy(body, substitutions)
email_service = get_email_i18n_service()
email_service.send_raw_email(to=to, subject=subject, html_content=html_content)

View File

@@ -1,12 +1,14 @@
"""Integration tests for ChatMessageApi permission verification."""
import uuid
from types import SimpleNamespace
from unittest import mock
import pytest
from flask.testing import FlaskClient
from controllers.console.app import completion as completion_api
from controllers.console.app import message as message_api
from controllers.console.app import wraps
from libs.datetime_utils import naive_utc_now
from models import Account, App, Tenant
@@ -99,3 +101,106 @@ class TestChatMessageApiPermissions:
)
assert response.status_code == status
@pytest.mark.parametrize(
("role", "status"),
[
(TenantAccountRole.OWNER, 200),
(TenantAccountRole.ADMIN, 200),
(TenantAccountRole.EDITOR, 200),
(TenantAccountRole.NORMAL, 403),
(TenantAccountRole.DATASET_OPERATOR, 403),
],
)
def test_get_requires_edit_permission(
self,
test_client: FlaskClient,
auth_header,
monkeypatch,
mock_app_model,
mock_account,
role: TenantAccountRole,
status: int,
):
"""Ensure GET chat-messages endpoint enforces edit permissions."""
mock_load_app_model = mock.Mock(return_value=mock_app_model)
monkeypatch.setattr(wraps, "_load_app_model", mock_load_app_model)
conversation_id = uuid.uuid4()
created_at = naive_utc_now()
mock_conversation = SimpleNamespace(id=str(conversation_id), app_id=str(mock_app_model.id))
mock_message = SimpleNamespace(
id=str(uuid.uuid4()),
conversation_id=str(conversation_id),
inputs=[],
query="hello",
message=[{"text": "hello"}],
message_tokens=0,
re_sign_file_url_answer="",
answer_tokens=0,
provider_response_latency=0.0,
from_source="console",
from_end_user_id=None,
from_account_id=mock_account.id,
feedbacks=[],
workflow_run_id=None,
annotation=None,
annotation_hit_history=None,
created_at=created_at,
agent_thoughts=[],
message_files=[],
message_metadata_dict={},
status="success",
error="",
parent_message_id=None,
)
class MockQuery:
def __init__(self, model):
self.model = model
def where(self, *args, **kwargs):
return self
def first(self):
if getattr(self.model, "__name__", "") == "Conversation":
return mock_conversation
return None
def order_by(self, *args, **kwargs):
return self
def limit(self, *_):
return self
def all(self):
if getattr(self.model, "__name__", "") == "Message":
return [mock_message]
return []
mock_session = mock.Mock()
mock_session.query.side_effect = MockQuery
mock_session.scalar.return_value = False
monkeypatch.setattr(message_api, "db", SimpleNamespace(session=mock_session))
monkeypatch.setattr(message_api, "current_user", mock_account)
class DummyPagination:
def __init__(self, data, limit, has_more):
self.data = data
self.limit = limit
self.has_more = has_more
monkeypatch.setattr(message_api, "InfiniteScrollPagination", DummyPagination)
mock_account.role = role
response = test_client.get(
f"/console/api/apps/{mock_app_model.id}/chat-messages",
headers=auth_header,
query_string={"conversation_id": str(conversation_id)},
)
assert response.status_code == status

View File

@@ -1,8 +1,8 @@
import os
from typing import Literal
import httpx
import pytest
import requests
from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse
from core.tools.entities.common_entities import I18nObject
@@ -27,13 +27,11 @@ class MockedHttp:
@classmethod
def requests_request(
cls, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"], url: str, **kwargs
) -> requests.Response:
) -> httpx.Response:
"""
Mocked requests.request
Mocked httpx.request
"""
request = requests.PreparedRequest()
request.method = method
request.url = url
request = httpx.Request(method, url)
if url.endswith("/tools"):
content = PluginDaemonBasicResponse[list[ToolProviderEntity]](
code=0, message="success", data=cls.list_tools()
@@ -41,8 +39,7 @@ class MockedHttp:
else:
raise ValueError("")
response = requests.Response()
response.status_code = 200
response = httpx.Response(status_code=200)
response.request = request
response._content = content.encode("utf-8")
return response
@@ -54,7 +51,7 @@ MOCK_SWITCH = os.getenv("MOCK_SWITCH", "false").lower() == "true"
@pytest.fixture
def setup_http_mock(request, monkeypatch: pytest.MonkeyPatch):
if MOCK_SWITCH:
monkeypatch.setattr(requests, "request", MockedHttp.requests_request)
monkeypatch.setattr(httpx, "request", MockedHttp.requests_request)
def unpatch():
monkeypatch.undo()

View File

@@ -100,8 +100,8 @@ class MockBaiduVectorDBClass:
"row": {
"id": primary_key.get("id"),
"vector": [0.23432432, 0.8923744, 0.89238432],
"text": "text",
"metadata": '{"doc_id": "doc_id_001"}',
"page_content": "text",
"metadata": {"doc_id": "doc_id_001"},
},
"code": 0,
"msg": "Success",
@@ -127,8 +127,8 @@ class MockBaiduVectorDBClass:
"row": {
"id": "doc_id_001",
"vector": [0.23432432, 0.8923744, 0.89238432],
"text": "text",
"metadata": '{"doc_id": "doc_id_001"}',
"page_content": "text",
"metadata": {"doc_id": "doc_id_001"},
},
"distance": 0.1,
"score": 0.5,

Some files were not shown because too many files have changed in this diff Show More