mirror of
https://github.com/langgenius/dify.git
synced 2026-04-07 04:19:19 +08:00
Compare commits
211 Commits
feat/hitl
...
move-token
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d3aab5901 | ||
|
|
48d8667c4f | ||
|
|
91dfdd87e3 | ||
|
|
e4316a9bf6 | ||
|
|
87bf7401f1 | ||
|
|
33242697ce | ||
|
|
24fe95308a | ||
|
|
d8f8b8cd07 | ||
|
|
ad600f0827 | ||
|
|
35b31d0cdd | ||
|
|
592ad04818 | ||
|
|
71ff135927 | ||
|
|
f73be8d69e | ||
|
|
f9196f7bea | ||
|
|
439ff3775d | ||
|
|
233e12e631 | ||
|
|
eccb67d5b6 | ||
|
|
1e6de0e6ad | ||
|
|
9f0ee5c145 | ||
|
|
6c66e11cac | ||
|
|
149a7870bc | ||
|
|
661af404e9 | ||
|
|
8ff51a58fd | ||
|
|
f17c234a92 | ||
|
|
a694533fc9 | ||
|
|
d20880d102 | ||
|
|
eea1cf17ef | ||
|
|
700a4029c6 | ||
|
|
5b45b62994 | ||
|
|
349d2d8e4e | ||
|
|
2eefb585f9 | ||
|
|
5cb1b53b47 | ||
|
|
b48f36a4e5 | ||
|
|
0bf5f4df3b | ||
|
|
56759c03b7 | ||
|
|
cec6d82650 | ||
|
|
33e0dae2b2 | ||
|
|
4f38229fbc | ||
|
|
5d927b413f | ||
|
|
39de931555 | ||
|
|
05c827606b | ||
|
|
daa923278e | ||
|
|
7b1b5c2445 | ||
|
|
154486bc7b | ||
|
|
fd799fa3f4 | ||
|
|
065122a2ae | ||
|
|
b5f62b98f9 | ||
|
|
0ac09127c7 | ||
|
|
3c69bac2b1 | ||
|
|
0964fc142e | ||
|
|
6f2c101e3c | ||
|
|
34b6fc92d7 | ||
|
|
d773096146 | ||
|
|
212756c315 | ||
|
|
6ff420cd03 | ||
|
|
99cc98320a | ||
|
|
5bc1b6f615 | ||
|
|
de10b342e8 | ||
|
|
48f6b2e885 | ||
|
|
4e142f72e8 | ||
|
|
a6456da393 | ||
|
|
b863f8edbd | ||
|
|
64296da7e7 | ||
|
|
02fef84d7f | ||
|
|
28f2098b00 | ||
|
|
59681ce760 | ||
|
|
4997b82a63 | ||
|
|
3abfbc0246 | ||
|
|
beea1acd92 | ||
|
|
8761109a34 | ||
|
|
00935fe526 | ||
|
|
0358925d7d | ||
|
|
b8fbd7b0f6 | ||
|
|
bcd5dd0f81 | ||
|
|
a1991c51e4 | ||
|
|
b2fa6cb4d3 | ||
|
|
ad3a195734 | ||
|
|
84533cbfe0 | ||
|
|
0eaae4f573 | ||
|
|
9819f7d69c | ||
|
|
a040b9428d | ||
|
|
740d94c6ed | ||
|
|
657eeb65b8 | ||
|
|
f923901d3f | ||
|
|
a0ddaed6d3 | ||
|
|
2162cd1a69 | ||
|
|
0070891114 | ||
|
|
6e531fe44f | ||
|
|
80f49367eb | ||
|
|
7c60ad01d3 | ||
|
|
57890eed25 | ||
|
|
737575d637 | ||
|
|
f76ee7cfa4 | ||
|
|
a0244d1390 | ||
|
|
42af9d5438 | ||
|
|
4c48e3b997 | ||
|
|
46f0cebbb0 | ||
|
|
2d54192f35 | ||
|
|
80a5398dea | ||
|
|
ab64c4adf9 | ||
|
|
ce8354a42a | ||
|
|
d0bb642fc5 | ||
|
|
e4ddf07194 | ||
|
|
aad980f267 | ||
|
|
8141e3af99 | ||
|
|
b108de6607 | ||
|
|
7b3b3dbe52 | ||
|
|
5d7aeaa7e5 | ||
|
|
41e2812349 | ||
|
|
fbacb9f7a2 | ||
|
|
4d36a0707a | ||
|
|
3c4f5b45c4 | ||
|
|
ce75f26744 | ||
|
|
ea0e1b52a8 | ||
|
|
0993b94acd | ||
|
|
368db04519 | ||
|
|
4e3680e139 | ||
|
|
3758904c00 | ||
|
|
938e4790f4 | ||
|
|
00591a592c | ||
|
|
41a4a57d2e | ||
|
|
7656d514b9 | ||
|
|
6824eda1c6 | ||
|
|
3cf13ba9c6 | ||
|
|
c16e64b833 | ||
|
|
ba12960975 | ||
|
|
1f74a251f7 | ||
|
|
db17119a96 | ||
|
|
34e09829fb | ||
|
|
faf5166c67 | ||
|
|
c7bbe05088 | ||
|
|
210710e76d | ||
|
|
98466e2d29 | ||
|
|
a4e03d6284 | ||
|
|
84d090db33 | ||
|
|
f3f56f03e3 | ||
|
|
b6d506828b | ||
|
|
16df9851a2 | ||
|
|
c0ffb6db2a | ||
|
|
0118b45cff | ||
|
|
8fd3eeb760 | ||
|
|
f233e2036f | ||
|
|
3fd1eea4d7 | ||
|
|
b65678bd4c | ||
|
|
bfdc39510b | ||
|
|
80e6312807 | ||
|
|
d6b025e91e | ||
|
|
10f85074e8 | ||
|
|
f953331f91 | ||
|
|
32350f7a04 | ||
|
|
c730fec1e4 | ||
|
|
b4fec9b7aa | ||
|
|
7e0bccbbf0 | ||
|
|
2f87ecc0ce | ||
|
|
5b4c7b2a40 | ||
|
|
378a1d7d08 | ||
|
|
ce0192620d | ||
|
|
e9feeedc01 | ||
|
|
e32490f54e | ||
|
|
e9db50f781 | ||
|
|
0310f631ee | ||
|
|
abc5a61e98 | ||
|
|
5f1698add6 | ||
|
|
36e50f277f | ||
|
|
704ee40caa | ||
|
|
3119c99979 | ||
|
|
16b8733886 | ||
|
|
83f64104fd | ||
|
|
5077879886 | ||
|
|
697b57631a | ||
|
|
6015f23e79 | ||
|
|
f355c8d595 | ||
|
|
0142001fc2 | ||
|
|
4058e9ae23 | ||
|
|
95310561ec | ||
|
|
de33561a52 | ||
|
|
6d9665578b | ||
|
|
18f14c04dc | ||
|
|
14251b249d | ||
|
|
1819bd72ef | ||
|
|
7dabc03a08 | ||
|
|
1a050c9f86 | ||
|
|
7fb6e0cdfe | ||
|
|
e0fcf33979 | ||
|
|
898e09264b | ||
|
|
4ac461d882 | ||
|
|
fa763216d0 | ||
|
|
d546210040 | ||
|
|
4e0a7a7f9e | ||
|
|
e4ab6e0919 | ||
|
|
6fa943fe75 | ||
|
|
a1fc280102 | ||
|
|
56e3a55023 | ||
|
|
6c63c6a221 | ||
|
|
5b06203ef5 | ||
|
|
3348b89436 | ||
|
|
0428ac5f3a | ||
|
|
aead4fe65c | ||
|
|
bdf6739b86 | ||
|
|
483db22b97 | ||
|
|
aa800d838d | ||
|
|
4bd80683a4 | ||
|
|
c185a51bad | ||
|
|
4430a1b3da | ||
|
|
2c9430313d | ||
|
|
552ee369b2 | ||
|
|
d5b9a7b2f8 | ||
|
|
c2a3f459c7 | ||
|
|
4971e11734 | ||
|
|
a297b06aac | ||
|
|
e988266f53 |
168
.agents/skills/backend-code-review/SKILL.md
Normal file
168
.agents/skills/backend-code-review/SKILL.md
Normal file
@@ -0,0 +1,168 @@
|
||||
---
|
||||
name: backend-code-review
|
||||
description: Review backend code for quality, security, maintainability, and best practices based on established checklist rules. Use when the user requests a review, analysis, or improvement of backend files (e.g., `.py`) under the `api/` directory. Do NOT use for frontend files (e.g., `.tsx`, `.ts`, `.js`). Supports pending-change review, code snippets review, and file-focused review.
|
||||
---
|
||||
|
||||
# Backend Code Review
|
||||
|
||||
## When to use this skill
|
||||
|
||||
Use this skill whenever the user asks to **review, analyze, or improve** backend code (e.g., `.py`) under the `api/` directory. Supports the following review modes:
|
||||
|
||||
- **Pending-change review**: when the user asks to review current changes (inspect staged/working-tree files slated for commit to get the changes).
|
||||
- **Code snippets review**: when the user pastes code snippets (e.g., a function/class/module excerpt) into the chat and asks for a review.
|
||||
- **File-focused review**: when the user points to specific files and asks for a review of those files (one file or a small, explicit set of files, e.g., `api/...`, `api/app.py`).
|
||||
|
||||
Do NOT use this skill when:
|
||||
|
||||
- The request is about frontend code or UI (e.g., `.tsx`, `.ts`, `.js`, `web/`).
|
||||
- The user is not asking for a review/analysis/improvement of backend code.
|
||||
- The scope is not under `api/` (unless the user explicitly asks to review backend-related changes outside `api/`).
|
||||
|
||||
## How to use this skill
|
||||
|
||||
Follow these steps when using this skill:
|
||||
|
||||
1. **Identify the review mode** (pending-change vs snippet vs file-focused) based on the user’s input. Keep the scope tight: review only what the user provided or explicitly referenced.
|
||||
2. Follow the rules defined in **Checklist** to perform the review. If no Checklist rule matches, apply **General Review Rules** as a fallback to perform the best-effort review.
|
||||
3. Compose the final output strictly follow the **Required Output Format**.
|
||||
|
||||
Notes when using this skill:
|
||||
- Always include actionable fixes or suggestions (including possible code snippets).
|
||||
- Use best-effort `File:Line` references when a file path and line numbers are available; otherwise, use the most specific identifier you can.
|
||||
|
||||
## Checklist
|
||||
|
||||
- db schema design: if the review scope includes code/files under `api/models/` or `api/migrations/`, follow [references/db-schema-rule.md](references/db-schema-rule.md) to perform the review
|
||||
- architecture: if the review scope involves controller/service/core-domain/libs/model layering, dependency direction, or moving responsibilities across modules, follow [references/architecture-rule.md](references/architecture-rule.md) to perform the review
|
||||
- repositories abstraction: if the review scope contains table/model operations (e.g., `select(...)`, `session.execute(...)`, joins, CRUD) and is not under `api/repositories`, `api/core/repositories`, or `api/extensions/*/repositories/`, follow [references/repositories-rule.md](references/repositories-rule.md) to perform the review
|
||||
- sqlalchemy patterns: if the review scope involves SQLAlchemy session/query usage, db transaction/crud usage, or raw SQL usage, follow [references/sqlalchemy-rule.md](references/sqlalchemy-rule.md) to perform the review
|
||||
|
||||
## General Review Rules
|
||||
|
||||
### 1. Security Review
|
||||
|
||||
Check for:
|
||||
- SQL injection vulnerabilities
|
||||
- Server-Side Request Forgery (SSRF)
|
||||
- Command injection
|
||||
- Insecure deserialization
|
||||
- Hardcoded secrets/credentials
|
||||
- Improper authentication/authorization
|
||||
- Insecure direct object references
|
||||
|
||||
### 2. Performance Review
|
||||
|
||||
Check for:
|
||||
- N+1 queries
|
||||
- Missing database indexes
|
||||
- Memory leaks
|
||||
- Blocking operations in async code
|
||||
- Missing caching opportunities
|
||||
|
||||
### 3. Code Quality Review
|
||||
|
||||
Check for:
|
||||
- Code forward compatibility
|
||||
- Code duplication (DRY violations)
|
||||
- Functions doing too much (SRP violations)
|
||||
- Deep nesting / complex conditionals
|
||||
- Magic numbers/strings
|
||||
- Poor naming
|
||||
- Missing error handling
|
||||
- Incomplete type coverage
|
||||
|
||||
### 4. Testing Review
|
||||
|
||||
Check for:
|
||||
- Missing test coverage for new code
|
||||
- Tests that don't test behavior
|
||||
- Flaky test patterns
|
||||
- Missing edge cases
|
||||
|
||||
## Required Output Format
|
||||
|
||||
When this skill invoked, the response must exactly follow one of the two templates:
|
||||
|
||||
### Template A (any findings)
|
||||
|
||||
```markdown
|
||||
# Code Review Summary
|
||||
|
||||
Found <X> critical issues need to be fixed:
|
||||
|
||||
## 🔴 Critical (Must Fix)
|
||||
|
||||
### 1. <brief description of the issue>
|
||||
|
||||
FilePath: <path> line <line>
|
||||
<relevant code snippet or pointer>
|
||||
|
||||
#### Explanation
|
||||
|
||||
<detailed explanation and references of the issue>
|
||||
|
||||
#### Suggested Fix
|
||||
|
||||
1. <brief description of suggested fix>
|
||||
2. <code example> (optional, omit if not applicable)
|
||||
|
||||
---
|
||||
... (repeat for each critical issue) ...
|
||||
|
||||
Found <Y> suggestions for improvement:
|
||||
|
||||
## 🟡 Suggestions (Should Consider)
|
||||
|
||||
### 1. <brief description of the suggestion>
|
||||
|
||||
FilePath: <path> line <line>
|
||||
<relevant code snippet or pointer>
|
||||
|
||||
#### Explanation
|
||||
|
||||
<detailed explanation and references of the suggestion>
|
||||
|
||||
#### Suggested Fix
|
||||
|
||||
1. <brief description of suggested fix>
|
||||
2. <code example> (optional, omit if not applicable)
|
||||
|
||||
---
|
||||
... (repeat for each suggestion) ...
|
||||
|
||||
Found <Z> optional nits:
|
||||
|
||||
## 🟢 Nits (Optional)
|
||||
### 1. <brief description of the nit>
|
||||
|
||||
FilePath: <path> line <line>
|
||||
<relevant code snippet or pointer>
|
||||
|
||||
#### Explanation
|
||||
|
||||
<explanation and references of the optional nit>
|
||||
|
||||
#### Suggested Fix
|
||||
|
||||
- <minor suggestions>
|
||||
|
||||
---
|
||||
... (repeat for each nits) ...
|
||||
|
||||
## ✅ What's Good
|
||||
|
||||
- <Positive feedback on good patterns>
|
||||
```
|
||||
|
||||
- If there are no critical issues or suggestions or option nits or good points, just omit that section.
|
||||
- If the issue number is more than 10, summarize as "Found 10+ critical issues/suggestions/optional nits" and only output the first 10 items.
|
||||
- Don't compress the blank lines between sections; keep them as-is for readability.
|
||||
- If there is any issue requires code changes, append a brief follow-up question to ask whether the user wants to apply the fix(es) after the structured output. For example: "Would you like me to use the Suggested fix(es) to address these issues?"
|
||||
|
||||
### Template B (no issues)
|
||||
|
||||
```markdown
|
||||
## Code Review Summary
|
||||
✅ No issues found.
|
||||
```
|
||||
@@ -0,0 +1,91 @@
|
||||
# Rule Catalog — Architecture
|
||||
|
||||
## Scope
|
||||
- Covers: controller/service/core-domain/libs/model layering, dependency direction, responsibility placement, observability-friendly flow.
|
||||
|
||||
## Rules
|
||||
|
||||
### Keep business logic out of controllers
|
||||
- Category: maintainability
|
||||
- Severity: critical
|
||||
- Description: Controllers should parse input, call services, and return serialized responses. Business decisions inside controllers make behavior hard to reuse and test.
|
||||
- Suggested fix: Move domain/business logic into the service or core/domain layer. Keep controller handlers thin and orchestration-focused.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
@bp.post("/apps/<app_id>/publish")
|
||||
def publish_app(app_id: str):
|
||||
payload = request.get_json() or {}
|
||||
if payload.get("force") and current_user.role != "admin":
|
||||
raise ValueError("only admin can force publish")
|
||||
app = App.query.get(app_id)
|
||||
app.status = "published"
|
||||
db.session.commit()
|
||||
return {"result": "ok"}
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
@bp.post("/apps/<app_id>/publish")
|
||||
def publish_app(app_id: str):
|
||||
payload = PublishRequest.model_validate(request.get_json() or {})
|
||||
app_service.publish_app(app_id=app_id, force=payload.force, actor_id=current_user.id)
|
||||
return {"result": "ok"}
|
||||
```
|
||||
|
||||
### Preserve layer dependency direction
|
||||
- Category: best practices
|
||||
- Severity: critical
|
||||
- Description: Controllers may depend on services, and services may depend on core/domain abstractions. Reversing this direction (for example, core importing controller/web modules) creates cycles and leaks transport concerns into domain code.
|
||||
- Suggested fix: Extract shared contracts into core/domain or service-level modules and make upper layers depend on lower, not the reverse.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
# core/policy/publish_policy.py
|
||||
from controllers.console.app import request_context
|
||||
|
||||
def can_publish() -> bool:
|
||||
return request_context.current_user.is_admin
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# core/policy/publish_policy.py
|
||||
def can_publish(role: str) -> bool:
|
||||
return role == "admin"
|
||||
|
||||
# service layer adapts web/user context to domain input
|
||||
allowed = can_publish(role=current_user.role)
|
||||
```
|
||||
|
||||
### Keep libs business-agnostic
|
||||
- Category: maintainability
|
||||
- Severity: critical
|
||||
- Description: Modules under `api/libs/` should remain reusable, business-agnostic building blocks. They must not encode product/domain-specific rules, workflow orchestration, or business decisions.
|
||||
- Suggested fix:
|
||||
- If business logic appears in `api/libs/`, extract it into the appropriate `services/` or `core/` module and keep `libs` focused on generic, cross-cutting helpers.
|
||||
- Keep `libs` dependencies clean: avoid importing service/controller/domain-specific modules into `api/libs/`.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
# api/libs/conversation_filter.py
|
||||
from services.conversation_service import ConversationService
|
||||
|
||||
def should_archive_conversation(conversation, tenant_id: str) -> bool:
|
||||
# Domain policy and service dependency are leaking into libs.
|
||||
service = ConversationService()
|
||||
if service.has_paid_plan(tenant_id):
|
||||
return conversation.idle_days > 90
|
||||
return conversation.idle_days > 30
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# api/libs/datetime_utils.py (business-agnostic helper)
|
||||
def older_than_days(idle_days: int, threshold_days: int) -> bool:
|
||||
return idle_days > threshold_days
|
||||
|
||||
# services/conversation_service.py (business logic stays in service/core)
|
||||
from libs.datetime_utils import older_than_days
|
||||
|
||||
def should_archive_conversation(conversation, tenant_id: str) -> bool:
|
||||
threshold_days = 90 if has_paid_plan(tenant_id) else 30
|
||||
return older_than_days(conversation.idle_days, threshold_days)
|
||||
```
|
||||
157
.agents/skills/backend-code-review/references/db-schema-rule.md
Normal file
157
.agents/skills/backend-code-review/references/db-schema-rule.md
Normal file
@@ -0,0 +1,157 @@
|
||||
# Rule Catalog — DB Schema Design
|
||||
|
||||
## Scope
|
||||
- Covers: model/base inheritance, schema boundaries in model properties, tenant-aware schema design, index redundancy checks, dialect portability in models, and cross-database compatibility in migrations.
|
||||
- Does NOT cover: session lifecycle, transaction boundaries, and query execution patterns (handled by `sqlalchemy-rule.md`).
|
||||
|
||||
## Rules
|
||||
|
||||
### Do not query other tables inside `@property`
|
||||
- Category: [maintainability, performance]
|
||||
- Severity: critical
|
||||
- Description: A model `@property` must not open sessions or query other tables. This hides dependencies across models, tightly couples schema objects to data access, and can cause N+1 query explosions when iterating collections.
|
||||
- Suggested fix:
|
||||
- Keep model properties pure and local to already-loaded fields.
|
||||
- Move cross-table data fetching to service/repository methods.
|
||||
- For list/batch reads, fetch required related data explicitly (join/preload/bulk query) before rendering derived values.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
class Conversation(TypeBase):
|
||||
__tablename__ = "conversations"
|
||||
|
||||
@property
|
||||
def app_name(self) -> str:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
app = session.execute(select(App).where(App.id == self.app_id)).scalar_one()
|
||||
return app.name
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
class Conversation(TypeBase):
|
||||
__tablename__ = "conversations"
|
||||
|
||||
@property
|
||||
def display_title(self) -> str:
|
||||
return self.name or "Untitled"
|
||||
|
||||
|
||||
# Service/repository layer performs explicit batch fetch for related App rows.
|
||||
```
|
||||
|
||||
### Prefer including `tenant_id` in model definitions
|
||||
- Category: maintainability
|
||||
- Severity: suggestion
|
||||
- Description: In multi-tenant domains, include `tenant_id` in schema definitions whenever the entity belongs to tenant-owned data. This improves data isolation safety and keeps future partitioning/sharding strategies practical as data volume grows.
|
||||
- Suggested fix:
|
||||
- Add a `tenant_id` column and ensure related unique/index constraints include tenant dimension when applicable.
|
||||
- Propagate `tenant_id` through service/repository contracts to keep access paths tenant-aware.
|
||||
- Exception: if a table is explicitly designed as non-tenant-scoped global metadata, document that design decision clearly.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
from sqlalchemy.orm import Mapped
|
||||
|
||||
class Dataset(TypeBase):
|
||||
__tablename__ = "datasets"
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True)
|
||||
name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
from sqlalchemy.orm import Mapped
|
||||
|
||||
class Dataset(TypeBase):
|
||||
__tablename__ = "datasets"
|
||||
id: Mapped[str] = mapped_column(StringUUID, primary_key=True)
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False, index=True)
|
||||
name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
|
||||
```
|
||||
|
||||
### Detect and avoid duplicate/redundant indexes
|
||||
- Category: performance
|
||||
- Severity: suggestion
|
||||
- Description: Review index definitions for leftmost-prefix redundancy. For example, index `(a, b, c)` can safely cover most lookups for `(a, b)`. Keeping both may increase write overhead and can mislead the optimizer into suboptimal execution plans.
|
||||
- Suggested fix:
|
||||
- Before adding an index, compare against existing composite indexes by leftmost-prefix rules.
|
||||
- Drop or avoid creating redundant prefixes unless there is a proven query-pattern need.
|
||||
- Apply the same review standard in both model `__table_args__` and migration index DDL.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
__table_args__ = (
|
||||
sa.Index("idx_msg_tenant_app", "tenant_id", "app_id"),
|
||||
sa.Index("idx_msg_tenant_app_created", "tenant_id", "app_id", "created_at"),
|
||||
)
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
__table_args__ = (
|
||||
# Keep the wider index unless profiling proves a dedicated short index is needed.
|
||||
sa.Index("idx_msg_tenant_app_created", "tenant_id", "app_id", "created_at"),
|
||||
)
|
||||
```
|
||||
|
||||
### Avoid PostgreSQL-only dialect usage in models; wrap in `models.types`
|
||||
- Category: maintainability
|
||||
- Severity: critical
|
||||
- Description: Model/schema definitions should avoid PostgreSQL-only constructs directly in business models. When database-specific behavior is required, encapsulate it in `api/models/types.py` using both PostgreSQL and MySQL dialect implementations, then consume that abstraction from model code.
|
||||
- Suggested fix:
|
||||
- Do not directly place dialect-only types/operators in model columns when a portable wrapper can be used.
|
||||
- Add or extend wrappers in `models.types` (for example, `AdjustedJSON`, `LongText`, `BinaryData`) to normalize behavior across PostgreSQL and MySQL.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped
|
||||
|
||||
class ToolConfig(TypeBase):
|
||||
__tablename__ = "tool_configs"
|
||||
config: Mapped[dict] = mapped_column(JSONB, nullable=False)
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
from sqlalchemy.orm import Mapped
|
||||
|
||||
from models.types import AdjustedJSON
|
||||
|
||||
class ToolConfig(TypeBase):
|
||||
__tablename__ = "tool_configs"
|
||||
config: Mapped[dict] = mapped_column(AdjustedJSON(), nullable=False)
|
||||
```
|
||||
|
||||
### Guard migration incompatibilities with dialect checks and shared types
|
||||
- Category: maintainability
|
||||
- Severity: critical
|
||||
- Description: Migration scripts under `api/migrations/versions/` must account for PostgreSQL/MySQL incompatibilities explicitly. For dialect-sensitive DDL or defaults, branch on the active dialect (for example, `conn.dialect.name == "postgresql"`), and prefer reusable compatibility abstractions from `models.types` where applicable.
|
||||
- Suggested fix:
|
||||
- In migration upgrades/downgrades, bind connection and branch by dialect for incompatible SQL fragments.
|
||||
- Reuse `models.types` wrappers in column definitions when that keeps behavior aligned with runtime models.
|
||||
- Avoid one-dialect-only migration logic unless there is a documented, deliberate compatibility exception.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
with op.batch_alter_table("dataset_keyword_tables") as batch_op:
|
||||
batch_op.add_column(
|
||||
sa.Column(
|
||||
"data_source_type",
|
||||
sa.String(255),
|
||||
server_default=sa.text("'database'::character varying"),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
def _is_pg(conn) -> bool:
|
||||
return conn.dialect.name == "postgresql"
|
||||
|
||||
|
||||
conn = op.get_bind()
|
||||
default_expr = sa.text("'database'::character varying") if _is_pg(conn) else sa.text("'database'")
|
||||
|
||||
with op.batch_alter_table("dataset_keyword_tables") as batch_op:
|
||||
batch_op.add_column(
|
||||
sa.Column("data_source_type", sa.String(255), server_default=default_expr, nullable=False)
|
||||
)
|
||||
```
|
||||
@@ -0,0 +1,61 @@
|
||||
# Rule Catalog - Repositories Abstraction
|
||||
|
||||
## Scope
|
||||
- Covers: when to reuse existing repository abstractions, when to introduce new repositories, and how to preserve dependency direction between service/core and infrastructure implementations.
|
||||
- Does NOT cover: SQLAlchemy session lifecycle and query-shape specifics (handled by `sqlalchemy-rule.md`), and table schema/migration design (handled by `db-schema-rule.md`).
|
||||
|
||||
## Rules
|
||||
|
||||
### Introduce repositories abstraction
|
||||
- Category: maintainability
|
||||
- Severity: suggestion
|
||||
- Description: If a table/model already has a repository abstraction, all reads/writes/queries for that table should use the existing repository. If no repository exists, introduce one only when complexity justifies it, such as large/high-volume tables, repeated complex query logic, or likely storage-strategy variation.
|
||||
- Suggested fix:
|
||||
- First check `api/repositories`, `api/core/repositories`, and `api/extensions/*/repositories/` to verify whether the table/model already has a repository abstraction. If it exists, route all operations through it and add missing repository methods instead of bypassing it with ad-hoc SQLAlchemy access.
|
||||
- If no repository exists, add one only when complexity warrants it (for example, repeated complex queries, large data domains, or multiple storage strategies), while preserving dependency direction (service/core depends on abstraction; infra provides implementation).
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
# Existing repository is ignored and service uses ad-hoc table queries.
|
||||
class AppService:
|
||||
def archive_app(self, app_id: str, tenant_id: str) -> None:
|
||||
app = self.session.execute(
|
||||
select(App).where(App.id == app_id, App.tenant_id == tenant_id)
|
||||
).scalar_one()
|
||||
app.archived = True
|
||||
self.session.commit()
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# Case A: Existing repository must be reused for all table operations.
|
||||
class AppService:
|
||||
def archive_app(self, app_id: str, tenant_id: str) -> None:
|
||||
app = self.app_repo.get_by_id(app_id=app_id, tenant_id=tenant_id)
|
||||
app.archived = True
|
||||
self.app_repo.save(app)
|
||||
|
||||
# If the query is missing, extend the existing abstraction.
|
||||
active_apps = self.app_repo.list_active_for_tenant(tenant_id=tenant_id)
|
||||
```
|
||||
- Bad:
|
||||
```python
|
||||
# No repository exists, but large-domain query logic is scattered in service code.
|
||||
class ConversationService:
|
||||
def list_recent_for_app(self, app_id: str, tenant_id: str, limit: int) -> list[Conversation]:
|
||||
...
|
||||
# many filters/joins/pagination variants duplicated across services
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# Case B: Introduce repository for large/complex domains or storage variation.
|
||||
class ConversationRepository(Protocol):
|
||||
def list_recent_for_app(self, app_id: str, tenant_id: str, limit: int) -> list[Conversation]: ...
|
||||
|
||||
class SqlAlchemyConversationRepository:
|
||||
def list_recent_for_app(self, app_id: str, tenant_id: str, limit: int) -> list[Conversation]:
|
||||
...
|
||||
|
||||
class ConversationService:
|
||||
def __init__(self, conversation_repo: ConversationRepository):
|
||||
self.conversation_repo = conversation_repo
|
||||
```
|
||||
139
.agents/skills/backend-code-review/references/sqlalchemy-rule.md
Normal file
139
.agents/skills/backend-code-review/references/sqlalchemy-rule.md
Normal file
@@ -0,0 +1,139 @@
|
||||
# Rule Catalog — SQLAlchemy Patterns
|
||||
|
||||
## Scope
|
||||
- Covers: SQLAlchemy session and transaction lifecycle, query construction, tenant scoping, raw SQL boundaries, and write-path concurrency safeguards.
|
||||
- Does NOT cover: table/model schema and migration design details (handled by `db-schema-rule.md`).
|
||||
|
||||
## Rules
|
||||
|
||||
### Use Session context manager with explicit transaction control behavior
|
||||
- Category: best practices
|
||||
- Severity: critical
|
||||
- Description: Session and transaction lifecycle must be explicit and bounded on write paths. Missing commits can silently drop intended updates, while ad-hoc or long-lived transactions increase contention, lock duration, and deadlock risk.
|
||||
- Suggested fix:
|
||||
- Use **explicit `session.commit()`** after completing a related write unit.
|
||||
- Or use **`session.begin()` context manager** for automatic commit/rollback on a scoped block.
|
||||
- Keep transaction windows short: avoid network I/O, heavy computation, or unrelated work inside the transaction.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
# Missing commit: write may never be persisted.
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
run = session.get(WorkflowRun, run_id)
|
||||
run.status = "cancelled"
|
||||
|
||||
# Long transaction: external I/O inside a DB transaction.
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
run = session.get(WorkflowRun, run_id)
|
||||
run.status = "cancelled"
|
||||
call_external_api()
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# Option 1: explicit commit.
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
run = session.get(WorkflowRun, run_id)
|
||||
run.status = "cancelled"
|
||||
session.commit()
|
||||
|
||||
# Option 2: scoped transaction with automatic commit/rollback.
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
run = session.get(WorkflowRun, run_id)
|
||||
run.status = "cancelled"
|
||||
|
||||
# Keep non-DB work outside transaction scope.
|
||||
call_external_api()
|
||||
```
|
||||
|
||||
### Enforce tenant_id scoping on shared-resource queries
|
||||
- Category: security
|
||||
- Severity: critical
|
||||
- Description: Reads and writes against shared tables must be scoped by `tenant_id` to prevent cross-tenant data leakage or corruption.
|
||||
- Suggested fix: Add `tenant_id` predicate to all tenant-owned entity queries and propagate tenant context through service/repository interfaces.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
stmt = select(Workflow).where(Workflow.id == workflow_id)
|
||||
workflow = session.execute(stmt).scalar_one_or_none()
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
stmt = select(Workflow).where(
|
||||
Workflow.id == workflow_id,
|
||||
Workflow.tenant_id == tenant_id,
|
||||
)
|
||||
workflow = session.execute(stmt).scalar_one_or_none()
|
||||
```
|
||||
|
||||
### Prefer SQLAlchemy expressions over raw SQL by default
|
||||
- Category: maintainability
|
||||
- Severity: suggestion
|
||||
- Description: Raw SQL should be exceptional. ORM/Core expressions are easier to evolve, safer to compose, and more consistent with the codebase.
|
||||
- Suggested fix: Rewrite straightforward raw SQL into SQLAlchemy `select/update/delete` expressions; keep raw SQL only when required by clear technical constraints.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
row = session.execute(
|
||||
text("SELECT * FROM workflows WHERE id = :id AND tenant_id = :tenant_id"),
|
||||
{"id": workflow_id, "tenant_id": tenant_id},
|
||||
).first()
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
stmt = select(Workflow).where(
|
||||
Workflow.id == workflow_id,
|
||||
Workflow.tenant_id == tenant_id,
|
||||
)
|
||||
row = session.execute(stmt).scalar_one_or_none()
|
||||
```
|
||||
|
||||
### Protect write paths with concurrency safeguards
|
||||
- Category: quality
|
||||
- Severity: critical
|
||||
- Description: Multi-writer paths without explicit concurrency control can silently overwrite data. Choose the safeguard based on contention level, lock scope, and throughput cost instead of defaulting to one strategy.
|
||||
- Suggested fix:
|
||||
- **Optimistic locking**: Use when contention is usually low and retries are acceptable. Add a version (or updated_at) guard in `WHERE` and treat `rowcount == 0` as a conflict.
|
||||
- **Redis distributed lock**: Use when the critical section spans multiple steps/processes (or includes non-DB side effects) and you need cross-worker mutual exclusion.
|
||||
- **SELECT ... FOR UPDATE**: Use when contention is high on the same rows and strict in-transaction serialization is required. Keep transactions short to reduce lock wait/deadlock risk.
|
||||
- In all cases, scope by `tenant_id` and verify affected row counts for conditional writes.
|
||||
- Example:
|
||||
- Bad:
|
||||
```python
|
||||
# No tenant scope, no conflict detection, and no lock on a contested write path.
|
||||
session.execute(update(WorkflowRun).where(WorkflowRun.id == run_id).values(status="cancelled"))
|
||||
session.commit() # silently overwrites concurrent updates
|
||||
```
|
||||
- Good:
|
||||
```python
|
||||
# 1) Optimistic lock (low contention, retry on conflict)
|
||||
result = session.execute(
|
||||
update(WorkflowRun)
|
||||
.where(
|
||||
WorkflowRun.id == run_id,
|
||||
WorkflowRun.tenant_id == tenant_id,
|
||||
WorkflowRun.version == expected_version,
|
||||
)
|
||||
.values(status="cancelled", version=WorkflowRun.version + 1)
|
||||
)
|
||||
if result.rowcount == 0:
|
||||
raise WorkflowStateConflictError("stale version, retry")
|
||||
|
||||
# 2) Redis distributed lock (cross-worker critical section)
|
||||
lock_name = f"workflow_run_lock:{tenant_id}:{run_id}"
|
||||
with redis_client.lock(lock_name, timeout=20):
|
||||
session.execute(
|
||||
update(WorkflowRun)
|
||||
.where(WorkflowRun.id == run_id, WorkflowRun.tenant_id == tenant_id)
|
||||
.values(status="cancelled")
|
||||
)
|
||||
session.commit()
|
||||
|
||||
# 3) Pessimistic lock with SELECT ... FOR UPDATE (high contention)
|
||||
run = session.execute(
|
||||
select(WorkflowRun)
|
||||
.where(WorkflowRun.id == run_id, WorkflowRun.tenant_id == tenant_id)
|
||||
.with_for_update()
|
||||
).scalar_one()
|
||||
run.status = "cancelled"
|
||||
session.commit()
|
||||
```
|
||||
1
.claude/skills/backend-code-review
Symbolic link
1
.claude/skills/backend-code-review
Symbolic link
@@ -0,0 +1 @@
|
||||
../../.agents/skills/backend-code-review
|
||||
23
.github/dependabot.yml
vendored
23
.github/dependabot.yml
vendored
@@ -1,12 +1,25 @@
|
||||
version: 2
|
||||
|
||||
multi-ecosystem-groups:
|
||||
python:
|
||||
schedule:
|
||||
interval: "weekly" # or whatever schedule you want
|
||||
|
||||
updates:
|
||||
- package-ecosystem: "pip"
|
||||
directory: "/api"
|
||||
open-pull-requests-limit: 2
|
||||
patterns: ["*"]
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
- package-ecosystem: "uv"
|
||||
directory: "/api"
|
||||
open-pull-requests-limit: 2
|
||||
patterns: ["*"]
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
- package-ecosystem: "npm"
|
||||
directory: "/web"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
open-pull-requests-limit: 2
|
||||
- package-ecosystem: "uv"
|
||||
directory: "/api"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
open-pull-requests-limit: 2
|
||||
|
||||
8
.github/workflows/deploy-hitl.yml
vendored
8
.github/workflows/deploy-hitl.yml
vendored
@@ -4,8 +4,7 @@ on:
|
||||
workflow_run:
|
||||
workflows: ["Build and Push API & Web"]
|
||||
branches:
|
||||
- "feat/hitl-frontend"
|
||||
- "feat/hitl-backend"
|
||||
- "build/feat/hitl"
|
||||
types:
|
||||
- completed
|
||||
|
||||
@@ -14,10 +13,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
if: |
|
||||
github.event.workflow_run.conclusion == 'success' &&
|
||||
(
|
||||
github.event.workflow_run.head_branch == 'feat/hitl-frontend' ||
|
||||
github.event.workflow_run.head_branch == 'feat/hitl-backend'
|
||||
)
|
||||
github.event.workflow_run.head_branch == 'build/feat/hitl'
|
||||
steps:
|
||||
- name: Deploy to server
|
||||
uses: appleboy/ssh-action@v1
|
||||
|
||||
88
.github/workflows/pyrefly-diff-comment.yml
vendored
Normal file
88
.github/workflows/pyrefly-diff-comment.yml
vendored
Normal file
@@ -0,0 +1,88 @@
|
||||
name: Comment with Pyrefly Diff
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows:
|
||||
- Pyrefly Diff Check
|
||||
types:
|
||||
- completed
|
||||
|
||||
permissions: {}
|
||||
|
||||
jobs:
|
||||
comment:
|
||||
name: Comment PR with pyrefly diff
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
actions: read
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
if: ${{ github.event.workflow_run.conclusion == 'success' && github.event.workflow_run.pull_requests[0].head.repo.full_name != github.repository }}
|
||||
steps:
|
||||
- name: Download pyrefly diff artifact
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
const artifacts = await github.rest.actions.listWorkflowRunArtifacts({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
run_id: ${{ github.event.workflow_run.id }},
|
||||
});
|
||||
const match = artifacts.data.artifacts.find((artifact) =>
|
||||
artifact.name === 'pyrefly_diff'
|
||||
);
|
||||
if (!match) {
|
||||
throw new Error('pyrefly_diff artifact not found');
|
||||
}
|
||||
const download = await github.rest.actions.downloadArtifact({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
artifact_id: match.id,
|
||||
archive_format: 'zip',
|
||||
});
|
||||
fs.writeFileSync('pyrefly_diff.zip', Buffer.from(download.data));
|
||||
|
||||
- name: Unzip artifact
|
||||
run: unzip -o pyrefly_diff.zip
|
||||
|
||||
- name: Post comment
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
let diff = fs.readFileSync('pyrefly_diff.txt', { encoding: 'utf8' });
|
||||
let prNumber = null;
|
||||
try {
|
||||
prNumber = parseInt(fs.readFileSync('pr_number.txt', { encoding: 'utf8' }), 10);
|
||||
} catch (err) {
|
||||
// Fallback to workflow_run payload if artifact is missing or incomplete.
|
||||
const prs = context.payload.workflow_run.pull_requests || [];
|
||||
if (prs.length > 0 && prs[0].number) {
|
||||
prNumber = prs[0].number;
|
||||
}
|
||||
}
|
||||
if (!prNumber) {
|
||||
throw new Error('PR number not found in artifact or workflow_run payload');
|
||||
}
|
||||
|
||||
const MAX_CHARS = 65000;
|
||||
if (diff.length > MAX_CHARS) {
|
||||
diff = diff.slice(0, MAX_CHARS);
|
||||
diff = diff.slice(0, diff.lastIndexOf('\\n'));
|
||||
diff += '\\n\\n... (truncated) ...';
|
||||
}
|
||||
|
||||
const body = diff.trim()
|
||||
? '### Pyrefly Diff\n<details>\n<summary>base → PR</summary>\n\n```diff\n' + diff + '\n```\n</details>'
|
||||
: '### Pyrefly Diff\nNo changes detected.';
|
||||
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: prNumber,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body,
|
||||
});
|
||||
94
.github/workflows/pyrefly-diff.yml
vendored
Normal file
94
.github/workflows/pyrefly-diff.yml
vendored
Normal file
@@ -0,0 +1,94 @@
|
||||
name: Pyrefly Diff Check
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
paths:
|
||||
- 'api/**/*.py'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
pyrefly-diff:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
pull-requests: write
|
||||
steps:
|
||||
- name: Checkout PR branch
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Setup Python & UV
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
enable-cache: true
|
||||
|
||||
- name: Install dependencies
|
||||
run: uv sync --project api --dev
|
||||
|
||||
- name: Run pyrefly on PR branch
|
||||
run: |
|
||||
uv run --directory api pyrefly check > /tmp/pyrefly_pr.txt 2>&1 || true
|
||||
|
||||
- name: Checkout base branch
|
||||
run: git checkout ${{ github.base_ref }}
|
||||
|
||||
- name: Run pyrefly on base branch
|
||||
run: |
|
||||
uv run --directory api pyrefly check > /tmp/pyrefly_base.txt 2>&1 || true
|
||||
|
||||
- name: Compute diff
|
||||
run: |
|
||||
diff /tmp/pyrefly_base.txt /tmp/pyrefly_pr.txt > pyrefly_diff.txt || true
|
||||
|
||||
- name: Save PR number
|
||||
run: |
|
||||
echo ${{ github.event.pull_request.number }} > pr_number.txt
|
||||
|
||||
- name: Upload pyrefly diff
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pyrefly_diff
|
||||
path: |
|
||||
pyrefly_diff.txt
|
||||
pr_number.txt
|
||||
|
||||
- name: Comment PR with pyrefly diff
|
||||
if: ${{ github.event.pull_request.head.repo.full_name == github.repository }}
|
||||
uses: actions/github-script@v8
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const fs = require('fs');
|
||||
let diff = fs.readFileSync('pyrefly_diff.txt', { encoding: 'utf8' });
|
||||
const prNumber = context.payload.pull_request.number;
|
||||
|
||||
const MAX_CHARS = 65000;
|
||||
if (diff.length > MAX_CHARS) {
|
||||
diff = diff.slice(0, MAX_CHARS);
|
||||
diff = diff.slice(0, diff.lastIndexOf('\n'));
|
||||
diff += '\n\n... (truncated) ...';
|
||||
}
|
||||
|
||||
const body = diff.trim()
|
||||
? [
|
||||
'### Pyrefly Diff',
|
||||
'<details>',
|
||||
'<summary>base → PR</summary>',
|
||||
'',
|
||||
'```diff',
|
||||
diff,
|
||||
'```',
|
||||
'</details>',
|
||||
].join('\n')
|
||||
: '### Pyrefly Diff\nNo changes detected.';
|
||||
|
||||
await github.rest.issues.createComment({
|
||||
issue_number: prNumber,
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
body,
|
||||
});
|
||||
63
.github/workflows/web-tests.yml
vendored
63
.github/workflows/web-tests.yml
vendored
@@ -3,14 +3,22 @@ name: Web Tests
|
||||
on:
|
||||
workflow_call:
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
concurrency:
|
||||
group: web-tests-${{ github.head_ref || github.run_id }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: Web Tests
|
||||
name: Web Tests (${{ matrix.shardIndex }}/${{ matrix.shardTotal }})
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
shardIndex: [1, 2, 3, 4]
|
||||
shardTotal: [4]
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
@@ -39,7 +47,58 @@ jobs:
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
- name: Run tests
|
||||
run: pnpm test:ci
|
||||
run: pnpm vitest run --reporter=blob --shard=${{ matrix.shardIndex }}/${{ matrix.shardTotal }} --coverage
|
||||
|
||||
- name: Upload blob report
|
||||
if: ${{ !cancelled() }}
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: blob-report-${{ matrix.shardIndex }}
|
||||
path: web/.vitest-reports/*
|
||||
include-hidden-files: true
|
||||
retention-days: 1
|
||||
|
||||
merge-reports:
|
||||
name: Merge Test Reports
|
||||
if: ${{ !cancelled() }}
|
||||
needs: [test]
|
||||
runs-on: ubuntu-latest
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
working-directory: ./web
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v6
|
||||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- name: Install pnpm
|
||||
uses: pnpm/action-setup@v4
|
||||
with:
|
||||
package_json_file: web/package.json
|
||||
run_install: false
|
||||
|
||||
- name: Setup Node.js
|
||||
uses: actions/setup-node@v6
|
||||
with:
|
||||
node-version: 24
|
||||
cache: pnpm
|
||||
cache-dependency-path: ./web/pnpm-lock.yaml
|
||||
|
||||
- name: Install dependencies
|
||||
run: pnpm install --frozen-lockfile
|
||||
|
||||
- name: Download blob reports
|
||||
uses: actions/download-artifact@v6
|
||||
with:
|
||||
path: web/.vitest-reports
|
||||
pattern: blob-report-*
|
||||
merge-multiple: true
|
||||
|
||||
- name: Merge reports
|
||||
run: pnpm vitest --merge-reports --coverage --silent=passed-only
|
||||
|
||||
- name: Coverage Summary
|
||||
if: always()
|
||||
|
||||
2
.vscode/launch.json.template
vendored
2
.vscode/launch.json.template
vendored
@@ -37,7 +37,7 @@
|
||||
"-c",
|
||||
"1",
|
||||
"-Q",
|
||||
"dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention",
|
||||
"dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution",
|
||||
"--loglevel",
|
||||
"INFO"
|
||||
],
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||

|
||||
|
||||
<p align="center">
|
||||
📌 <a href="https://dify.ai/blog/introducing-dify-workflow-file-upload-a-demo-on-ai-podcast">Introducing Dify Workflow File Upload: Recreate Google NotebookLM Podcast</a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
<a href="https://cloud.dify.ai">Dify Cloud</a> ·
|
||||
<a href="https://docs.dify.ai/getting-started/install-self-hosted">Self-hosting</a> ·
|
||||
|
||||
@@ -553,6 +553,8 @@ WORKFLOW_LOG_CLEANUP_ENABLED=false
|
||||
WORKFLOW_LOG_RETENTION_DAYS=30
|
||||
# Batch size for workflow log cleanup operations (default: 100)
|
||||
WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
|
||||
# Comma-separated list of workflow IDs to clean logs for
|
||||
WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS=
|
||||
|
||||
# App configuration
|
||||
APP_MAX_EXECUTION_TIME=1200
|
||||
@@ -715,6 +717,7 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5
|
||||
# Sandbox expired records clean configuration
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
|
||||
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000
|
||||
|
||||
|
||||
@@ -50,16 +50,11 @@ forbidden_modules =
|
||||
allow_indirect_imports = True
|
||||
ignore_imports =
|
||||
core.workflow.nodes.agent.agent_node -> extensions.ext_database
|
||||
core.workflow.nodes.datasource.datasource_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_database
|
||||
core.workflow.nodes.llm.file_saver -> extensions.ext_database
|
||||
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
|
||||
core.workflow.nodes.llm.node -> extensions.ext_database
|
||||
core.workflow.nodes.tool.tool_node -> extensions.ext_database
|
||||
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
|
||||
core.workflow.graph_engine.manager -> extensions.ext_redis
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
|
||||
# TODO(QuantumGhost): use DI to avoid depending on global DB.
|
||||
core.workflow.nodes.human_input.human_input_node -> extensions.ext_database
|
||||
|
||||
@@ -93,7 +88,6 @@ forbidden_modules =
|
||||
core.logging
|
||||
core.mcp
|
||||
core.memory
|
||||
core.model_manager
|
||||
core.moderation
|
||||
core.ops
|
||||
core.plugin
|
||||
@@ -106,37 +100,19 @@ forbidden_modules =
|
||||
core.trigger
|
||||
core.variables
|
||||
ignore_imports =
|
||||
core.workflow.nodes.agent.agent_node -> core.db.session_factory
|
||||
core.workflow.nodes.agent.agent_node -> models.tools
|
||||
core.workflow.nodes.loop.loop_node -> core.app.workflow.node_factory
|
||||
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
|
||||
core.workflow.workflow_entry -> core.app.workflow.layers.observability
|
||||
core.workflow.nodes.agent.agent_node -> core.model_manager
|
||||
core.workflow.nodes.agent.agent_node -> core.provider_manager
|
||||
core.workflow.nodes.agent.agent_node -> core.tools.tool_manager
|
||||
core.workflow.nodes.code.code_node -> core.helper.code_executor.code_executor
|
||||
core.workflow.nodes.datasource.datasource_node -> models.model
|
||||
core.workflow.nodes.datasource.datasource_node -> models.tools
|
||||
core.workflow.nodes.datasource.datasource_node -> services.datasource_provider_service
|
||||
core.workflow.nodes.document_extractor.node -> configs
|
||||
core.workflow.nodes.document_extractor.node -> core.file.file_manager
|
||||
core.workflow.nodes.document_extractor.node -> core.helper.ssrf_proxy
|
||||
core.workflow.nodes.http_request.entities -> configs
|
||||
core.workflow.nodes.http_request.executor -> configs
|
||||
core.workflow.nodes.http_request.executor -> core.file.file_manager
|
||||
core.workflow.nodes.http_request.node -> configs
|
||||
core.workflow.nodes.http_request.node -> core.tools.tool_file_manager
|
||||
core.workflow.nodes.iteration.iteration_node -> core.app.workflow.node_factory
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.rag.index_processor.index_processor_factory
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.datasource.retrieval_service
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.retrieval.dataset_retrieval
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> models.dataset
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> services.feature_service
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.model_runtime.model_providers.__base.large_language_model
|
||||
core.workflow.nodes.llm.llm_utils -> configs
|
||||
core.workflow.nodes.llm.llm_utils -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.llm.llm_utils -> core.file.models
|
||||
core.workflow.nodes.llm.llm_utils -> core.model_manager
|
||||
core.workflow.nodes.llm.protocols -> core.model_manager
|
||||
core.workflow.nodes.llm.llm_utils -> core.model_runtime.model_providers.__base.large_language_model
|
||||
core.workflow.nodes.llm.llm_utils -> models.model
|
||||
core.workflow.nodes.llm.llm_utils -> models.provider
|
||||
@@ -153,7 +129,6 @@ ignore_imports =
|
||||
core.workflow.nodes.human_input.human_input_node -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.llm.node -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.app.entities.app_invoke_entities
|
||||
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.prompt.advanced_prompt_transform
|
||||
@@ -167,50 +142,18 @@ ignore_imports =
|
||||
core.workflow.workflow_entry -> core.app.apps.exc
|
||||
core.workflow.workflow_entry -> core.app.entities.app_invoke_entities
|
||||
core.workflow.workflow_entry -> core.app.workflow.node_factory
|
||||
core.workflow.nodes.datasource.datasource_node -> core.datasource.datasource_manager
|
||||
core.workflow.nodes.datasource.datasource_node -> core.datasource.utils.message_transformer
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.entities.agent_entities
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.entities.model_entities
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.model_manager
|
||||
core.workflow.nodes.llm.llm_utils -> core.entities.provider_entities
|
||||
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.model_manager
|
||||
core.workflow.nodes.question_classifier.question_classifier_node -> core.model_manager
|
||||
core.workflow.node_events.node -> core.file
|
||||
core.workflow.nodes.agent.agent_node -> core.file
|
||||
core.workflow.nodes.datasource.datasource_node -> core.file
|
||||
core.workflow.nodes.datasource.datasource_node -> core.file.enums
|
||||
core.workflow.nodes.document_extractor.node -> core.file
|
||||
core.workflow.nodes.http_request.executor -> core.file.enums
|
||||
core.workflow.nodes.http_request.node -> core.file
|
||||
core.workflow.nodes.http_request.node -> core.file.file_manager
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.file.models
|
||||
core.workflow.nodes.list_operator.node -> core.file
|
||||
core.workflow.nodes.llm.file_saver -> core.file
|
||||
core.workflow.nodes.llm.llm_utils -> core.variables.segments
|
||||
core.workflow.nodes.llm.node -> core.file
|
||||
core.workflow.nodes.llm.node -> core.file.file_manager
|
||||
core.workflow.nodes.llm.node -> core.file.models
|
||||
core.workflow.nodes.loop.entities -> core.variables.types
|
||||
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.file
|
||||
core.workflow.nodes.protocols -> core.file
|
||||
core.workflow.nodes.question_classifier.question_classifier_node -> core.file.models
|
||||
core.workflow.nodes.tool.tool_node -> core.file
|
||||
core.workflow.nodes.tool.tool_node -> core.tools.utils.message_transformer
|
||||
core.workflow.nodes.tool.tool_node -> models
|
||||
core.workflow.nodes.trigger_webhook.node -> core.file
|
||||
core.workflow.runtime.variable_pool -> core.file
|
||||
core.workflow.runtime.variable_pool -> core.file.file_manager
|
||||
core.workflow.system_variable -> core.file.models
|
||||
core.workflow.utils.condition.processor -> core.file
|
||||
core.workflow.utils.condition.processor -> core.file.file_manager
|
||||
core.workflow.workflow_entry -> core.file.models
|
||||
core.workflow.workflow_type_encoder -> core.file.models
|
||||
core.workflow.nodes.agent.agent_node -> models.model
|
||||
core.workflow.nodes.code.code_node -> core.helper.code_executor.code_node_provider
|
||||
core.workflow.nodes.code.code_node -> core.helper.code_executor.javascript.javascript_code_provider
|
||||
core.workflow.nodes.code.code_node -> core.helper.code_executor.python3.python3_code_provider
|
||||
core.workflow.nodes.code.entities -> core.helper.code_executor.code_executor
|
||||
core.workflow.nodes.datasource.datasource_node -> core.variables.variables
|
||||
core.workflow.nodes.http_request.executor -> core.helper.ssrf_proxy
|
||||
core.workflow.nodes.http_request.node -> core.helper.ssrf_proxy
|
||||
core.workflow.nodes.llm.file_saver -> core.helper.ssrf_proxy
|
||||
@@ -220,7 +163,6 @@ ignore_imports =
|
||||
core.workflow.nodes.llm.node -> core.llm_generator.output_parser.structured_output
|
||||
core.workflow.nodes.llm.node -> core.model_manager
|
||||
core.workflow.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.prompt.simple_prompt_transform
|
||||
core.workflow.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities
|
||||
core.workflow.nodes.llm.llm_utils -> core.prompt.entities.advanced_prompt_entities
|
||||
core.workflow.nodes.llm.node -> core.prompt.entities.advanced_prompt_entities
|
||||
@@ -236,7 +178,6 @@ ignore_imports =
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> services.summary_index_service
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> tasks.generate_summary_index_task
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> core.rag.index_processor.processor.paragraph_index_processor
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.rag.retrieval.retrieval_methods
|
||||
core.workflow.nodes.llm.node -> models.dataset
|
||||
core.workflow.nodes.agent.agent_node -> core.tools.utils.message_transformer
|
||||
core.workflow.nodes.llm.file_saver -> core.tools.signature
|
||||
@@ -249,7 +190,6 @@ ignore_imports =
|
||||
core.workflow.nodes.code.code_node -> core.variables.segments
|
||||
core.workflow.nodes.code.code_node -> core.variables.types
|
||||
core.workflow.nodes.code.entities -> core.variables.types
|
||||
core.workflow.nodes.datasource.datasource_node -> core.variables.segments
|
||||
core.workflow.nodes.document_extractor.node -> core.variables
|
||||
core.workflow.nodes.document_extractor.node -> core.variables.segments
|
||||
core.workflow.nodes.http_request.executor -> core.variables.segments
|
||||
@@ -291,12 +231,8 @@ ignore_imports =
|
||||
core.workflow.variable_loader -> core.variables
|
||||
core.workflow.variable_loader -> core.variables.consts
|
||||
core.workflow.workflow_type_encoder -> core.variables
|
||||
core.workflow.graph_engine.manager -> extensions.ext_redis
|
||||
core.workflow.nodes.agent.agent_node -> extensions.ext_database
|
||||
core.workflow.nodes.datasource.datasource_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_index.knowledge_index_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_database
|
||||
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
|
||||
core.workflow.nodes.llm.file_saver -> extensions.ext_database
|
||||
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
|
||||
core.workflow.nodes.llm.node -> extensions.ext_database
|
||||
@@ -311,6 +247,11 @@ ignore_imports =
|
||||
core.workflow.workflow_entry -> models.enums
|
||||
core.workflow.nodes.agent.agent_node -> services
|
||||
core.workflow.nodes.tool.tool_node -> services
|
||||
core.workflow.nodes.agent.agent_node -> core.model_runtime.token_buffer_memory
|
||||
core.workflow.nodes.llm.llm_utils -> core.model_runtime.token_buffer_memory
|
||||
core.workflow.nodes.llm.node -> core.model_runtime.token_buffer_memory
|
||||
core.workflow.nodes.parameter_extractor.parameter_extractor_node -> core.model_runtime.token_buffer_memory
|
||||
core.workflow.nodes.question_classifier.question_classifier_node -> core.model_runtime.token_buffer_memory
|
||||
|
||||
[importlinter:contract:model-runtime-no-internal-imports]
|
||||
name = Model Runtime Internal Imports
|
||||
@@ -363,6 +304,13 @@ ignore_imports =
|
||||
core.model_runtime.model_providers.model_provider_factory -> configs
|
||||
core.model_runtime.model_providers.model_provider_factory -> extensions.ext_redis
|
||||
core.model_runtime.model_providers.model_provider_factory -> models.provider_ids
|
||||
core.model_runtime.token_buffer_memory -> core.app.app_config.features.file_upload.manager
|
||||
core.model_runtime.token_buffer_memory -> core.model_manager
|
||||
core.model_runtime.token_buffer_memory -> core.prompt.utils.extract_thread_messages
|
||||
core.model_runtime.token_buffer_memory -> core.workflow.file.file_manager
|
||||
core.model_runtime.token_buffer_memory -> extensions.ext_database
|
||||
core.model_runtime.token_buffer_memory -> models.model
|
||||
core.model_runtime.token_buffer_memory -> models.workflow
|
||||
|
||||
[importlinter:contract:rsc]
|
||||
name = RSC
|
||||
|
||||
2
api/.vscode/launch.json.example
vendored
2
api/.vscode/launch.json.example
vendored
@@ -54,7 +54,7 @@
|
||||
"--loglevel",
|
||||
"DEBUG",
|
||||
"-Q",
|
||||
"dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor"
|
||||
"dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,workflow_based_app_execution,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor"
|
||||
]
|
||||
}
|
||||
]
|
||||
|
||||
@@ -42,7 +42,7 @@ The scripts resolve paths relative to their location, so you can run them from a
|
||||
|
||||
1. Set up your application by visiting `http://localhost:3000`.
|
||||
|
||||
1. Optional: start the worker service (async tasks, runs from `api`).
|
||||
1. Start the worker service (async and scheduler tasks, runs from `api`).
|
||||
|
||||
```bash
|
||||
./dev/start-worker
|
||||
@@ -54,86 +54,6 @@ The scripts resolve paths relative to their location, so you can run them from a
|
||||
./dev/start-beat
|
||||
```
|
||||
|
||||
### Manual commands
|
||||
|
||||
<details>
|
||||
<summary>Show manual setup and run steps</summary>
|
||||
|
||||
These commands assume you start from the repository root.
|
||||
|
||||
1. Start the docker-compose stack.
|
||||
|
||||
The backend requires middleware, including PostgreSQL, Redis, and Weaviate, which can be started together using `docker-compose`.
|
||||
|
||||
```bash
|
||||
cp docker/middleware.env.example docker/middleware.env
|
||||
# Use mysql or another vector database profile if you are not using postgres/weaviate.
|
||||
docker compose -f docker/docker-compose.middleware.yaml --profile postgresql --profile weaviate -p dify up -d
|
||||
```
|
||||
|
||||
1. Copy env files.
|
||||
|
||||
```bash
|
||||
cp api/.env.example api/.env
|
||||
cp web/.env.example web/.env.local
|
||||
```
|
||||
|
||||
1. Install UV if needed.
|
||||
|
||||
```bash
|
||||
pip install uv
|
||||
# Or on macOS
|
||||
brew install uv
|
||||
```
|
||||
|
||||
1. Install API dependencies.
|
||||
|
||||
```bash
|
||||
cd api
|
||||
uv sync --group dev
|
||||
```
|
||||
|
||||
1. Install web dependencies.
|
||||
|
||||
```bash
|
||||
cd web
|
||||
pnpm install
|
||||
cd ..
|
||||
```
|
||||
|
||||
1. Start backend (runs migrations first, in a new terminal).
|
||||
|
||||
```bash
|
||||
cd api
|
||||
uv run flask db upgrade
|
||||
uv run flask run --host 0.0.0.0 --port=5001 --debug
|
||||
```
|
||||
|
||||
1. Start Dify [web](../web) service (in a new terminal).
|
||||
|
||||
```bash
|
||||
cd web
|
||||
pnpm dev:inspect
|
||||
```
|
||||
|
||||
1. Set up your application by visiting `http://localhost:3000`.
|
||||
|
||||
1. Optional: start the worker service (async tasks, in a new terminal).
|
||||
|
||||
```bash
|
||||
cd api
|
||||
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
|
||||
```
|
||||
|
||||
1. Optional: start Celery Beat (scheduled tasks, in a new terminal).
|
||||
|
||||
```bash
|
||||
cd api
|
||||
uv run celery -A app.celery beat
|
||||
```
|
||||
|
||||
</details>
|
||||
|
||||
### Environment notes
|
||||
|
||||
> [!IMPORTANT]
|
||||
|
||||
@@ -30,6 +30,7 @@ from extensions.ext_redis import redis_client
|
||||
from extensions.ext_storage import storage
|
||||
from extensions.storage.opendal_storage import OpenDALStorage
|
||||
from extensions.storage.storage_type import StorageType
|
||||
from libs.db_migration_lock import DbMigrationAutoRenewLock
|
||||
from libs.helper import email as email_validate
|
||||
from libs.password import hash_password, password_pattern, valid_password
|
||||
from libs.rsa import generate_key_pair
|
||||
@@ -54,6 +55,8 @@ from tasks.remove_app_and_related_data_task import delete_draft_variables_batch
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DB_UPGRADE_LOCK_TTL_SECONDS = 60
|
||||
|
||||
|
||||
@click.command("reset-password", help="Reset the account password.")
|
||||
@click.option("--email", prompt=True, help="Account email to reset password for")
|
||||
@@ -727,8 +730,15 @@ def create_tenant(email: str, language: str | None = None, name: str | None = No
|
||||
@click.command("upgrade-db", help="Upgrade the database")
|
||||
def upgrade_db():
|
||||
click.echo("Preparing database migration...")
|
||||
lock = redis_client.lock(name="db_upgrade_lock", timeout=60)
|
||||
lock = DbMigrationAutoRenewLock(
|
||||
redis_client=redis_client,
|
||||
name="db_upgrade_lock",
|
||||
ttl_seconds=DB_UPGRADE_LOCK_TTL_SECONDS,
|
||||
logger=logger,
|
||||
log_context="db_migration",
|
||||
)
|
||||
if lock.acquire(blocking=False):
|
||||
migration_succeeded = False
|
||||
try:
|
||||
click.echo(click.style("Starting database migration.", fg="green"))
|
||||
|
||||
@@ -737,6 +747,7 @@ def upgrade_db():
|
||||
|
||||
flask_migrate.upgrade()
|
||||
|
||||
migration_succeeded = True
|
||||
click.echo(click.style("Database migration successful!", fg="green"))
|
||||
|
||||
except Exception as e:
|
||||
@@ -744,7 +755,8 @@ def upgrade_db():
|
||||
click.echo(click.style(f"Database migration failed: {e}", fg="red"))
|
||||
raise SystemExit(1)
|
||||
finally:
|
||||
lock.release()
|
||||
status = "successful" if migration_succeeded else "failed"
|
||||
lock.release_safely(status=status)
|
||||
else:
|
||||
click.echo("Database migration skipped")
|
||||
|
||||
|
||||
@@ -265,6 +265,11 @@ class PluginConfig(BaseSettings):
|
||||
default=60 * 60,
|
||||
)
|
||||
|
||||
PLUGIN_MAX_FILE_SIZE: PositiveInt = Field(
|
||||
description="Maximum allowed size (bytes) for plugin-generated files",
|
||||
default=50 * 1024 * 1024,
|
||||
)
|
||||
|
||||
|
||||
class MarketplaceConfig(BaseSettings):
|
||||
"""
|
||||
@@ -1180,6 +1185,16 @@ class CeleryScheduleTasksConfig(BaseSettings):
|
||||
default=0,
|
||||
)
|
||||
|
||||
# API token last_used_at batch update
|
||||
ENABLE_API_TOKEN_LAST_USED_UPDATE_TASK: bool = Field(
|
||||
description="Enable periodic batch update of API token last_used_at timestamps",
|
||||
default=True,
|
||||
)
|
||||
API_TOKEN_LAST_USED_UPDATE_INTERVAL: int = Field(
|
||||
description="Interval in minutes for batch updating API token last_used_at (default 30)",
|
||||
default=30,
|
||||
)
|
||||
|
||||
# Trigger provider refresh (simple version)
|
||||
ENABLE_TRIGGER_PROVIDER_REFRESH_TASK: bool = Field(
|
||||
description="Enable trigger provider refresh poller",
|
||||
@@ -1304,6 +1319,9 @@ class WorkflowLogConfig(BaseSettings):
|
||||
WORKFLOW_LOG_CLEANUP_BATCH_SIZE: int = Field(
|
||||
default=100, description="Batch size for workflow run log cleanup operations"
|
||||
)
|
||||
WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS: str = Field(
|
||||
default="", description="Comma-separated list of workflow IDs to clean logs for"
|
||||
)
|
||||
|
||||
|
||||
class SwaggerUIConfig(BaseSettings):
|
||||
@@ -1334,6 +1352,10 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
|
||||
description="Maximum number of records to process in each batch",
|
||||
default=1000,
|
||||
)
|
||||
SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: PositiveInt = Field(
|
||||
description="Maximum interval in milliseconds between batches",
|
||||
default=200,
|
||||
)
|
||||
SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: PositiveInt = Field(
|
||||
description="Retention days for sandbox expired workflow_run records and message records",
|
||||
default=30,
|
||||
|
||||
@@ -259,11 +259,20 @@ class CeleryConfig(DatabaseConfig):
|
||||
description="Password of the Redis Sentinel master.",
|
||||
default=None,
|
||||
)
|
||||
|
||||
CELERY_SENTINEL_SOCKET_TIMEOUT: PositiveFloat | None = Field(
|
||||
description="Timeout for Redis Sentinel socket operations in seconds.",
|
||||
default=0.1,
|
||||
)
|
||||
|
||||
CELERY_TASK_ANNOTATIONS: dict[str, Any] | None = Field(
|
||||
description=(
|
||||
"Annotations for Celery tasks as a JSON mapping of task name -> options "
|
||||
"(for example, rate limits or other task-specific settings)."
|
||||
),
|
||||
default=None,
|
||||
)
|
||||
|
||||
@computed_field
|
||||
def CELERY_RESULT_BACKEND(self) -> str | None:
|
||||
if self.CELERY_BACKEND in ("database", "rabbitmq"):
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import Field, PositiveInt
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
@@ -49,3 +51,43 @@ class OceanBaseVectorConfig(BaseSettings):
|
||||
),
|
||||
default="ik",
|
||||
)
|
||||
|
||||
OCEANBASE_VECTOR_BATCH_SIZE: PositiveInt = Field(
|
||||
description="Number of documents to insert per batch",
|
||||
default=100,
|
||||
)
|
||||
|
||||
OCEANBASE_VECTOR_METRIC_TYPE: Literal["l2", "cosine", "inner_product"] = Field(
|
||||
description="Distance metric type for vector index: l2, cosine, or inner_product",
|
||||
default="l2",
|
||||
)
|
||||
|
||||
OCEANBASE_HNSW_M: PositiveInt = Field(
|
||||
description="HNSW M parameter (max number of connections per node)",
|
||||
default=16,
|
||||
)
|
||||
|
||||
OCEANBASE_HNSW_EF_CONSTRUCTION: PositiveInt = Field(
|
||||
description="HNSW efConstruction parameter (index build-time search width)",
|
||||
default=256,
|
||||
)
|
||||
|
||||
OCEANBASE_HNSW_EF_SEARCH: int = Field(
|
||||
description="HNSW efSearch parameter (query-time search width, -1 uses server default)",
|
||||
default=-1,
|
||||
)
|
||||
|
||||
OCEANBASE_VECTOR_POOL_SIZE: PositiveInt = Field(
|
||||
description="SQLAlchemy connection pool size",
|
||||
default=5,
|
||||
)
|
||||
|
||||
OCEANBASE_VECTOR_MAX_OVERFLOW: int = Field(
|
||||
description="SQLAlchemy connection pool max overflow connections",
|
||||
default=10,
|
||||
)
|
||||
|
||||
OCEANBASE_HNSW_REFRESH_THRESHOLD: int = Field(
|
||||
description="Minimum number of inserted documents to trigger an automatic HNSW index refresh (0 to disable)",
|
||||
default=1000,
|
||||
)
|
||||
|
||||
@@ -21,6 +21,7 @@ language_timezone_mapping = {
|
||||
"th-TH": "Asia/Bangkok",
|
||||
"id-ID": "Asia/Jakarta",
|
||||
"ar-TN": "Africa/Tunis",
|
||||
"nl-NL": "Europe/Amsterdam",
|
||||
}
|
||||
|
||||
languages = list(language_timezone_mapping.keys())
|
||||
|
||||
@@ -4,7 +4,7 @@ from typing import Any, TypeAlias
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, computed_field
|
||||
|
||||
from core.file import helpers as file_helpers
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from models.model import IconType
|
||||
|
||||
JSONValue: TypeAlias = str | int | float | bool | None | dict[str, Any] | list[Any]
|
||||
|
||||
@@ -5,8 +5,6 @@ from enum import StrEnum
|
||||
from flask_restx import Namespace
|
||||
from pydantic import BaseModel, TypeAdapter
|
||||
|
||||
from controllers.console import console_ns
|
||||
|
||||
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
|
||||
|
||||
|
||||
@@ -24,6 +22,9 @@ def register_schema_models(namespace: Namespace, *models: type[BaseModel]) -> No
|
||||
|
||||
|
||||
def get_or_create_model(model_name: str, field_def):
|
||||
# Import lazily to avoid circular imports between console controllers and schema helpers.
|
||||
from controllers.console import console_ns
|
||||
|
||||
existing = console_ns.models.get(model_name)
|
||||
if existing is None:
|
||||
existing = console_ns.model(model_name, field_def)
|
||||
|
||||
@@ -10,6 +10,7 @@ from libs.helper import TimestampField
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models.dataset import Dataset
|
||||
from models.model import ApiToken, App
|
||||
from services.api_token_service import ApiTokenCache
|
||||
|
||||
from . import console_ns
|
||||
from .wraps import account_initialization_required, edit_permission_required, setup_required
|
||||
@@ -131,6 +132,11 @@ class BaseApiKeyResource(Resource):
|
||||
if key is None:
|
||||
flask_restx.abort(HTTPStatus.NOT_FOUND, message="API key not found")
|
||||
|
||||
# Invalidate cache before deleting from database
|
||||
# Type assertion: key is guaranteed to be non-None here because abort() raises
|
||||
assert key is not None # nosec - for type checker only
|
||||
ApiTokenCache.delete(key.token, key.type)
|
||||
|
||||
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@@ -23,10 +23,10 @@ from controllers.console.wraps import (
|
||||
is_admin_or_owner_required,
|
||||
setup_required,
|
||||
)
|
||||
from core.file import helpers as file_helpers
|
||||
from core.ops.ops_trace_manager import OpsTraceManager
|
||||
from core.rag.retrieval.retrieval_methods import RetrievalMethod
|
||||
from core.workflow.enums import NodeType, WorkflowExecutionStatus
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from models import App, DatasetPermissionEnum, Workflow
|
||||
@@ -660,6 +660,19 @@ class AppCopyApi(Resource):
|
||||
)
|
||||
session.commit()
|
||||
|
||||
# Inherit web app permission from original app
|
||||
if result.app_id and FeatureService.get_system_features().webapp_auth.enabled:
|
||||
try:
|
||||
# Get the original app's access mode
|
||||
original_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_model.id)
|
||||
access_mode = original_settings.access_mode
|
||||
except Exception:
|
||||
# If original app has no settings (old app), default to public to match fallback behavior
|
||||
access_mode = "public"
|
||||
|
||||
# Apply the same access mode to the copied app
|
||||
EnterpriseService.WebAppAuth.update_app_access_mode(result.app_id, access_mode)
|
||||
|
||||
stmt = select(App).where(App.id == result.app_id)
|
||||
app = session.scalar(stmt)
|
||||
|
||||
|
||||
@@ -599,7 +599,12 @@ def _get_conversation(app_model, conversation_id):
|
||||
db.session.execute(
|
||||
sa.update(Conversation)
|
||||
.where(Conversation.id == conversation_id, Conversation.read_at.is_(None))
|
||||
.values(read_at=naive_utc_now(), read_account_id=current_user.id)
|
||||
# Keep updated_at unchanged when only marking a conversation as read.
|
||||
.values(
|
||||
read_at=naive_utc_now(),
|
||||
read_account_id=current_user.id,
|
||||
updated_at=Conversation.updated_at,
|
||||
)
|
||||
)
|
||||
db.session.commit()
|
||||
db.session.refresh(conversation)
|
||||
|
||||
@@ -20,7 +20,6 @@ from core.app.app_config.features.file_upload.manager import FileUploadConfigMan
|
||||
from core.app.apps.base_app_queue_manager import AppQueueManager
|
||||
from core.app.apps.workflow.app_generator import SKIP_PREPARE_USER_INPUTS_KEY
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.file.models import File
|
||||
from core.helper.trace_id_helper import get_external_trace_id
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.impl.exc import PluginInvokeError
|
||||
@@ -31,8 +30,10 @@ from core.trigger.debug.event_selectors import (
|
||||
select_trigger_debug_events,
|
||||
)
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.file.models import File
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from factories import file_factory, variable_factory
|
||||
from fields.member_fields import simple_account_fields
|
||||
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
|
||||
@@ -740,7 +741,7 @@ class WorkflowTaskStopApi(Resource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
|
||||
@@ -15,11 +15,11 @@ from controllers.console.app.error import (
|
||||
from controllers.console.app.wraps import get_app_model
|
||||
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
|
||||
from controllers.web.error import InvalidArgumentError, NotFoundError
|
||||
from core.file import helpers as file_helpers
|
||||
from core.variables.segment_group import SegmentGroup
|
||||
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
|
||||
from core.variables.types import SegmentType
|
||||
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from factories.file_factory import build_from_mapping, build_from_mappings
|
||||
from factories.variable_factory import build_segment_with_type
|
||||
@@ -112,11 +112,11 @@ _WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = {
|
||||
"is_truncated": fields.Boolean(attribute=lambda model: model.file_id is not None),
|
||||
}
|
||||
|
||||
_WORKFLOW_DRAFT_VARIABLE_FIELDS = dict(
|
||||
_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS,
|
||||
value=fields.Raw(attribute=_serialize_var_value),
|
||||
full_content=fields.Raw(attribute=_serialize_full_content),
|
||||
)
|
||||
_WORKFLOW_DRAFT_VARIABLE_FIELDS = {
|
||||
**_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS,
|
||||
"value": fields.Raw(attribute=_serialize_var_value),
|
||||
"full_content": fields.Raw(attribute=_serialize_full_content),
|
||||
}
|
||||
|
||||
_WORKFLOW_DRAFT_ENV_VARIABLE_FIELDS = {
|
||||
"id": fields.String,
|
||||
|
||||
@@ -463,8 +463,9 @@ class WorkflowRunNodeExecutionListApi(Resource):
|
||||
class ConsoleWorkflowPauseDetailsApi(Resource):
|
||||
"""Console API for getting workflow pause details."""
|
||||
|
||||
@account_initialization_required
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def get(self, workflow_run_id: str):
|
||||
"""
|
||||
Get workflow pause details.
|
||||
@@ -477,10 +478,14 @@ class ConsoleWorkflowPauseDetailsApi(Resource):
|
||||
# Query WorkflowRun to determine if workflow is suspended
|
||||
session_maker = sessionmaker(bind=db.engine)
|
||||
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker=session_maker)
|
||||
|
||||
workflow_run = db.session.get(WorkflowRun, workflow_run_id)
|
||||
if not workflow_run:
|
||||
raise NotFoundError("Workflow run not found")
|
||||
|
||||
if workflow_run.tenant_id != current_user.current_tenant_id:
|
||||
raise NotFoundError("Workflow run not found")
|
||||
|
||||
# Check if workflow is suspended
|
||||
is_paused = workflow_run.status == WorkflowExecutionStatus.PAUSED
|
||||
if not is_paused:
|
||||
|
||||
@@ -55,6 +55,7 @@ from libs.login import current_account_with_tenant, login_required
|
||||
from models import ApiToken, Dataset, Document, DocumentSegment, UploadFile
|
||||
from models.dataset import DatasetPermissionEnum
|
||||
from models.provider_ids import ModelProviderID
|
||||
from services.api_token_service import ApiTokenCache
|
||||
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
|
||||
|
||||
# Register models for flask_restx to avoid dict type issues in Swagger
|
||||
@@ -820,6 +821,11 @@ class DatasetApiDeleteApi(Resource):
|
||||
if key is None:
|
||||
console_ns.abort(404, message="API key not found")
|
||||
|
||||
# Invalidate cache before deleting from database
|
||||
# Type assertion: key is guaranteed to be non-None here because abort() raises
|
||||
assert key is not None # nosec - for type checker only
|
||||
ApiTokenCache.delete(key.token, key.type)
|
||||
|
||||
db.session.query(ApiToken).where(ApiToken.id == api_key_id).delete()
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import services
|
||||
from controllers.common.fields import Parameters as ParametersResponse
|
||||
from controllers.common.fields import Site as SiteResponse
|
||||
from controllers.common.schema import get_or_create_model
|
||||
from controllers.console import api, console_ns
|
||||
from controllers.console import console_ns
|
||||
from controllers.console.app.error import (
|
||||
AppUnavailableError,
|
||||
AudioTooLargeError,
|
||||
@@ -44,6 +44,7 @@ from core.errors.error import (
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from fields.app_fields import (
|
||||
app_detail_fields_with_site,
|
||||
deleted_tool_fields,
|
||||
@@ -225,7 +226,7 @@ class TrialAppWorkflowTaskStopApi(TrialAppResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -469,7 +470,7 @@ class TrialSitApi(Resource):
|
||||
"""Resource for trial app sites."""
|
||||
|
||||
@trial_feature_enable
|
||||
@get_app_model_with_trial
|
||||
@get_app_model_with_trial(None)
|
||||
def get(self, app_model):
|
||||
"""Retrieve app site info.
|
||||
|
||||
@@ -491,7 +492,7 @@ class TrialAppParameterApi(Resource):
|
||||
"""Resource for app variables."""
|
||||
|
||||
@trial_feature_enable
|
||||
@get_app_model_with_trial
|
||||
@get_app_model_with_trial(None)
|
||||
def get(self, app_model):
|
||||
"""Retrieve app parameters."""
|
||||
|
||||
@@ -520,7 +521,7 @@ class TrialAppParameterApi(Resource):
|
||||
|
||||
class AppApi(Resource):
|
||||
@trial_feature_enable
|
||||
@get_app_model_with_trial
|
||||
@get_app_model_with_trial(None)
|
||||
@marshal_with(app_detail_with_site_model)
|
||||
def get(self, app_model):
|
||||
"""Get app detail"""
|
||||
@@ -533,7 +534,7 @@ class AppApi(Resource):
|
||||
|
||||
class AppWorkflowApi(Resource):
|
||||
@trial_feature_enable
|
||||
@get_app_model_with_trial
|
||||
@get_app_model_with_trial(None)
|
||||
@marshal_with(workflow_model)
|
||||
def get(self, app_model):
|
||||
"""Get workflow detail"""
|
||||
@@ -552,7 +553,7 @@ class AppWorkflowApi(Resource):
|
||||
|
||||
class DatasetListApi(Resource):
|
||||
@trial_feature_enable
|
||||
@get_app_model_with_trial
|
||||
@get_app_model_with_trial(None)
|
||||
def get(self, app_model):
|
||||
page = request.args.get("page", default=1, type=int)
|
||||
limit = request.args.get("limit", default=20, type=int)
|
||||
@@ -570,27 +571,31 @@ class DatasetListApi(Resource):
|
||||
return response
|
||||
|
||||
|
||||
api.add_resource(TrialChatApi, "/trial-apps/<uuid:app_id>/chat-messages", endpoint="trial_app_chat_completion")
|
||||
console_ns.add_resource(TrialChatApi, "/trial-apps/<uuid:app_id>/chat-messages", endpoint="trial_app_chat_completion")
|
||||
|
||||
api.add_resource(
|
||||
console_ns.add_resource(
|
||||
TrialMessageSuggestedQuestionApi,
|
||||
"/trial-apps/<uuid:app_id>/messages/<uuid:message_id>/suggested-questions",
|
||||
endpoint="trial_app_suggested_question",
|
||||
)
|
||||
|
||||
api.add_resource(TrialChatAudioApi, "/trial-apps/<uuid:app_id>/audio-to-text", endpoint="trial_app_audio")
|
||||
api.add_resource(TrialChatTextApi, "/trial-apps/<uuid:app_id>/text-to-audio", endpoint="trial_app_text")
|
||||
console_ns.add_resource(TrialChatAudioApi, "/trial-apps/<uuid:app_id>/audio-to-text", endpoint="trial_app_audio")
|
||||
console_ns.add_resource(TrialChatTextApi, "/trial-apps/<uuid:app_id>/text-to-audio", endpoint="trial_app_text")
|
||||
|
||||
api.add_resource(TrialCompletionApi, "/trial-apps/<uuid:app_id>/completion-messages", endpoint="trial_app_completion")
|
||||
console_ns.add_resource(
|
||||
TrialCompletionApi, "/trial-apps/<uuid:app_id>/completion-messages", endpoint="trial_app_completion"
|
||||
)
|
||||
|
||||
api.add_resource(TrialSitApi, "/trial-apps/<uuid:app_id>/site")
|
||||
console_ns.add_resource(TrialSitApi, "/trial-apps/<uuid:app_id>/site")
|
||||
|
||||
api.add_resource(TrialAppParameterApi, "/trial-apps/<uuid:app_id>/parameters", endpoint="trial_app_parameters")
|
||||
console_ns.add_resource(TrialAppParameterApi, "/trial-apps/<uuid:app_id>/parameters", endpoint="trial_app_parameters")
|
||||
|
||||
api.add_resource(AppApi, "/trial-apps/<uuid:app_id>", endpoint="trial_app")
|
||||
console_ns.add_resource(AppApi, "/trial-apps/<uuid:app_id>", endpoint="trial_app")
|
||||
|
||||
api.add_resource(TrialAppWorkflowRunApi, "/trial-apps/<uuid:app_id>/workflows/run", endpoint="trial_app_workflow_run")
|
||||
api.add_resource(TrialAppWorkflowTaskStopApi, "/trial-apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop")
|
||||
console_ns.add_resource(
|
||||
TrialAppWorkflowRunApi, "/trial-apps/<uuid:app_id>/workflows/run", endpoint="trial_app_workflow_run"
|
||||
)
|
||||
console_ns.add_resource(TrialAppWorkflowTaskStopApi, "/trial-apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop")
|
||||
|
||||
api.add_resource(AppWorkflowApi, "/trial-apps/<uuid:app_id>/workflows", endpoint="trial_app_workflow")
|
||||
api.add_resource(DatasetListApi, "/trial-apps/<uuid:app_id>/datasets", endpoint="trial_app_datasets")
|
||||
console_ns.add_resource(AppWorkflowApi, "/trial-apps/<uuid:app_id>/workflows", endpoint="trial_app_workflow")
|
||||
console_ns.add_resource(DatasetListApi, "/trial-apps/<uuid:app_id>/datasets", endpoint="trial_app_datasets")
|
||||
|
||||
@@ -23,6 +23,7 @@ from core.errors.error import (
|
||||
)
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs import helper
|
||||
from libs.login import current_account_with_tenant
|
||||
from models.model import AppMode, InstalledApp
|
||||
@@ -100,6 +101,6 @@ class InstalledAppWorkflowTaskStopApi(InstalledAppResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -105,9 +105,9 @@ def trial_app_required(view: Callable[Concatenate[App, P], R] | None = None):
|
||||
return decorator
|
||||
|
||||
|
||||
def trial_feature_enable(view: Callable[..., R]) -> Callable[..., R]:
|
||||
def trial_feature_enable(view: Callable[P, R]):
|
||||
@wraps(view)
|
||||
def decorated(*args, **kwargs):
|
||||
def decorated(*args: P.args, **kwargs: P.kwargs):
|
||||
features = FeatureService.get_system_features()
|
||||
if not features.enable_trial_app:
|
||||
abort(403, "Trial app feature is not enabled.")
|
||||
@@ -116,9 +116,9 @@ def trial_feature_enable(view: Callable[..., R]) -> Callable[..., R]:
|
||||
return decorated
|
||||
|
||||
|
||||
def explore_banner_enabled(view: Callable[..., R]) -> Callable[..., R]:
|
||||
def explore_banner_enabled(view: Callable[P, R]):
|
||||
@wraps(view)
|
||||
def decorated(*args, **kwargs):
|
||||
def decorated(*args: P.args, **kwargs: P.kwargs):
|
||||
features = FeatureService.get_system_features()
|
||||
if not features.enable_explore_banner:
|
||||
abort(403, "Explore banner feature is not enabled.")
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import urllib.parse
|
||||
|
||||
import httpx
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
import services
|
||||
@@ -10,12 +11,12 @@ from controllers.common.errors import (
|
||||
RemoteFileUploadError,
|
||||
UnsupportedFileTypeError,
|
||||
)
|
||||
from controllers.fastopenapi import console_router
|
||||
from core.file import helpers as file_helpers
|
||||
from controllers.console import console_ns
|
||||
from core.helper import ssrf_proxy
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from fields.file_fields import FileWithSignedUrl, RemoteFileInfo
|
||||
from libs.login import current_account_with_tenant
|
||||
from libs.login import current_account_with_tenant, login_required
|
||||
from services.file_service import FileService
|
||||
|
||||
|
||||
@@ -23,69 +24,73 @@ class RemoteFileUploadPayload(BaseModel):
|
||||
url: str = Field(..., description="URL to fetch")
|
||||
|
||||
|
||||
@console_router.get(
|
||||
"/remote-files/<path:url>",
|
||||
response_model=RemoteFileInfo,
|
||||
tags=["console"],
|
||||
)
|
||||
def get_remote_file_info(url: str) -> RemoteFileInfo:
|
||||
decoded_url = urllib.parse.unquote(url)
|
||||
resp = ssrf_proxy.head(decoded_url)
|
||||
if resp.status_code != httpx.codes.OK:
|
||||
resp = ssrf_proxy.get(decoded_url, timeout=3)
|
||||
resp.raise_for_status()
|
||||
return RemoteFileInfo(
|
||||
file_type=resp.headers.get("Content-Type", "application/octet-stream"),
|
||||
file_length=int(resp.headers.get("Content-Length", 0)),
|
||||
)
|
||||
|
||||
|
||||
@console_router.post(
|
||||
"/remote-files/upload",
|
||||
response_model=FileWithSignedUrl,
|
||||
tags=["console"],
|
||||
status_code=201,
|
||||
)
|
||||
def upload_remote_file(payload: RemoteFileUploadPayload) -> FileWithSignedUrl:
|
||||
url = payload.url
|
||||
|
||||
try:
|
||||
resp = ssrf_proxy.head(url=url)
|
||||
@console_ns.route("/remote-files/<path:url>")
|
||||
class GetRemoteFileInfo(Resource):
|
||||
@login_required
|
||||
def get(self, url: str):
|
||||
decoded_url = urllib.parse.unquote(url)
|
||||
resp = ssrf_proxy.head(decoded_url)
|
||||
if resp.status_code != httpx.codes.OK:
|
||||
resp = ssrf_proxy.get(url=url, timeout=3, follow_redirects=True)
|
||||
if resp.status_code != httpx.codes.OK:
|
||||
raise RemoteFileUploadError(f"Failed to fetch file from {url}: {resp.text}")
|
||||
except httpx.RequestError as e:
|
||||
raise RemoteFileUploadError(f"Failed to fetch file from {url}: {str(e)}")
|
||||
resp = ssrf_proxy.get(decoded_url, timeout=3)
|
||||
resp.raise_for_status()
|
||||
return RemoteFileInfo(
|
||||
file_type=resp.headers.get("Content-Type", "application/octet-stream"),
|
||||
file_length=int(resp.headers.get("Content-Length", 0)),
|
||||
).model_dump(mode="json")
|
||||
|
||||
file_info = helpers.guess_file_info_from_response(resp)
|
||||
|
||||
if not FileService.is_file_size_within_limit(extension=file_info.extension, file_size=file_info.size):
|
||||
raise FileTooLargeError
|
||||
@console_ns.route("/remote-files/upload")
|
||||
class RemoteFileUpload(Resource):
|
||||
@login_required
|
||||
def post(self):
|
||||
payload = RemoteFileUploadPayload.model_validate(console_ns.payload)
|
||||
url = payload.url
|
||||
|
||||
content = resp.content if resp.request.method == "GET" else ssrf_proxy.get(url).content
|
||||
# Try to fetch remote file metadata/content first
|
||||
try:
|
||||
resp = ssrf_proxy.head(url=url)
|
||||
if resp.status_code != httpx.codes.OK:
|
||||
resp = ssrf_proxy.get(url=url, timeout=3, follow_redirects=True)
|
||||
if resp.status_code != httpx.codes.OK:
|
||||
# Normalize into a user-friendly error message expected by tests
|
||||
raise RemoteFileUploadError(f"Failed to fetch file from {url}: {resp.text}")
|
||||
except httpx.RequestError as e:
|
||||
raise RemoteFileUploadError(f"Failed to fetch file from {url}: {str(e)}")
|
||||
|
||||
try:
|
||||
user, _ = current_account_with_tenant()
|
||||
upload_file = FileService(db.engine).upload_file(
|
||||
filename=file_info.filename,
|
||||
content=content,
|
||||
mimetype=file_info.mimetype,
|
||||
user=user,
|
||||
source_url=url,
|
||||
file_info = helpers.guess_file_info_from_response(resp)
|
||||
|
||||
# Enforce file size limit with 400 (Bad Request) per tests' expectation
|
||||
if not FileService.is_file_size_within_limit(extension=file_info.extension, file_size=file_info.size):
|
||||
raise FileTooLargeError()
|
||||
|
||||
# Load content if needed
|
||||
content = resp.content if resp.request.method == "GET" else ssrf_proxy.get(url).content
|
||||
|
||||
try:
|
||||
user, _ = current_account_with_tenant()
|
||||
upload_file = FileService(db.engine).upload_file(
|
||||
filename=file_info.filename,
|
||||
content=content,
|
||||
mimetype=file_info.mimetype,
|
||||
user=user,
|
||||
source_url=url,
|
||||
)
|
||||
except services.errors.file.FileTooLargeError as file_too_large_error:
|
||||
raise FileTooLargeError(file_too_large_error.description)
|
||||
except services.errors.file.UnsupportedFileTypeError:
|
||||
raise UnsupportedFileTypeError()
|
||||
|
||||
# Success: return created resource with 201 status
|
||||
return (
|
||||
FileWithSignedUrl(
|
||||
id=upload_file.id,
|
||||
name=upload_file.name,
|
||||
size=upload_file.size,
|
||||
extension=upload_file.extension,
|
||||
url=file_helpers.get_signed_file_url(upload_file_id=upload_file.id),
|
||||
mime_type=upload_file.mime_type,
|
||||
created_by=upload_file.created_by,
|
||||
created_at=int(upload_file.created_at.timestamp()),
|
||||
).model_dump(mode="json"),
|
||||
201,
|
||||
)
|
||||
except services.errors.file.FileTooLargeError as file_too_large_error:
|
||||
raise FileTooLargeError(file_too_large_error.description)
|
||||
except services.errors.file.UnsupportedFileTypeError:
|
||||
raise UnsupportedFileTypeError()
|
||||
|
||||
return FileWithSignedUrl(
|
||||
id=upload_file.id,
|
||||
name=upload_file.name,
|
||||
size=upload_file.size,
|
||||
extension=upload_file.extension,
|
||||
url=file_helpers.get_signed_file_url(upload_file_id=upload_file.id),
|
||||
mime_type=upload_file.mime_type,
|
||||
created_by=upload_file.created_by,
|
||||
created_at=int(upload_file.created_at.timestamp()),
|
||||
)
|
||||
|
||||
@@ -42,7 +42,15 @@ class SetupResponse(BaseModel):
|
||||
tags=["console"],
|
||||
)
|
||||
def get_setup_status_api() -> SetupStatusResponse:
|
||||
"""Get system setup status."""
|
||||
"""Get system setup status.
|
||||
|
||||
NOTE: This endpoint is unauthenticated by design.
|
||||
|
||||
During first-time bootstrap there is no admin account yet, so frontend initialization must be
|
||||
able to query setup progress before any login flow exists.
|
||||
|
||||
Only bootstrap-safe status information should be returned by this endpoint.
|
||||
"""
|
||||
if dify_config.EDITION == "SELF_HOSTED":
|
||||
setup_status = get_setup_status()
|
||||
if setup_status and not isinstance(setup_status, bool):
|
||||
@@ -61,7 +69,12 @@ def get_setup_status_api() -> SetupStatusResponse:
|
||||
)
|
||||
@only_edition_self_hosted
|
||||
def setup_system(payload: SetupRequestPayload) -> SetupResponse:
|
||||
"""Initialize system setup with admin account."""
|
||||
"""Initialize system setup with admin account.
|
||||
|
||||
NOTE: This endpoint is unauthenticated by design for first-time bootstrap.
|
||||
Access is restricted by deployment mode (`SELF_HOSTED`), one-time setup guards,
|
||||
and init-password validation rather than user session authentication.
|
||||
"""
|
||||
if get_setup_status():
|
||||
raise AlreadySetupError()
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ class TagUpdateDeleteApi(Resource):
|
||||
|
||||
TagService.delete_tag(tag_id)
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
|
||||
@console_ns.route("/tag-bindings/create")
|
||||
|
||||
@@ -137,7 +137,7 @@ class FilePreviewApi(Resource):
|
||||
if args.as_attachment:
|
||||
encoded_filename = quote(upload_file.name)
|
||||
response.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{encoded_filename}"
|
||||
response.headers["Content-Type"] = "application/octet-stream"
|
||||
response.headers["Content-Type"] = "application/octet-stream"
|
||||
|
||||
enforce_download_for_html(
|
||||
response,
|
||||
|
||||
@@ -64,6 +64,10 @@ class ToolFileApi(Resource):
|
||||
|
||||
if not stream or not tool_file:
|
||||
raise NotFound("file is not found")
|
||||
|
||||
except NotFound:
|
||||
raise
|
||||
|
||||
except Exception:
|
||||
raise UnsupportedFileTypeError()
|
||||
|
||||
|
||||
@@ -7,8 +7,8 @@ from pydantic import BaseModel, Field
|
||||
from werkzeug.exceptions import Forbidden
|
||||
|
||||
import services
|
||||
from core.file.helpers import verify_plugin_file_signature
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.workflow.file.helpers import verify_plugin_file_signature
|
||||
from fields.file_fields import FileResponse
|
||||
|
||||
from ..common.errors import (
|
||||
|
||||
@@ -4,7 +4,6 @@ from controllers.console.wraps import setup_required
|
||||
from controllers.inner_api import inner_api_ns
|
||||
from controllers.inner_api.plugin.wraps import get_user_tenant, plugin_data
|
||||
from controllers.inner_api.wraps import plugin_inner_api_only
|
||||
from core.file.helpers import get_signed_file_url_for_plugin
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.backwards_invocation.app import PluginAppBackwardsInvocation
|
||||
from core.plugin.backwards_invocation.base import BaseBackwardsInvocationResponse
|
||||
@@ -30,6 +29,7 @@ from core.plugin.entities.request import (
|
||||
RequestRequestUploadFile,
|
||||
)
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.workflow.file.helpers import get_signed_file_url_for_plugin
|
||||
from libs.helper import length_prefixed_response
|
||||
from models import Account, Tenant
|
||||
from models.model import EndUser
|
||||
|
||||
@@ -34,6 +34,8 @@ from .dataset import (
|
||||
metadata,
|
||||
segment,
|
||||
)
|
||||
from .dataset.rag_pipeline import rag_pipeline_workflow
|
||||
from .end_user import end_user
|
||||
from .workspace import models
|
||||
|
||||
__all__ = [
|
||||
@@ -44,6 +46,7 @@ __all__ = [
|
||||
"conversation",
|
||||
"dataset",
|
||||
"document",
|
||||
"end_user",
|
||||
"file",
|
||||
"file_preview",
|
||||
"hit_testing",
|
||||
@@ -51,6 +54,7 @@ __all__ = [
|
||||
"message",
|
||||
"metadata",
|
||||
"models",
|
||||
"rag_pipeline_workflow",
|
||||
"segment",
|
||||
"site",
|
||||
"workflow",
|
||||
|
||||
@@ -31,6 +31,7 @@ from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.enums import WorkflowExecutionStatus
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from fields.workflow_app_log_fields import build_workflow_app_log_pagination_model
|
||||
from libs import helper
|
||||
from libs.helper import OptionalTimestampField, TimestampField
|
||||
@@ -280,7 +281,7 @@ class WorkflowTaskStopApi(Resource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
|
||||
@@ -396,7 +396,7 @@ class DatasetApi(DatasetApiResource):
|
||||
try:
|
||||
if DatasetService.delete_dataset(dataset_id_str, current_user):
|
||||
DatasetPermissionService.clear_partial_member_list(dataset_id_str)
|
||||
return 204
|
||||
return "", 204
|
||||
else:
|
||||
raise NotFound("Dataset not found.")
|
||||
except services.errors.dataset.DatasetInUseError:
|
||||
@@ -557,7 +557,7 @@ class DatasetTagsApi(DatasetApiResource):
|
||||
payload = TagDeletePayload.model_validate(service_api_ns.payload or {})
|
||||
TagService.delete_tag(payload.tag_id)
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
|
||||
@service_api_ns.route("/datasets/tags/binding")
|
||||
@@ -581,7 +581,7 @@ class DatasetTagBindingApi(DatasetApiResource):
|
||||
payload = TagBindingPayload.model_validate(service_api_ns.payload or {})
|
||||
TagService.save_tag_binding({"tag_ids": payload.tag_ids, "target_id": payload.target_id, "type": "knowledge"})
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
|
||||
@service_api_ns.route("/datasets/tags/unbinding")
|
||||
@@ -605,7 +605,7 @@ class DatasetTagUnbindingApi(DatasetApiResource):
|
||||
payload = TagUnbindingPayload.model_validate(service_api_ns.payload or {})
|
||||
TagService.delete_tag_binding({"tag_id": payload.tag_id, "target_id": payload.target_id, "type": "knowledge"})
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
|
||||
@service_api_ns.route("/datasets/<uuid:dataset_id>/tags")
|
||||
|
||||
@@ -746,4 +746,4 @@ class DocumentApi(DatasetApiResource):
|
||||
except services.errors.document.DocumentIndexingError:
|
||||
raise DocumentIndexingError("Cannot delete document during indexing.")
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
@@ -128,7 +128,7 @@ class DatasetMetadataServiceApi(DatasetApiResource):
|
||||
DatasetService.check_dataset_permission(dataset, current_user)
|
||||
|
||||
MetadataService.delete_metadata(dataset_id_str, metadata_id_str)
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
|
||||
@service_api_ns.route("/datasets/<uuid:dataset_id>/metadata/built-in")
|
||||
|
||||
@@ -1,24 +1,24 @@
|
||||
import string
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
|
||||
from flask import request
|
||||
from pydantic import BaseModel
|
||||
from werkzeug.exceptions import Forbidden
|
||||
from sqlalchemy import select
|
||||
from werkzeug.exceptions import Forbidden, NotFound
|
||||
|
||||
import services
|
||||
from controllers.common.errors import FilenameNotExistsError, NoFileUploadedError, TooManyFilesError
|
||||
from controllers.common.schema import register_schema_model
|
||||
from controllers.service_api import service_api_ns
|
||||
from controllers.service_api.dataset.error import PipelineRunError
|
||||
from controllers.service_api.dataset.rag_pipeline.serializers import serialize_upload_file
|
||||
from controllers.service_api.wraps import DatasetApiResource
|
||||
from core.app.apps.pipeline.pipeline_generator import PipelineGenerator
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from libs import helper
|
||||
from libs.login import current_user
|
||||
from models import Account
|
||||
from models.dataset import Pipeline
|
||||
from models.dataset import Dataset, Pipeline
|
||||
from models.engine import db
|
||||
from services.errors.file import FileTooLargeError, UnsupportedFileTypeError
|
||||
from services.file_service import FileService
|
||||
@@ -41,7 +41,7 @@ register_schema_model(service_api_ns, DatasourceNodeRunPayload)
|
||||
register_schema_model(service_api_ns, PipelineRunApiEntity)
|
||||
|
||||
|
||||
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/datasource-plugins")
|
||||
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/datasource-plugins")
|
||||
class DatasourcePluginsApi(DatasetApiResource):
|
||||
"""Resource for datasource plugins."""
|
||||
|
||||
@@ -66,6 +66,12 @@ class DatasourcePluginsApi(DatasetApiResource):
|
||||
)
|
||||
def get(self, tenant_id: str, dataset_id: str):
|
||||
"""Resource for getting datasource plugins."""
|
||||
# Verify dataset ownership
|
||||
stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id)
|
||||
dataset = db.session.scalar(stmt)
|
||||
if not dataset:
|
||||
raise NotFound("Dataset not found.")
|
||||
|
||||
# Get query parameter to determine published or draft
|
||||
is_published: bool = request.args.get("is_published", default=True, type=bool)
|
||||
|
||||
@@ -76,7 +82,7 @@ class DatasourcePluginsApi(DatasetApiResource):
|
||||
return datasource_plugins, 200
|
||||
|
||||
|
||||
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/datasource/nodes/{string:node_id}/run")
|
||||
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/datasource/nodes/<string:node_id>/run")
|
||||
class DatasourceNodeRunApi(DatasetApiResource):
|
||||
"""Resource for datasource node run."""
|
||||
|
||||
@@ -105,6 +111,12 @@ class DatasourceNodeRunApi(DatasetApiResource):
|
||||
@service_api_ns.expect(service_api_ns.models[DatasourceNodeRunPayload.__name__])
|
||||
def post(self, tenant_id: str, dataset_id: str, node_id: str):
|
||||
"""Resource for getting datasource plugins."""
|
||||
# Verify dataset ownership
|
||||
stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id)
|
||||
dataset = db.session.scalar(stmt)
|
||||
if not dataset:
|
||||
raise NotFound("Dataset not found.")
|
||||
|
||||
payload = DatasourceNodeRunPayload.model_validate(service_api_ns.payload or {})
|
||||
assert isinstance(current_user, Account)
|
||||
rag_pipeline_service: RagPipelineService = RagPipelineService()
|
||||
@@ -131,7 +143,7 @@ class DatasourceNodeRunApi(DatasetApiResource):
|
||||
)
|
||||
|
||||
|
||||
@service_api_ns.route(f"/datasets/{uuid:dataset_id}/pipeline/run")
|
||||
@service_api_ns.route("/datasets/<uuid:dataset_id>/pipeline/run")
|
||||
class PipelineRunApi(DatasetApiResource):
|
||||
"""Resource for datasource node run."""
|
||||
|
||||
@@ -162,6 +174,12 @@ class PipelineRunApi(DatasetApiResource):
|
||||
@service_api_ns.expect(service_api_ns.models[PipelineRunApiEntity.__name__])
|
||||
def post(self, tenant_id: str, dataset_id: str):
|
||||
"""Resource for running a rag pipeline."""
|
||||
# Verify dataset ownership
|
||||
stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id)
|
||||
dataset = db.session.scalar(stmt)
|
||||
if not dataset:
|
||||
raise NotFound("Dataset not found.")
|
||||
|
||||
payload = PipelineRunApiEntity.model_validate(service_api_ns.payload or {})
|
||||
|
||||
if not isinstance(current_user, Account):
|
||||
@@ -232,12 +250,4 @@ class KnowledgebasePipelineFileUploadApi(DatasetApiResource):
|
||||
except services.errors.file.UnsupportedFileTypeError:
|
||||
raise UnsupportedFileTypeError()
|
||||
|
||||
return {
|
||||
"id": upload_file.id,
|
||||
"name": upload_file.name,
|
||||
"size": upload_file.size,
|
||||
"extension": upload_file.extension,
|
||||
"mime_type": upload_file.mime_type,
|
||||
"created_by": upload_file.created_by,
|
||||
"created_at": upload_file.created_at,
|
||||
}, 201
|
||||
return serialize_upload_file(upload_file), 201
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
"""
|
||||
Serialization helpers for Service API knowledge pipeline endpoints.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from models.model import UploadFile
|
||||
|
||||
|
||||
def serialize_upload_file(upload_file: UploadFile) -> dict[str, Any]:
|
||||
return {
|
||||
"id": upload_file.id,
|
||||
"name": upload_file.name,
|
||||
"size": upload_file.size,
|
||||
"extension": upload_file.extension,
|
||||
"mime_type": upload_file.mime_type,
|
||||
"created_by": upload_file.created_by,
|
||||
"created_at": upload_file.created_at.isoformat() if upload_file.created_at else None,
|
||||
}
|
||||
@@ -233,7 +233,7 @@ class DatasetSegmentApi(DatasetApiResource):
|
||||
if not segment:
|
||||
raise NotFound("Segment not found.")
|
||||
SegmentService.delete_segment(segment, document, dataset)
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
@service_api_ns.expect(service_api_ns.models[SegmentUpdatePayload.__name__])
|
||||
@service_api_ns.doc("update_segment")
|
||||
@@ -499,7 +499,7 @@ class DatasetChildChunkApi(DatasetApiResource):
|
||||
except ChildChunkDeleteIndexServiceError as e:
|
||||
raise ChildChunkDeleteIndexError(str(e))
|
||||
|
||||
return 204
|
||||
return "", 204
|
||||
|
||||
@service_api_ns.expect(service_api_ns.models[ChildChunkUpdatePayload.__name__])
|
||||
@service_api_ns.doc("update_child_chunk")
|
||||
|
||||
3
api/controllers/service_api/end_user/__init__.py
Normal file
3
api/controllers/service_api/end_user/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from . import end_user
|
||||
|
||||
__all__ = ["end_user"]
|
||||
41
api/controllers/service_api/end_user/end_user.py
Normal file
41
api/controllers/service_api/end_user/end_user.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from uuid import UUID
|
||||
|
||||
from flask_restx import Resource
|
||||
|
||||
from controllers.service_api import service_api_ns
|
||||
from controllers.service_api.end_user.error import EndUserNotFoundError
|
||||
from controllers.service_api.wraps import validate_app_token
|
||||
from fields.end_user_fields import EndUserDetail
|
||||
from models.model import App
|
||||
from services.end_user_service import EndUserService
|
||||
|
||||
|
||||
@service_api_ns.route("/end-users/<uuid:end_user_id>")
|
||||
class EndUserApi(Resource):
|
||||
"""Resource for retrieving end user details by ID."""
|
||||
|
||||
@service_api_ns.doc("get_end_user")
|
||||
@service_api_ns.doc(description="Get an end user by ID")
|
||||
@service_api_ns.doc(
|
||||
params={"end_user_id": "End user ID"},
|
||||
responses={
|
||||
200: "End user retrieved successfully",
|
||||
401: "Unauthorized - invalid API token",
|
||||
404: "End user not found",
|
||||
},
|
||||
)
|
||||
@validate_app_token
|
||||
def get(self, app_model: App, end_user_id: UUID):
|
||||
"""Get end user detail.
|
||||
|
||||
This endpoint is scoped to the current app token's tenant/app to prevent
|
||||
cross-tenant/app access when an end-user ID is known.
|
||||
"""
|
||||
|
||||
end_user = EndUserService.get_end_user_by_id(
|
||||
tenant_id=app_model.tenant_id, app_id=app_model.id, end_user_id=str(end_user_id)
|
||||
)
|
||||
if end_user is None:
|
||||
raise EndUserNotFoundError()
|
||||
|
||||
return EndUserDetail.model_validate(end_user).model_dump(mode="json")
|
||||
7
api/controllers/service_api/end_user/error.py
Normal file
7
api/controllers/service_api/end_user/error.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from libs.exception import BaseHTTPException
|
||||
|
||||
|
||||
class EndUserNotFoundError(BaseHTTPException):
|
||||
error_code = "end_user_not_found"
|
||||
description = "End user not found."
|
||||
code = 404
|
||||
@@ -1,27 +1,24 @@
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from datetime import timedelta
|
||||
from enum import StrEnum, auto
|
||||
from functools import wraps
|
||||
from typing import Concatenate, ParamSpec, TypeVar
|
||||
from typing import Concatenate, ParamSpec, TypeVar, cast
|
||||
|
||||
from flask import current_app, request
|
||||
from flask_login import user_logged_in
|
||||
from flask_restx import Resource
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import Forbidden, NotFound, Unauthorized
|
||||
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from libs.login import current_user
|
||||
from models import Account, Tenant, TenantAccountJoin, TenantStatus
|
||||
from models.dataset import Dataset, RateLimitLog
|
||||
from models.model import ApiToken, App
|
||||
from services.api_token_service import ApiTokenCache, fetch_token_with_single_flight, record_token_usage
|
||||
from services.end_user_service import EndUserService
|
||||
from services.feature_service import FeatureService
|
||||
|
||||
@@ -220,6 +217,8 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
|
||||
def decorator(view: Callable[Concatenate[T, P], R]):
|
||||
@wraps(view)
|
||||
def decorated(*args: P.args, **kwargs: P.kwargs):
|
||||
api_token = validate_and_get_api_token("dataset")
|
||||
|
||||
# get url path dataset_id from positional args or kwargs
|
||||
# Flask passes URL path parameters as positional arguments
|
||||
dataset_id = None
|
||||
@@ -256,12 +255,18 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
|
||||
# Validate dataset if dataset_id is provided
|
||||
if dataset_id:
|
||||
dataset_id = str(dataset_id)
|
||||
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
|
||||
dataset = (
|
||||
db.session.query(Dataset)
|
||||
.where(
|
||||
Dataset.id == dataset_id,
|
||||
Dataset.tenant_id == api_token.tenant_id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if not dataset:
|
||||
raise NotFound("Dataset not found.")
|
||||
if not dataset.enable_api:
|
||||
raise Forbidden("Dataset api access is not enabled.")
|
||||
api_token = validate_and_get_api_token("dataset")
|
||||
tenant_account_join = (
|
||||
db.session.query(Tenant, TenantAccountJoin)
|
||||
.where(Tenant.id == api_token.tenant_id)
|
||||
@@ -296,7 +301,14 @@ def validate_dataset_token(view: Callable[Concatenate[T, P], R] | None = None):
|
||||
|
||||
def validate_and_get_api_token(scope: str | None = None):
|
||||
"""
|
||||
Validate and get API token.
|
||||
Validate and get API token with Redis caching.
|
||||
|
||||
This function uses a two-tier approach:
|
||||
1. First checks Redis cache for the token
|
||||
2. If not cached, queries database and caches the result
|
||||
|
||||
The last_used_at field is updated asynchronously via Celery task
|
||||
to avoid blocking the request.
|
||||
"""
|
||||
auth_header = request.headers.get("Authorization")
|
||||
if auth_header is None or " " not in auth_header:
|
||||
@@ -308,29 +320,18 @@ def validate_and_get_api_token(scope: str | None = None):
|
||||
if auth_scheme != "bearer":
|
||||
raise Unauthorized("Authorization scheme must be 'Bearer'")
|
||||
|
||||
current_time = naive_utc_now()
|
||||
cutoff_time = current_time - timedelta(minutes=1)
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
update_stmt = (
|
||||
update(ApiToken)
|
||||
.where(
|
||||
ApiToken.token == auth_token,
|
||||
(ApiToken.last_used_at.is_(None) | (ApiToken.last_used_at < cutoff_time)),
|
||||
ApiToken.type == scope,
|
||||
)
|
||||
.values(last_used_at=current_time)
|
||||
)
|
||||
stmt = select(ApiToken).where(ApiToken.token == auth_token, ApiToken.type == scope)
|
||||
result = session.execute(update_stmt)
|
||||
api_token = session.scalar(stmt)
|
||||
# Try to get token from cache first
|
||||
# Returns a CachedApiToken (plain Python object), not a SQLAlchemy model
|
||||
cached_token = ApiTokenCache.get(auth_token, scope)
|
||||
if cached_token is not None:
|
||||
logger.debug("Token validation served from cache for scope: %s", scope)
|
||||
# Record usage in Redis for later batch update (no Celery task per request)
|
||||
record_token_usage(auth_token, scope)
|
||||
return cast(ApiToken, cached_token)
|
||||
|
||||
if hasattr(result, "rowcount") and result.rowcount > 0:
|
||||
session.commit()
|
||||
|
||||
if not api_token:
|
||||
raise Unauthorized("Access token is invalid")
|
||||
|
||||
return api_token
|
||||
# Cache miss - use Redis lock for single-flight mode
|
||||
# This ensures only one request queries DB for the same token concurrently
|
||||
return fetch_token_with_single_flight(auth_token, scope)
|
||||
|
||||
|
||||
class DatasetApiResource(Resource):
|
||||
|
||||
@@ -65,15 +65,12 @@ def _jsonify_form_definition(form: Form, site_payload: dict | None = None) -> Re
|
||||
return Response(json.dumps(payload, ensure_ascii=False), mimetype="application/json")
|
||||
|
||||
|
||||
# TODO(QuantumGhost): disable authorization for web app
|
||||
# form api temporarily
|
||||
|
||||
|
||||
@web_ns.route("/form/human_input/<string:form_token>")
|
||||
# class HumanInputFormApi(WebApiResource):
|
||||
class HumanInputFormApi(Resource):
|
||||
"""API for getting and submitting human input forms via the web app."""
|
||||
|
||||
# NOTE(QuantumGhost): this endpoint is unauthenticated on purpose for now.
|
||||
|
||||
# def get(self, _app_model: App, _end_user: EndUser, form_token: str):
|
||||
def get(self, form_token: str):
|
||||
"""
|
||||
|
||||
@@ -10,8 +10,8 @@ from controllers.common.errors import (
|
||||
RemoteFileUploadError,
|
||||
UnsupportedFileTypeError,
|
||||
)
|
||||
from core.file import helpers as file_helpers
|
||||
from core.helper import ssrf_proxy
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from fields.file_fields import FileWithSignedUrl, RemoteFileInfo
|
||||
from services.file_service import FileService
|
||||
|
||||
@@ -24,6 +24,7 @@ from core.errors.error import (
|
||||
)
|
||||
from core.model_runtime.errors.invoke import InvokeError
|
||||
from core.workflow.graph_engine.manager import GraphEngineManager
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs import helper
|
||||
from models.model import App, AppMode, EndUser
|
||||
from services.app_generate_service import AppGenerateService
|
||||
@@ -121,6 +122,6 @@ class WorkflowTaskStopApi(WebApiResource):
|
||||
AppQueueManager.set_stop_flag_no_user_check(task_id)
|
||||
|
||||
# New graph engine command channel mechanism
|
||||
GraphEngineManager.send_stop_command(task_id)
|
||||
GraphEngineManager(redis_client).send_stop_command(task_id)
|
||||
|
||||
return {"result": "success"}
|
||||
|
||||
@@ -17,8 +17,6 @@ from core.app.entities.app_invoke_entities import (
|
||||
)
|
||||
from core.callback_handler.agent_tool_callback_handler import DifyAgentCallbackHandler
|
||||
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
|
||||
from core.file import file_manager
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities import (
|
||||
AssistantPromptMessage,
|
||||
@@ -33,6 +31,7 @@ from core.model_runtime.entities import (
|
||||
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
|
||||
from core.model_runtime.entities.model_entities import ModelFeature
|
||||
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.prompt.utils.extract_thread_messages import extract_thread_messages
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.entities.tool_entities import (
|
||||
@@ -40,6 +39,7 @@ from core.tools.entities.tool_entities import (
|
||||
)
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from core.tools.utils.dataset_retriever_tool import DatasetRetrieverTool
|
||||
from core.workflow.file import file_manager
|
||||
from extensions.ext_database import db
|
||||
from factories import file_factory
|
||||
from models.enums import CreatorUserRole
|
||||
@@ -112,7 +112,7 @@ class BaseAgentRunner(AppRunner):
|
||||
|
||||
# check if model supports stream tool call
|
||||
llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
|
||||
model_schema = llm_model.get_model_schema(model_instance.model, model_instance.credentials)
|
||||
model_schema = llm_model.get_model_schema(model_instance.model_name, model_instance.credentials)
|
||||
features = model_schema.features if model_schema and model_schema.features else []
|
||||
self.stream_tool_call = ModelFeature.STREAM_TOOL_CALL in features
|
||||
self.files = application_generate_entity.files if ModelFeature.VISION in features else []
|
||||
|
||||
@@ -245,7 +245,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
|
||||
iteration_step += 1
|
||||
|
||||
yield LLMResultChunk(
|
||||
model=model_instance.model,
|
||||
model=model_instance.model_name,
|
||||
prompt_messages=prompt_messages,
|
||||
delta=LLMResultChunkDelta(
|
||||
index=0, message=AssistantPromptMessage(content=final_answer), usage=llm_usage["usage"]
|
||||
@@ -268,7 +268,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
|
||||
self.queue_manager.publish(
|
||||
QueueMessageEndEvent(
|
||||
llm_result=LLMResult(
|
||||
model=model_instance.model,
|
||||
model=model_instance.model_name,
|
||||
prompt_messages=prompt_messages,
|
||||
message=AssistantPromptMessage(content=final_answer),
|
||||
usage=llm_usage["usage"] or LLMUsage.empty_usage(),
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import json
|
||||
|
||||
from core.agent.cot_agent_runner import CotAgentRunner
|
||||
from core.file import file_manager
|
||||
from core.model_runtime.entities import (
|
||||
AssistantPromptMessage,
|
||||
PromptMessage,
|
||||
@@ -11,6 +10,7 @@ from core.model_runtime.entities import (
|
||||
)
|
||||
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.workflow.file import file_manager
|
||||
|
||||
|
||||
class CotChatAgentRunner(CotAgentRunner):
|
||||
|
||||
@@ -7,7 +7,6 @@ from typing import Any, Union
|
||||
from core.agent.base_agent_runner import BaseAgentRunner
|
||||
from core.app.apps.base_app_queue_manager import PublishFrom
|
||||
from core.app.entities.queue_entities import QueueAgentThoughtEvent, QueueMessageEndEvent, QueueMessageFileEvent
|
||||
from core.file import file_manager
|
||||
from core.model_runtime.entities import (
|
||||
AssistantPromptMessage,
|
||||
LLMResult,
|
||||
@@ -25,6 +24,7 @@ from core.model_runtime.entities.message_entities import ImagePromptMessageConte
|
||||
from core.prompt.agent_history_prompt_transform import AgentHistoryPromptTransform
|
||||
from core.tools.entities.tool_entities import ToolInvokeMeta
|
||||
from core.tools.tool_engine import ToolEngine
|
||||
from core.workflow.file import file_manager
|
||||
from core.workflow.nodes.agent.exc import AgentMaxIterationError
|
||||
from models.model import Message
|
||||
|
||||
@@ -178,7 +178,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
|
||||
)
|
||||
|
||||
yield LLMResultChunk(
|
||||
model=model_instance.model,
|
||||
model=model_instance.model_name,
|
||||
prompt_messages=result.prompt_messages,
|
||||
system_fingerprint=result.system_fingerprint,
|
||||
delta=LLMResultChunkDelta(
|
||||
@@ -308,7 +308,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
|
||||
self.queue_manager.publish(
|
||||
QueueMessageEndEvent(
|
||||
llm_result=LLMResult(
|
||||
model=model_instance.model,
|
||||
model=model_instance.model_name,
|
||||
prompt_messages=prompt_messages,
|
||||
message=AssistantPromptMessage(content=final_answer),
|
||||
usage=llm_usage["usage"] or LLMUsage.empty_usage(),
|
||||
|
||||
@@ -5,9 +5,9 @@ from typing import Any, Literal
|
||||
from jsonschema import Draft7Validator, SchemaError
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from core.file import FileTransferMethod, FileType, FileUploadConfig
|
||||
from core.model_runtime.entities.llm_entities import LLMMode
|
||||
from core.model_runtime.entities.message_entities import PromptMessageRole
|
||||
from core.workflow.file import FileTransferMethod, FileType, FileUploadConfig
|
||||
from models.model import AppMode
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from constants import DEFAULT_FILE_NUMBER_LIMITS
|
||||
from core.file import FileUploadConfig
|
||||
from core.workflow.file import FileUploadConfig
|
||||
|
||||
|
||||
class FileUploadConfigManager:
|
||||
|
||||
@@ -669,16 +669,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
|
||||
) -> Generator[StreamResponse, None, None]:
|
||||
"""Handle retriever resources events."""
|
||||
self._message_cycle_manager.handle_retriever_resources(event)
|
||||
return
|
||||
yield # Make this a generator
|
||||
yield from ()
|
||||
|
||||
def _handle_annotation_reply_event(
|
||||
self, event: QueueAnnotationReplyEvent, **kwargs
|
||||
) -> Generator[StreamResponse, None, None]:
|
||||
"""Handle annotation reply events."""
|
||||
self._message_cycle_manager.handle_annotation_reply(event)
|
||||
return
|
||||
yield # Make this a generator
|
||||
yield from ()
|
||||
|
||||
def _handle_message_replace_event(
|
||||
self, event: QueueMessageReplaceEvent, **kwargs
|
||||
|
||||
@@ -12,11 +12,11 @@ from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
||||
from core.app.apps.base_app_runner import AppRunner
|
||||
from core.app.entities.app_invoke_entities import AgentChatAppGenerateEntity
|
||||
from core.app.entities.queue_entities import QueueAnnotationReplyEvent
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities.llm_entities import LLMMode
|
||||
from core.model_runtime.entities.model_entities import ModelFeature, ModelPropertyKey
|
||||
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.moderation.base import ModerationError
|
||||
from extensions.ext_database import db
|
||||
from models.model import App, Conversation, Message
|
||||
@@ -178,7 +178,7 @@ class AgentChatAppRunner(AppRunner):
|
||||
|
||||
# change function call strategy based on LLM model
|
||||
llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
|
||||
model_schema = llm_model.get_model_schema(model_instance.model, model_instance.credentials)
|
||||
model_schema = llm_model.get_model_schema(model_instance.model_name, model_instance.credentials)
|
||||
if not model_schema:
|
||||
raise ValueError("Model schema not found")
|
||||
|
||||
|
||||
@@ -5,8 +5,8 @@ from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.app_config.entities import VariableEntityType
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.file import File, FileUploadConfig
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.file import File, FileUploadConfig
|
||||
from core.workflow.repositories.draft_variable_repository import (
|
||||
DraftVariableSaver,
|
||||
DraftVariableSaverFactory,
|
||||
|
||||
@@ -2,7 +2,7 @@ import logging
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import IntEnum, auto
|
||||
from typing import Any
|
||||
|
||||
@@ -31,7 +31,7 @@ class PublishFrom(IntEnum):
|
||||
TASK_PIPELINE = auto()
|
||||
|
||||
|
||||
class AppQueueManager:
|
||||
class AppQueueManager(ABC):
|
||||
def __init__(self, task_id: str, user_id: str, invoke_from: InvokeFrom):
|
||||
if not user_id:
|
||||
raise ValueError("user is required")
|
||||
@@ -122,7 +122,7 @@ class AppQueueManager:
|
||||
"""Attach the live graph runtime state reference for downstream consumers."""
|
||||
self._graph_runtime_state = graph_runtime_state
|
||||
|
||||
def publish(self, event: AppQueueEvent, pub_from: PublishFrom):
|
||||
def publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish event to queue
|
||||
:param event:
|
||||
@@ -133,7 +133,7 @@ class AppQueueManager:
|
||||
self._publish(event, pub_from)
|
||||
|
||||
@abstractmethod
|
||||
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom):
|
||||
def _publish(self, event: AppQueueEvent, pub_from: PublishFrom) -> None:
|
||||
"""
|
||||
Publish event to queue
|
||||
:param event:
|
||||
|
||||
@@ -22,8 +22,6 @@ from core.app.entities.queue_entities import (
|
||||
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
|
||||
from core.app.features.hosting_moderation.hosting_moderation import HostingModerationFeature
|
||||
from core.external_data_tool.external_data_fetch import ExternalDataFetch
|
||||
from core.file.enums import FileTransferMethod, FileType
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
@@ -34,17 +32,19 @@ from core.model_runtime.entities.message_entities import (
|
||||
)
|
||||
from core.model_runtime.entities.model_entities import ModelPropertyKey
|
||||
from core.model_runtime.errors.invoke import InvokeBadRequestError
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.moderation.input_moderation import InputModeration
|
||||
from core.prompt.advanced_prompt_transform import AdvancedPromptTransform
|
||||
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
|
||||
from core.prompt.simple_prompt_transform import ModelMode, SimplePromptTransform
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.workflow.file.enums import FileTransferMethod, FileType
|
||||
from extensions.ext_database import db
|
||||
from models.enums import CreatorUserRole
|
||||
from models.model import App, AppMode, Message, MessageAnnotation, MessageFile
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.file.models import File
|
||||
from core.workflow.file.models import File
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -11,12 +11,12 @@ from core.app.entities.app_invoke_entities import (
|
||||
)
|
||||
from core.app.entities.queue_entities import QueueAnnotationReplyEvent
|
||||
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
|
||||
from core.file import File
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities.message_entities import ImagePromptMessageContent
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.moderation.base import ModerationError
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
from core.workflow.file import File
|
||||
from extensions.ext_database import db
|
||||
from models.model import App, Conversation, Message
|
||||
|
||||
|
||||
@@ -45,7 +45,6 @@ from core.app.entities.task_entities import (
|
||||
WorkflowPauseStreamResponse,
|
||||
WorkflowStartStreamResponse,
|
||||
)
|
||||
from core.file import FILE_MODEL_IDENTITY, File
|
||||
from core.plugin.impl.datasource import PluginDatasourceManager
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.tools.tool_manager import ToolManager
|
||||
@@ -60,6 +59,7 @@ from core.workflow.enums import (
|
||||
WorkflowNodeExecutionMetadataKey,
|
||||
WorkflowNodeExecutionStatus,
|
||||
)
|
||||
from core.workflow.file import FILE_MODEL_IDENTITY, File
|
||||
from core.workflow.runtime import GraphRuntimeState
|
||||
from core.workflow.system_variable import SystemVariable
|
||||
from core.workflow.workflow_entry import WorkflowEntry
|
||||
@@ -346,7 +346,7 @@ class WorkflowResponseConverter:
|
||||
paused_nodes=list(event.paused_nodes),
|
||||
outputs=encoded_outputs,
|
||||
reasons=pause_reasons,
|
||||
status=WorkflowExecutionStatus.PAUSED.value,
|
||||
status=WorkflowExecutionStatus.PAUSED,
|
||||
created_at=int(started_at.timestamp()),
|
||||
elapsed_time=elapsed_time,
|
||||
total_tokens=graph_runtime_state.total_tokens,
|
||||
@@ -422,7 +422,7 @@ class WorkflowResponseConverter:
|
||||
data=WorkflowFinishStreamResponse.Data(
|
||||
id=run_id,
|
||||
workflow_id=workflow_run.workflow_id,
|
||||
status=workflow_run.status.value,
|
||||
status=workflow_run.status,
|
||||
outputs=encoded_outputs,
|
||||
error=workflow_run.error,
|
||||
elapsed_time=elapsed_time,
|
||||
|
||||
@@ -10,11 +10,11 @@ from core.app.entities.app_invoke_entities import (
|
||||
CompletionAppGenerateEntity,
|
||||
)
|
||||
from core.callback_handler.index_tool_callback_handler import DatasetIndexToolCallbackHandler
|
||||
from core.file import File
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities.message_entities import ImagePromptMessageContent
|
||||
from core.moderation.base import ModerationError
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
from core.workflow.file import File
|
||||
from extensions.ext_database import db
|
||||
from models.model import App, Message
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ def stream_topic_events(
|
||||
on_subscribe()
|
||||
while True:
|
||||
try:
|
||||
msg = sub.receive(timeout=0.1)
|
||||
msg = sub.receive(timeout=1)
|
||||
except SubscriptionClosedError:
|
||||
return
|
||||
if msg is None:
|
||||
|
||||
@@ -7,8 +7,8 @@ from pydantic import BaseModel, ConfigDict, Field, ValidationInfo, field_validat
|
||||
from constants import UUID_NIL
|
||||
from core.app.app_config.entities import EasyUIBasedAppConfig, WorkflowUIBasedAppConfig
|
||||
from core.entities.provider_configuration import ProviderModelBundle
|
||||
from core.file import File, FileUploadConfig
|
||||
from core.model_runtime.entities.model_entities import AIModelEntity
|
||||
from core.workflow.file import File, FileUploadConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
@@ -262,7 +262,7 @@ class WorkflowPauseStreamResponse(StreamResponse):
|
||||
paused_nodes: Sequence[str] = Field(default_factory=list)
|
||||
outputs: Mapping[str, Any] = Field(default_factory=dict)
|
||||
reasons: Sequence[Mapping[str, Any]] = Field(default_factory=list)
|
||||
status: str
|
||||
status: WorkflowExecutionStatus
|
||||
created_at: int
|
||||
elapsed_time: float
|
||||
total_tokens: int
|
||||
|
||||
1
api/core/app/llm/__init__.py
Normal file
1
api/core/app/llm/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""LLM-related application services."""
|
||||
103
api/core/app/llm/model_access.py
Normal file
103
api/core/app/llm/model_access.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
from core.errors.error import ProviderTokenNotInitError
|
||||
from core.model_manager import ModelInstance, ModelManager
|
||||
from core.model_runtime.entities.model_entities import ModelType
|
||||
from core.provider_manager import ProviderManager
|
||||
from core.workflow.nodes.llm.entities import ModelConfig
|
||||
from core.workflow.nodes.llm.exc import LLMModeRequiredError, ModelNotExistError
|
||||
from core.workflow.nodes.llm.protocols import CredentialsProvider, ModelFactory
|
||||
|
||||
|
||||
class DifyCredentialsProvider:
|
||||
tenant_id: str
|
||||
provider_manager: ProviderManager
|
||||
|
||||
def __init__(self, tenant_id: str, provider_manager: ProviderManager | None = None) -> None:
|
||||
self.tenant_id = tenant_id
|
||||
self.provider_manager = provider_manager or ProviderManager()
|
||||
|
||||
def fetch(self, provider_name: str, model_name: str) -> dict[str, Any]:
|
||||
provider_configurations = self.provider_manager.get_configurations(self.tenant_id)
|
||||
provider_configuration = provider_configurations.get(provider_name)
|
||||
if not provider_configuration:
|
||||
raise ValueError(f"Provider {provider_name} does not exist.")
|
||||
|
||||
provider_model = provider_configuration.get_provider_model(model_type=ModelType.LLM, model=model_name)
|
||||
if provider_model is None:
|
||||
raise ModelNotExistError(f"Model {model_name} not exist.")
|
||||
provider_model.raise_for_status()
|
||||
|
||||
credentials = provider_configuration.get_current_credentials(model_type=ModelType.LLM, model=model_name)
|
||||
if credentials is None:
|
||||
raise ProviderTokenNotInitError(f"Model {model_name} credentials is not initialized.")
|
||||
|
||||
return credentials
|
||||
|
||||
|
||||
class DifyModelFactory:
|
||||
tenant_id: str
|
||||
model_manager: ModelManager
|
||||
|
||||
def __init__(self, tenant_id: str, model_manager: ModelManager | None = None) -> None:
|
||||
self.tenant_id = tenant_id
|
||||
self.model_manager = model_manager or ModelManager()
|
||||
|
||||
def init_model_instance(self, provider_name: str, model_name: str) -> ModelInstance:
|
||||
return self.model_manager.get_model_instance(
|
||||
tenant_id=self.tenant_id,
|
||||
provider=provider_name,
|
||||
model_type=ModelType.LLM,
|
||||
model=model_name,
|
||||
)
|
||||
|
||||
|
||||
def build_dify_model_access(tenant_id: str) -> tuple[CredentialsProvider, ModelFactory]:
|
||||
return (
|
||||
DifyCredentialsProvider(tenant_id=tenant_id),
|
||||
DifyModelFactory(tenant_id=tenant_id),
|
||||
)
|
||||
|
||||
|
||||
def fetch_model_config(
|
||||
*,
|
||||
node_data_model: ModelConfig,
|
||||
credentials_provider: CredentialsProvider,
|
||||
model_factory: ModelFactory,
|
||||
) -> tuple[ModelInstance, ModelConfigWithCredentialsEntity]:
|
||||
if not node_data_model.mode:
|
||||
raise LLMModeRequiredError("LLM mode is required.")
|
||||
|
||||
credentials = credentials_provider.fetch(node_data_model.provider, node_data_model.name)
|
||||
model_instance = model_factory.init_model_instance(node_data_model.provider, node_data_model.name)
|
||||
provider_model_bundle = model_instance.provider_model_bundle
|
||||
|
||||
provider_model = provider_model_bundle.configuration.get_provider_model(
|
||||
model=node_data_model.name,
|
||||
model_type=ModelType.LLM,
|
||||
)
|
||||
if provider_model is None:
|
||||
raise ModelNotExistError(f"Model {node_data_model.name} not exist.")
|
||||
provider_model.raise_for_status()
|
||||
|
||||
stop: list[str] = []
|
||||
if "stop" in node_data_model.completion_params:
|
||||
stop = node_data_model.completion_params.pop("stop")
|
||||
|
||||
model_schema = model_instance.model_type_instance.get_model_schema(node_data_model.name, credentials)
|
||||
if not model_schema:
|
||||
raise ModelNotExistError(f"Model {node_data_model.name} not exist.")
|
||||
|
||||
return model_instance, ModelConfigWithCredentialsEntity(
|
||||
provider=node_data_model.provider,
|
||||
model=node_data_model.name,
|
||||
model_schema=model_schema,
|
||||
mode=node_data_model.mode,
|
||||
provider_model_bundle=provider_model_bundle,
|
||||
credentials=credentials,
|
||||
parameters=node_data_model.completion_params,
|
||||
stop=stop,
|
||||
)
|
||||
@@ -56,10 +56,13 @@ from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
|
||||
from core.prompt.utils.prompt_message_util import PromptMessageUtil
|
||||
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
|
||||
from core.tools.signature import sign_tool_file
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
from core.workflow.file.enums import FileTransferMethod
|
||||
from events.message_event import message_was_created
|
||||
from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.model import AppMode, Conversation, Message, MessageAgentThought
|
||||
from models.model import AppMode, Conversation, Message, MessageAgentThought, MessageFile, UploadFile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -463,6 +466,85 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
|
||||
metadata=metadata_dict,
|
||||
)
|
||||
|
||||
def _record_files(self):
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
message_files = session.scalars(select(MessageFile).where(MessageFile.message_id == self._message_id)).all()
|
||||
if not message_files:
|
||||
return None
|
||||
|
||||
files_list = []
|
||||
upload_file_ids = [
|
||||
mf.upload_file_id
|
||||
for mf in message_files
|
||||
if mf.transfer_method == FileTransferMethod.LOCAL_FILE and mf.upload_file_id
|
||||
]
|
||||
upload_files_map = {}
|
||||
if upload_file_ids:
|
||||
upload_files = session.scalars(select(UploadFile).where(UploadFile.id.in_(upload_file_ids))).all()
|
||||
upload_files_map = {uf.id: uf for uf in upload_files}
|
||||
|
||||
for message_file in message_files:
|
||||
upload_file = None
|
||||
if message_file.transfer_method == FileTransferMethod.LOCAL_FILE and message_file.upload_file_id:
|
||||
upload_file = upload_files_map.get(message_file.upload_file_id)
|
||||
|
||||
url = None
|
||||
filename = "file"
|
||||
mime_type = "application/octet-stream"
|
||||
size = 0
|
||||
extension = ""
|
||||
|
||||
if message_file.transfer_method == FileTransferMethod.REMOTE_URL:
|
||||
url = message_file.url
|
||||
if message_file.url:
|
||||
filename = message_file.url.split("/")[-1].split("?")[0] # Remove query params
|
||||
elif message_file.transfer_method == FileTransferMethod.LOCAL_FILE:
|
||||
if upload_file:
|
||||
url = file_helpers.get_signed_file_url(upload_file_id=str(upload_file.id))
|
||||
filename = upload_file.name
|
||||
mime_type = upload_file.mime_type or "application/octet-stream"
|
||||
size = upload_file.size or 0
|
||||
extension = f".{upload_file.extension}" if upload_file.extension else ""
|
||||
elif message_file.upload_file_id:
|
||||
# Fallback: generate URL even if upload_file not found
|
||||
url = file_helpers.get_signed_file_url(upload_file_id=str(message_file.upload_file_id))
|
||||
elif message_file.transfer_method == FileTransferMethod.TOOL_FILE and message_file.url:
|
||||
# For tool files, use URL directly if it's HTTP, otherwise sign it
|
||||
if message_file.url.startswith("http"):
|
||||
url = message_file.url
|
||||
filename = message_file.url.split("/")[-1].split("?")[0]
|
||||
else:
|
||||
# Extract tool file id and extension from URL
|
||||
url_parts = message_file.url.split("/")
|
||||
if url_parts:
|
||||
file_part = url_parts[-1].split("?")[0] # Remove query params first
|
||||
# Use rsplit to correctly handle filenames with multiple dots
|
||||
if "." in file_part:
|
||||
tool_file_id, ext = file_part.rsplit(".", 1)
|
||||
extension = f".{ext}"
|
||||
else:
|
||||
tool_file_id = file_part
|
||||
extension = ".bin"
|
||||
url = sign_tool_file(tool_file_id=tool_file_id, extension=extension)
|
||||
filename = file_part
|
||||
|
||||
transfer_method_value = message_file.transfer_method
|
||||
remote_url = message_file.url if message_file.transfer_method == FileTransferMethod.REMOTE_URL else ""
|
||||
file_dict = {
|
||||
"related_id": message_file.id,
|
||||
"extension": extension,
|
||||
"filename": filename,
|
||||
"size": size,
|
||||
"mime_type": mime_type,
|
||||
"transfer_method": transfer_method_value,
|
||||
"type": message_file.type,
|
||||
"url": url or "",
|
||||
"upload_file_id": message_file.upload_file_id or message_file.id,
|
||||
"remote_url": remote_url,
|
||||
}
|
||||
files_list.append(file_dict)
|
||||
return files_list or None
|
||||
|
||||
def _agent_message_to_stream_response(self, answer: str, message_id: str) -> AgentMessageStreamResponse:
|
||||
"""
|
||||
Agent message to stream response.
|
||||
|
||||
@@ -64,7 +64,13 @@ class MessageCycleManager:
|
||||
|
||||
# Use SQLAlchemy 2.x style session.scalar(select(...))
|
||||
with session_factory.create_session() as session:
|
||||
message_file = session.scalar(select(MessageFile).where(MessageFile.message_id == message_id))
|
||||
message_file = session.scalar(
|
||||
select(MessageFile)
|
||||
.where(
|
||||
MessageFile.message_id == message_id,
|
||||
)
|
||||
.where(MessageFile.belongs_to == "assistant")
|
||||
)
|
||||
|
||||
if message_file:
|
||||
self._message_has_file.add(message_id)
|
||||
|
||||
47
api/core/app/workflow/file_runtime.py
Normal file
47
api/core/app/workflow/file_runtime.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
|
||||
from configs import dify_config
|
||||
from core.helper.ssrf_proxy import ssrf_proxy
|
||||
from core.tools.signature import sign_tool_file
|
||||
from core.workflow.file.protocols import HttpResponseProtocol, WorkflowFileRuntimeProtocol
|
||||
from core.workflow.file.runtime import set_workflow_file_runtime
|
||||
from extensions.ext_storage import storage
|
||||
|
||||
|
||||
class DifyWorkflowFileRuntime(WorkflowFileRuntimeProtocol):
|
||||
"""Production runtime wiring for ``core.workflow.file``."""
|
||||
|
||||
@property
|
||||
def files_url(self) -> str:
|
||||
return dify_config.FILES_URL
|
||||
|
||||
@property
|
||||
def internal_files_url(self) -> str | None:
|
||||
return dify_config.INTERNAL_FILES_URL
|
||||
|
||||
@property
|
||||
def secret_key(self) -> str:
|
||||
return dify_config.SECRET_KEY
|
||||
|
||||
@property
|
||||
def files_access_timeout(self) -> int:
|
||||
return dify_config.FILES_ACCESS_TIMEOUT
|
||||
|
||||
@property
|
||||
def multimodal_send_format(self) -> str:
|
||||
return dify_config.MULTIMODAL_SEND_FORMAT
|
||||
|
||||
def http_get(self, url: str, *, follow_redirects: bool = True) -> HttpResponseProtocol:
|
||||
return ssrf_proxy.get(url, follow_redirects=follow_redirects)
|
||||
|
||||
def storage_load(self, path: str, *, stream: bool = False) -> bytes | Generator:
|
||||
return storage.load(path, stream=stream)
|
||||
|
||||
def sign_tool_file(self, *, tool_file_id: str, extension: str, for_external: bool = True) -> str:
|
||||
return sign_tool_file(tool_file_id=tool_file_id, extension=extension, for_external=for_external)
|
||||
|
||||
|
||||
def bind_dify_workflow_file_runtime() -> None:
|
||||
set_workflow_file_runtime(DifyWorkflowFileRuntime())
|
||||
@@ -1,26 +1,34 @@
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import TYPE_CHECKING, final
|
||||
from collections.abc import Mapping
|
||||
from typing import TYPE_CHECKING, Any, final
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from configs import dify_config
|
||||
from core.file.file_manager import file_manager
|
||||
from core.helper.code_executor.code_executor import CodeExecutor
|
||||
from core.app.llm.model_access import build_dify_model_access
|
||||
from core.datasource.datasource_manager import DatasourceManager
|
||||
from core.helper.code_executor.code_executor import CodeExecutionError, CodeExecutor
|
||||
from core.helper.code_executor.code_node_provider import CodeNodeProvider
|
||||
from core.helper.ssrf_proxy import ssrf_proxy
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.workflow.entities.graph_config import NodeConfigDict
|
||||
from core.workflow.enums import NodeType
|
||||
from core.workflow.file.file_manager import file_manager
|
||||
from core.workflow.graph.graph import NodeFactory
|
||||
from core.workflow.nodes.base.node import Node
|
||||
from core.workflow.nodes.code.code_node import CodeNode
|
||||
from core.workflow.nodes.code.code_node import CodeNode, WorkflowCodeExecutor
|
||||
from core.workflow.nodes.code.entities import CodeLanguage
|
||||
from core.workflow.nodes.code.limits import CodeNodeLimits
|
||||
from core.workflow.nodes.http_request.node import HttpRequestNode
|
||||
from core.workflow.nodes.datasource import DatasourceNode
|
||||
from core.workflow.nodes.document_extractor import DocumentExtractorNode, UnstructuredApiConfig
|
||||
from core.workflow.nodes.http_request import HttpRequestNode, build_http_request_config
|
||||
from core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node import KnowledgeRetrievalNode
|
||||
from core.workflow.nodes.llm.node import LLMNode
|
||||
from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
|
||||
from core.workflow.nodes.protocols import FileManagerProtocol, HttpClientProtocol
|
||||
from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode
|
||||
from core.workflow.nodes.question_classifier.question_classifier_node import QuestionClassifierNode
|
||||
from core.workflow.nodes.template_transform.template_renderer import (
|
||||
CodeExecutorJinja2TemplateRenderer,
|
||||
Jinja2TemplateRenderer,
|
||||
)
|
||||
from core.workflow.nodes.template_transform.template_transform_node import TemplateTransformNode
|
||||
|
||||
@@ -29,6 +37,24 @@ if TYPE_CHECKING:
|
||||
from core.workflow.runtime import GraphRuntimeState
|
||||
|
||||
|
||||
class DefaultWorkflowCodeExecutor:
|
||||
def execute(
|
||||
self,
|
||||
*,
|
||||
language: CodeLanguage,
|
||||
code: str,
|
||||
inputs: Mapping[str, Any],
|
||||
) -> Mapping[str, Any]:
|
||||
return CodeExecutor.execute_workflow_code_template(
|
||||
language=language,
|
||||
code=code,
|
||||
inputs=inputs,
|
||||
)
|
||||
|
||||
def is_execution_error(self, error: Exception) -> bool:
|
||||
return isinstance(error, CodeExecutionError)
|
||||
|
||||
|
||||
@final
|
||||
class DifyNodeFactory(NodeFactory):
|
||||
"""
|
||||
@@ -42,23 +68,12 @@ class DifyNodeFactory(NodeFactory):
|
||||
self,
|
||||
graph_init_params: "GraphInitParams",
|
||||
graph_runtime_state: "GraphRuntimeState",
|
||||
*,
|
||||
code_executor: type[CodeExecutor] | None = None,
|
||||
code_providers: Sequence[type[CodeNodeProvider]] | None = None,
|
||||
code_limits: CodeNodeLimits | None = None,
|
||||
template_renderer: Jinja2TemplateRenderer | None = None,
|
||||
template_transform_max_output_length: int | None = None,
|
||||
http_request_http_client: HttpClientProtocol | None = None,
|
||||
http_request_tool_file_manager_factory: Callable[[], ToolFileManager] = ToolFileManager,
|
||||
http_request_file_manager: FileManagerProtocol | None = None,
|
||||
) -> None:
|
||||
self.graph_init_params = graph_init_params
|
||||
self.graph_runtime_state = graph_runtime_state
|
||||
self._code_executor: type[CodeExecutor] = code_executor or CodeExecutor
|
||||
self._code_providers: tuple[type[CodeNodeProvider], ...] = (
|
||||
tuple(code_providers) if code_providers else CodeNode.default_code_providers()
|
||||
)
|
||||
self._code_limits = code_limits or CodeNodeLimits(
|
||||
self._code_executor: WorkflowCodeExecutor = DefaultWorkflowCodeExecutor()
|
||||
self._code_providers: tuple[type[CodeNodeProvider], ...] = CodeNode.default_code_providers()
|
||||
self._code_limits = CodeNodeLimits(
|
||||
max_string_length=dify_config.CODE_MAX_STRING_LENGTH,
|
||||
max_number=dify_config.CODE_MAX_NUMBER,
|
||||
min_number=dify_config.CODE_MIN_NUMBER,
|
||||
@@ -68,13 +83,27 @@ class DifyNodeFactory(NodeFactory):
|
||||
max_string_array_length=dify_config.CODE_MAX_STRING_ARRAY_LENGTH,
|
||||
max_object_array_length=dify_config.CODE_MAX_OBJECT_ARRAY_LENGTH,
|
||||
)
|
||||
self._template_renderer = template_renderer or CodeExecutorJinja2TemplateRenderer()
|
||||
self._template_transform_max_output_length = (
|
||||
template_transform_max_output_length or dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH
|
||||
self._template_renderer = CodeExecutorJinja2TemplateRenderer()
|
||||
self._template_transform_max_output_length = dify_config.TEMPLATE_TRANSFORM_MAX_LENGTH
|
||||
self._http_request_http_client = ssrf_proxy
|
||||
self._http_request_tool_file_manager_factory = ToolFileManager
|
||||
self._http_request_file_manager = file_manager
|
||||
self._rag_retrieval = DatasetRetrieval()
|
||||
self._document_extractor_unstructured_api_config = UnstructuredApiConfig(
|
||||
api_url=dify_config.UNSTRUCTURED_API_URL,
|
||||
api_key=dify_config.UNSTRUCTURED_API_KEY or "",
|
||||
)
|
||||
self._http_request_http_client = http_request_http_client or ssrf_proxy
|
||||
self._http_request_tool_file_manager_factory = http_request_tool_file_manager_factory
|
||||
self._http_request_file_manager = http_request_file_manager or file_manager
|
||||
self._http_request_config = build_http_request_config(
|
||||
max_connect_timeout=dify_config.HTTP_REQUEST_MAX_CONNECT_TIMEOUT,
|
||||
max_read_timeout=dify_config.HTTP_REQUEST_MAX_READ_TIMEOUT,
|
||||
max_write_timeout=dify_config.HTTP_REQUEST_MAX_WRITE_TIMEOUT,
|
||||
max_binary_size=dify_config.HTTP_REQUEST_NODE_MAX_BINARY_SIZE,
|
||||
max_text_size=dify_config.HTTP_REQUEST_NODE_MAX_TEXT_SIZE,
|
||||
ssl_verify=dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
|
||||
ssrf_default_max_retries=dify_config.SSRF_DEFAULT_MAX_RETRIES,
|
||||
)
|
||||
|
||||
self._llm_credentials_provider, self._llm_model_factory = build_dify_model_access(graph_init_params.tenant_id)
|
||||
|
||||
@override
|
||||
def create_node(self, node_config: NodeConfigDict) -> Node:
|
||||
@@ -135,11 +164,69 @@ class DifyNodeFactory(NodeFactory):
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
http_request_config=self._http_request_config,
|
||||
http_client=self._http_request_http_client,
|
||||
tool_file_manager_factory=self._http_request_tool_file_manager_factory,
|
||||
file_manager=self._http_request_file_manager,
|
||||
)
|
||||
|
||||
if node_type == NodeType.LLM:
|
||||
return LLMNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
credentials_provider=self._llm_credentials_provider,
|
||||
model_factory=self._llm_model_factory,
|
||||
)
|
||||
|
||||
if node_type == NodeType.DATASOURCE:
|
||||
return DatasourceNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
datasource_manager=DatasourceManager,
|
||||
)
|
||||
|
||||
if node_type == NodeType.KNOWLEDGE_RETRIEVAL:
|
||||
return KnowledgeRetrievalNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
rag_retrieval=self._rag_retrieval,
|
||||
)
|
||||
|
||||
if node_type == NodeType.DOCUMENT_EXTRACTOR:
|
||||
return DocumentExtractorNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
unstructured_api_config=self._document_extractor_unstructured_api_config,
|
||||
)
|
||||
|
||||
if node_type == NodeType.QUESTION_CLASSIFIER:
|
||||
return QuestionClassifierNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
credentials_provider=self._llm_credentials_provider,
|
||||
model_factory=self._llm_model_factory,
|
||||
)
|
||||
|
||||
if node_type == NodeType.PARAMETER_EXTRACTOR:
|
||||
return ParameterExtractorNode(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
graph_init_params=self.graph_init_params,
|
||||
graph_runtime_state=self.graph_runtime_state,
|
||||
credentials_provider=self._llm_credentials_provider,
|
||||
model_factory=self._llm_model_factory,
|
||||
)
|
||||
|
||||
return node_class(
|
||||
id=node_id,
|
||||
config=node_config,
|
||||
|
||||
@@ -213,6 +213,6 @@ class DatasourceFileManager:
|
||||
|
||||
|
||||
# init tool_file_parser
|
||||
# from core.file.datasource_file_parser import datasource_file_manager
|
||||
# from core.workflow.file.datasource_file_parser import datasource_file_manager
|
||||
#
|
||||
# datasource_file_manager["manager"] = DatasourceFileManager
|
||||
|
||||
@@ -1,16 +1,39 @@
|
||||
import logging
|
||||
from collections.abc import Generator
|
||||
from threading import Lock
|
||||
from typing import Any, cast
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
import contexts
|
||||
from core.datasource.__base.datasource_plugin import DatasourcePlugin
|
||||
from core.datasource.__base.datasource_provider import DatasourcePluginProviderController
|
||||
from core.datasource.entities.datasource_entities import DatasourceProviderType
|
||||
from core.datasource.entities.datasource_entities import (
|
||||
DatasourceMessage,
|
||||
DatasourceProviderType,
|
||||
GetOnlineDocumentPageContentRequest,
|
||||
OnlineDriveDownloadFileRequest,
|
||||
)
|
||||
from core.datasource.errors import DatasourceProviderNotFoundError
|
||||
from core.datasource.local_file.local_file_provider import LocalFileDatasourcePluginProviderController
|
||||
from core.datasource.online_document.online_document_plugin import OnlineDocumentDatasourcePlugin
|
||||
from core.datasource.online_document.online_document_provider import OnlineDocumentDatasourcePluginProviderController
|
||||
from core.datasource.online_drive.online_drive_plugin import OnlineDriveDatasourcePlugin
|
||||
from core.datasource.online_drive.online_drive_provider import OnlineDriveDatasourcePluginProviderController
|
||||
from core.datasource.utils.message_transformer import DatasourceFileMessageTransformer
|
||||
from core.datasource.website_crawl.website_crawl_provider import WebsiteCrawlDatasourcePluginProviderController
|
||||
from core.db.session_factory import session_factory
|
||||
from core.plugin.impl.datasource import PluginDatasourceManager
|
||||
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
|
||||
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
|
||||
from core.workflow.file import File
|
||||
from core.workflow.file.enums import FileTransferMethod, FileType
|
||||
from core.workflow.node_events import NodeRunResult, StreamChunkEvent, StreamCompletedEvent
|
||||
from core.workflow.repositories.datasource_manager_protocol import DatasourceParameter, OnlineDriveDownloadFileParam
|
||||
from factories import file_factory
|
||||
from models.model import UploadFile
|
||||
from models.tools import ToolFile
|
||||
from services.datasource_provider_service import DatasourceProviderService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -103,3 +126,238 @@ class DatasourceManager:
|
||||
tenant_id,
|
||||
datasource_type,
|
||||
).get_datasource(datasource_name)
|
||||
|
||||
@classmethod
|
||||
def get_icon_url(cls, provider_id: str, tenant_id: str, datasource_name: str, datasource_type: str) -> str:
|
||||
datasource_runtime = cls.get_datasource_runtime(
|
||||
provider_id=provider_id,
|
||||
datasource_name=datasource_name,
|
||||
tenant_id=tenant_id,
|
||||
datasource_type=DatasourceProviderType.value_of(datasource_type),
|
||||
)
|
||||
return datasource_runtime.get_icon_url(tenant_id)
|
||||
|
||||
@classmethod
|
||||
def stream_online_results(
|
||||
cls,
|
||||
*,
|
||||
user_id: str,
|
||||
datasource_name: str,
|
||||
datasource_type: str,
|
||||
provider_id: str,
|
||||
tenant_id: str,
|
||||
provider: str,
|
||||
plugin_id: str,
|
||||
credential_id: str,
|
||||
datasource_param: DatasourceParameter | None = None,
|
||||
online_drive_request: OnlineDriveDownloadFileParam | None = None,
|
||||
) -> Generator[DatasourceMessage, None, Any]:
|
||||
"""
|
||||
Pull-based streaming of domain messages from datasource plugins.
|
||||
Returns a generator that yields DatasourceMessage and finally returns a minimal final payload.
|
||||
Only ONLINE_DOCUMENT and ONLINE_DRIVE are streamable here; other types are handled by nodes directly.
|
||||
"""
|
||||
ds_type = DatasourceProviderType.value_of(datasource_type)
|
||||
runtime = cls.get_datasource_runtime(
|
||||
provider_id=provider_id,
|
||||
datasource_name=datasource_name,
|
||||
tenant_id=tenant_id,
|
||||
datasource_type=ds_type,
|
||||
)
|
||||
|
||||
dsp_service = DatasourceProviderService()
|
||||
credentials = dsp_service.get_datasource_credentials(
|
||||
tenant_id=tenant_id,
|
||||
provider=provider,
|
||||
plugin_id=plugin_id,
|
||||
credential_id=credential_id,
|
||||
)
|
||||
|
||||
if ds_type == DatasourceProviderType.ONLINE_DOCUMENT:
|
||||
doc_runtime = cast(OnlineDocumentDatasourcePlugin, runtime)
|
||||
if credentials:
|
||||
doc_runtime.runtime.credentials = credentials
|
||||
if datasource_param is None:
|
||||
raise ValueError("datasource_param is required for ONLINE_DOCUMENT streaming")
|
||||
inner_gen: Generator[DatasourceMessage, None, None] = doc_runtime.get_online_document_page_content(
|
||||
user_id=user_id,
|
||||
datasource_parameters=GetOnlineDocumentPageContentRequest(
|
||||
workspace_id=datasource_param.workspace_id,
|
||||
page_id=datasource_param.page_id,
|
||||
type=datasource_param.type,
|
||||
),
|
||||
provider_type=ds_type,
|
||||
)
|
||||
elif ds_type == DatasourceProviderType.ONLINE_DRIVE:
|
||||
drive_runtime = cast(OnlineDriveDatasourcePlugin, runtime)
|
||||
if credentials:
|
||||
drive_runtime.runtime.credentials = credentials
|
||||
if online_drive_request is None:
|
||||
raise ValueError("online_drive_request is required for ONLINE_DRIVE streaming")
|
||||
inner_gen = drive_runtime.online_drive_download_file(
|
||||
user_id=user_id,
|
||||
request=OnlineDriveDownloadFileRequest(
|
||||
id=online_drive_request.id,
|
||||
bucket=online_drive_request.bucket,
|
||||
),
|
||||
provider_type=ds_type,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported datasource type for streaming: {ds_type}")
|
||||
|
||||
# Bridge through to caller while preserving generator return contract
|
||||
yield from inner_gen
|
||||
# No structured final data here; node/adapter will assemble outputs
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def stream_node_events(
|
||||
cls,
|
||||
*,
|
||||
node_id: str,
|
||||
user_id: str,
|
||||
datasource_name: str,
|
||||
datasource_type: str,
|
||||
provider_id: str,
|
||||
tenant_id: str,
|
||||
provider: str,
|
||||
plugin_id: str,
|
||||
credential_id: str,
|
||||
parameters_for_log: dict[str, Any],
|
||||
datasource_info: dict[str, Any],
|
||||
variable_pool: Any,
|
||||
datasource_param: DatasourceParameter | None = None,
|
||||
online_drive_request: OnlineDriveDownloadFileParam | None = None,
|
||||
) -> Generator[StreamChunkEvent | StreamCompletedEvent, None, None]:
|
||||
ds_type = DatasourceProviderType.value_of(datasource_type)
|
||||
|
||||
messages = cls.stream_online_results(
|
||||
user_id=user_id,
|
||||
datasource_name=datasource_name,
|
||||
datasource_type=datasource_type,
|
||||
provider_id=provider_id,
|
||||
tenant_id=tenant_id,
|
||||
provider=provider,
|
||||
plugin_id=plugin_id,
|
||||
credential_id=credential_id,
|
||||
datasource_param=datasource_param,
|
||||
online_drive_request=online_drive_request,
|
||||
)
|
||||
|
||||
transformed = DatasourceFileMessageTransformer.transform_datasource_invoke_messages(
|
||||
messages=messages, user_id=user_id, tenant_id=tenant_id, conversation_id=None
|
||||
)
|
||||
|
||||
variables: dict[str, Any] = {}
|
||||
file_out: File | None = None
|
||||
|
||||
for message in transformed:
|
||||
mtype = message.type
|
||||
if mtype in {
|
||||
DatasourceMessage.MessageType.IMAGE_LINK,
|
||||
DatasourceMessage.MessageType.BINARY_LINK,
|
||||
DatasourceMessage.MessageType.IMAGE,
|
||||
}:
|
||||
wanted_ds_type = ds_type in {
|
||||
DatasourceProviderType.ONLINE_DRIVE,
|
||||
DatasourceProviderType.ONLINE_DOCUMENT,
|
||||
}
|
||||
if wanted_ds_type and isinstance(message.message, DatasourceMessage.TextMessage):
|
||||
url = message.message.text
|
||||
|
||||
datasource_file_id = str(url).split("/")[-1].split(".")[0]
|
||||
with session_factory.create_session() as session:
|
||||
stmt = select(ToolFile).where(
|
||||
ToolFile.id == datasource_file_id, ToolFile.tenant_id == tenant_id
|
||||
)
|
||||
datasource_file = session.scalar(stmt)
|
||||
if not datasource_file:
|
||||
raise ValueError(
|
||||
f"ToolFile not found for file_id={datasource_file_id}, tenant_id={tenant_id}"
|
||||
)
|
||||
mime_type = datasource_file.mimetype
|
||||
if datasource_file is not None:
|
||||
mapping = {
|
||||
"tool_file_id": datasource_file_id,
|
||||
"type": file_factory.get_file_type_by_mime_type(mime_type),
|
||||
"transfer_method": FileTransferMethod.TOOL_FILE,
|
||||
"url": url,
|
||||
}
|
||||
file_out = file_factory.build_from_mapping(mapping=mapping, tenant_id=tenant_id)
|
||||
elif mtype == DatasourceMessage.MessageType.TEXT:
|
||||
assert isinstance(message.message, DatasourceMessage.TextMessage)
|
||||
yield StreamChunkEvent(selector=[node_id, "text"], chunk=message.message.text, is_final=False)
|
||||
elif mtype == DatasourceMessage.MessageType.LINK:
|
||||
assert isinstance(message.message, DatasourceMessage.TextMessage)
|
||||
yield StreamChunkEvent(
|
||||
selector=[node_id, "text"], chunk=f"Link: {message.message.text}\n", is_final=False
|
||||
)
|
||||
elif mtype == DatasourceMessage.MessageType.VARIABLE:
|
||||
assert isinstance(message.message, DatasourceMessage.VariableMessage)
|
||||
name = message.message.variable_name
|
||||
value = message.message.variable_value
|
||||
if message.message.stream:
|
||||
assert isinstance(value, str), "stream variable_value must be str"
|
||||
variables[name] = variables.get(name, "") + value
|
||||
yield StreamChunkEvent(selector=[node_id, name], chunk=value, is_final=False)
|
||||
else:
|
||||
variables[name] = value
|
||||
elif mtype == DatasourceMessage.MessageType.FILE:
|
||||
if ds_type == DatasourceProviderType.ONLINE_DRIVE and message.meta:
|
||||
f = message.meta.get("file")
|
||||
if isinstance(f, File):
|
||||
file_out = f
|
||||
else:
|
||||
pass
|
||||
|
||||
yield StreamChunkEvent(selector=[node_id, "text"], chunk="", is_final=True)
|
||||
|
||||
if ds_type == DatasourceProviderType.ONLINE_DRIVE and file_out is not None:
|
||||
variable_pool.add([node_id, "file"], file_out)
|
||||
|
||||
if ds_type == DatasourceProviderType.ONLINE_DOCUMENT:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
inputs=parameters_for_log,
|
||||
metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
|
||||
outputs={**variables},
|
||||
)
|
||||
)
|
||||
else:
|
||||
yield StreamCompletedEvent(
|
||||
node_run_result=NodeRunResult(
|
||||
status=WorkflowNodeExecutionStatus.SUCCEEDED,
|
||||
inputs=parameters_for_log,
|
||||
metadata={WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info},
|
||||
outputs={
|
||||
"file": file_out,
|
||||
"datasource_type": ds_type,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_upload_file_by_id(cls, file_id: str, tenant_id: str) -> File:
|
||||
with session_factory.create_session() as session:
|
||||
upload_file = (
|
||||
session.query(UploadFile).where(UploadFile.id == file_id, UploadFile.tenant_id == tenant_id).first()
|
||||
)
|
||||
if not upload_file:
|
||||
raise ValueError(f"UploadFile not found for file_id={file_id}, tenant_id={tenant_id}")
|
||||
|
||||
file_info = File(
|
||||
id=upload_file.id,
|
||||
filename=upload_file.name,
|
||||
extension="." + upload_file.extension,
|
||||
mime_type=upload_file.mime_type,
|
||||
tenant_id=tenant_id,
|
||||
type=FileType.CUSTOM,
|
||||
transfer_method=FileTransferMethod.LOCAL_FILE,
|
||||
remote_url=upload_file.source_url,
|
||||
related_id=upload_file.id,
|
||||
size=upload_file.size,
|
||||
storage_key=upload_file.key,
|
||||
url=upload_file.source_url,
|
||||
)
|
||||
return file_info
|
||||
|
||||
@@ -379,4 +379,11 @@ class OnlineDriveDownloadFileRequest(BaseModel):
|
||||
"""
|
||||
|
||||
id: str = Field(..., description="The id of the file")
|
||||
bucket: str | None = Field(None, description="The name of the bucket")
|
||||
bucket: str = Field("", description="The name of the bucket")
|
||||
|
||||
@field_validator("bucket", mode="before")
|
||||
@classmethod
|
||||
def _coerce_bucket(cls, v) -> str:
|
||||
if v is None:
|
||||
return ""
|
||||
return str(v)
|
||||
|
||||
@@ -3,8 +3,8 @@ from collections.abc import Generator
|
||||
from mimetypes import guess_extension, guess_type
|
||||
|
||||
from core.datasource.entities.datasource_entities import DatasourceMessage
|
||||
from core.file import File, FileTransferMethod, FileType
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.workflow.file import File, FileTransferMethod, FileType
|
||||
from models.tools import ToolFile
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -10,12 +10,12 @@ from pydantic import BaseModel
|
||||
|
||||
from configs import dify_config
|
||||
from core.entities.provider_entities import BasicProviderConfig
|
||||
from core.file import helpers as file_helpers
|
||||
from core.helper import encrypter
|
||||
from core.helper.provider_cache import NoOpProviderCredentialCache
|
||||
from core.mcp.types import OAuthClientInformation, OAuthClientMetadata, OAuthTokens
|
||||
from core.tools.entities.common_entities import I18nObject
|
||||
from core.tools.entities.tool_entities import ToolProviderType
|
||||
from core.workflow.file import helpers as file_helpers
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from models.tools import MCPToolProvider
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
from collections.abc import Callable
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
|
||||
_tool_file_manager_factory: Callable[[], "ToolFileManager"] | None = None
|
||||
|
||||
|
||||
def set_tool_file_manager_factory(factory: Callable[[], "ToolFileManager"]):
|
||||
global _tool_file_manager_factory
|
||||
_tool_file_manager_factory = factory
|
||||
@@ -35,7 +35,7 @@ class ModelInstance:
|
||||
|
||||
def __init__(self, provider_model_bundle: ProviderModelBundle, model: str):
|
||||
self.provider_model_bundle = provider_model_bundle
|
||||
self.model = model
|
||||
self.model_name = model
|
||||
self.provider = provider_model_bundle.configuration.provider.provider
|
||||
self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
|
||||
self.model_type_instance = self.provider_model_bundle.model_type_instance
|
||||
@@ -163,7 +163,7 @@ class ModelInstance:
|
||||
Union[LLMResult, Generator],
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
prompt_messages=prompt_messages,
|
||||
model_parameters=model_parameters,
|
||||
@@ -191,7 +191,7 @@ class ModelInstance:
|
||||
int,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.get_num_tokens,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
prompt_messages=prompt_messages,
|
||||
tools=tools,
|
||||
@@ -215,7 +215,7 @@ class ModelInstance:
|
||||
EmbeddingResult,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
texts=texts,
|
||||
user=user,
|
||||
@@ -243,7 +243,7 @@ class ModelInstance:
|
||||
EmbeddingResult,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
multimodel_documents=multimodel_documents,
|
||||
user=user,
|
||||
@@ -264,7 +264,7 @@ class ModelInstance:
|
||||
list[int],
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.get_num_tokens,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
texts=texts,
|
||||
),
|
||||
@@ -294,7 +294,7 @@ class ModelInstance:
|
||||
RerankResult,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
query=query,
|
||||
docs=docs,
|
||||
@@ -328,7 +328,7 @@ class ModelInstance:
|
||||
RerankResult,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke_multimodal_rerank,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
query=query,
|
||||
docs=docs,
|
||||
@@ -352,7 +352,7 @@ class ModelInstance:
|
||||
bool,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
text=text,
|
||||
user=user,
|
||||
@@ -373,7 +373,7 @@ class ModelInstance:
|
||||
str,
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
file=file,
|
||||
user=user,
|
||||
@@ -396,7 +396,7 @@ class ModelInstance:
|
||||
Iterable[bytes],
|
||||
self._round_robin_invoke(
|
||||
function=self.model_type_instance.invoke,
|
||||
model=self.model,
|
||||
model=self.model_name,
|
||||
credentials=self.credentials,
|
||||
content_text=content_text,
|
||||
user=user,
|
||||
@@ -469,7 +469,7 @@ class ModelInstance:
|
||||
if not isinstance(self.model_type_instance, TTSModel):
|
||||
raise Exception("Model type instance is not TTSModel")
|
||||
return self.model_type_instance.get_tts_model_voices(
|
||||
model=self.model, credentials=self.credentials, language=language
|
||||
model=self.model_name, credentials=self.credentials, language=language
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ from sqlalchemy import select
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
|
||||
from core.file import file_manager
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities import (
|
||||
AssistantPromptMessage,
|
||||
@@ -16,6 +15,7 @@ from core.model_runtime.entities import (
|
||||
)
|
||||
from core.model_runtime.entities.message_entities import PromptMessageContentUnionTypes
|
||||
from core.prompt.utils.extract_thread_messages import extract_thread_messages
|
||||
from core.workflow.file import file_manager
|
||||
from extensions.ext_database import db
|
||||
from factories import file_factory
|
||||
from models.model import AppMode, Conversation, Message, MessageFile
|
||||
@@ -39,7 +39,7 @@ class Moderation(Extensible, ABC):
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def validate_config(cls, tenant_id: str, config: dict):
|
||||
def validate_config(cls, tenant_id: str, config: dict) -> None:
|
||||
"""
|
||||
Validate the incoming form config data.
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ from core.ops.aliyun_trace.data_exporter.traceclient import (
|
||||
)
|
||||
from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData, TraceMetadata
|
||||
from core.ops.aliyun_trace.entities.semconv import (
|
||||
DIFY_APP_ID,
|
||||
GEN_AI_COMPLETION,
|
||||
GEN_AI_INPUT_MESSAGE,
|
||||
GEN_AI_OUTPUT_MESSAGE,
|
||||
@@ -99,6 +100,16 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
logger.info("Aliyun get project url failed: %s", str(e), exc_info=True)
|
||||
raise ValueError(f"Aliyun get project url failed: {str(e)}")
|
||||
|
||||
def _extract_app_id(self, trace_info: BaseTraceInfo) -> str:
|
||||
"""Extract app_id from trace_info, trying metadata first then message_data."""
|
||||
app_id = trace_info.metadata.get("app_id")
|
||||
if app_id:
|
||||
return str(app_id)
|
||||
message_data = getattr(trace_info, "message_data", None)
|
||||
if message_data is not None:
|
||||
return str(getattr(message_data, "app_id", ""))
|
||||
return ""
|
||||
|
||||
def workflow_trace(self, trace_info: WorkflowTraceInfo):
|
||||
trace_metadata = TraceMetadata(
|
||||
trace_id=convert_to_trace_id(trace_info.workflow_run_id),
|
||||
@@ -143,13 +154,16 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
name="message",
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes=create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=inputs_json,
|
||||
outputs=outputs_str,
|
||||
),
|
||||
attributes={
|
||||
**create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=inputs_json,
|
||||
outputs=outputs_str,
|
||||
),
|
||||
DIFY_APP_ID: self._extract_app_id(trace_info),
|
||||
},
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER,
|
||||
@@ -441,6 +455,8 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
inputs_json = serialize_json_data(trace_info.workflow_run_inputs)
|
||||
outputs_json = serialize_json_data(trace_info.workflow_run_outputs)
|
||||
|
||||
app_id = self._extract_app_id(trace_info)
|
||||
|
||||
if message_span_id:
|
||||
message_span = SpanData(
|
||||
trace_id=trace_metadata.trace_id,
|
||||
@@ -449,13 +465,16 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
name="message",
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes=create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=trace_info.workflow_run_inputs.get("sys.query") or "",
|
||||
outputs=outputs_json,
|
||||
),
|
||||
attributes={
|
||||
**create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=trace_info.workflow_run_inputs.get("sys.query") or "",
|
||||
outputs=outputs_json,
|
||||
),
|
||||
DIFY_APP_ID: app_id,
|
||||
},
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER,
|
||||
@@ -469,13 +488,16 @@ class AliyunDataTrace(BaseTraceInstance):
|
||||
name="workflow",
|
||||
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
|
||||
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
|
||||
attributes=create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=inputs_json,
|
||||
outputs=outputs_json,
|
||||
),
|
||||
attributes={
|
||||
**create_common_span_attributes(
|
||||
session_id=trace_metadata.session_id,
|
||||
user_id=trace_metadata.user_id,
|
||||
span_kind=GenAISpanKind.CHAIN,
|
||||
inputs=inputs_json,
|
||||
outputs=outputs_json,
|
||||
),
|
||||
**({DIFY_APP_ID: app_id} if message_span_id is None else {}),
|
||||
},
|
||||
status=status,
|
||||
links=trace_metadata.links,
|
||||
span_kind=SpanKind.SERVER if message_span_id is None else SpanKind.INTERNAL,
|
||||
|
||||
@@ -3,6 +3,9 @@ from typing import Final
|
||||
|
||||
ACS_ARMS_SERVICE_FEATURE: Final[str] = "acs.arms.service.feature"
|
||||
|
||||
# Dify-specific attributes
|
||||
DIFY_APP_ID: Final[str] = "dify.app_id"
|
||||
|
||||
# Public attributes
|
||||
GEN_AI_SESSION_ID: Final[str] = "gen_ai.session.id"
|
||||
GEN_AI_USER_ID: Final[str] = "gen_ai.user.id"
|
||||
|
||||
@@ -129,11 +129,11 @@ class LangfuseSpan(BaseModel):
|
||||
default=None,
|
||||
description="The id of the user that triggered the execution. Used to provide user-level analytics.",
|
||||
)
|
||||
start_time: datetime | str | None = Field(
|
||||
start_time: datetime | None = Field(
|
||||
default_factory=datetime.now,
|
||||
description="The time at which the span started, defaults to the current time.",
|
||||
)
|
||||
end_time: datetime | str | None = Field(
|
||||
end_time: datetime | None = Field(
|
||||
default=None,
|
||||
description="The time at which the span ended. Automatically set by span.end().",
|
||||
)
|
||||
@@ -146,7 +146,7 @@ class LangfuseSpan(BaseModel):
|
||||
description="Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated "
|
||||
"via the API.",
|
||||
)
|
||||
level: str | None = Field(
|
||||
level: LevelEnum | None = Field(
|
||||
default=None,
|
||||
description="The level of the span. Can be DEBUG, DEFAULT, WARNING or ERROR. Used for sorting/filtering of "
|
||||
"traces with elevated error levels and for highlighting in the UI.",
|
||||
@@ -222,16 +222,16 @@ class LangfuseGeneration(BaseModel):
|
||||
default=None,
|
||||
description="Identifier of the generation. Useful for sorting/filtering in the UI.",
|
||||
)
|
||||
start_time: datetime | str | None = Field(
|
||||
start_time: datetime | None = Field(
|
||||
default_factory=datetime.now,
|
||||
description="The time at which the generation started, defaults to the current time.",
|
||||
)
|
||||
completion_start_time: datetime | str | None = Field(
|
||||
completion_start_time: datetime | None = Field(
|
||||
default=None,
|
||||
description="The time at which the completion started (streaming). Set it to get latency analytics broken "
|
||||
"down into time until completion started and completion duration.",
|
||||
)
|
||||
end_time: datetime | str | None = Field(
|
||||
end_time: datetime | None = Field(
|
||||
default=None,
|
||||
description="The time at which the generation ended. Automatically set by generation.end().",
|
||||
)
|
||||
|
||||
@@ -18,8 +18,7 @@ except ImportError:
|
||||
from importlib_metadata import version # type: ignore[import-not-found]
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from opentelemetry.metrics import Meter
|
||||
from opentelemetry.metrics._internal.instrument import Histogram
|
||||
from opentelemetry.metrics import Histogram, Meter
|
||||
from opentelemetry.sdk.metrics.export import MetricReader
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
|
||||
@@ -3,6 +3,8 @@ from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from configs import dify_config
|
||||
|
||||
# from core.plugin.entities.plugin import GenericProviderID, ToolProviderID
|
||||
from core.plugin.entities.plugin_daemon import CredentialType, PluginBasicBooleanResponse, PluginToolProviderEntity
|
||||
from core.plugin.impl.base import BasePluginClient
|
||||
@@ -122,7 +124,7 @@ class PluginToolManager(BasePluginClient):
|
||||
},
|
||||
)
|
||||
|
||||
return merge_blob_chunks(response)
|
||||
return merge_blob_chunks(response, max_file_size=dify_config.PLUGIN_MAX_FILE_SIZE)
|
||||
|
||||
def validate_provider_credentials(
|
||||
self, tenant_id: str, user_id: str, provider: str, credentials: dict[str, Any]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from typing import Any
|
||||
|
||||
from core.file.models import File
|
||||
from core.tools.entities.tool_entities import ToolSelector
|
||||
from core.workflow.file.models import File
|
||||
|
||||
|
||||
def convert_parameters_to_plugin_format(parameters: dict[str, Any]) -> dict[str, Any]:
|
||||
|
||||
@@ -2,10 +2,7 @@ from collections.abc import Mapping, Sequence
|
||||
from typing import cast
|
||||
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
from core.file import file_manager
|
||||
from core.file.models import File
|
||||
from core.helper.code_executor.jinja2.jinja2_formatter import Jinja2Formatter
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_runtime.entities import (
|
||||
AssistantPromptMessage,
|
||||
PromptMessage,
|
||||
@@ -15,9 +12,12 @@ from core.model_runtime.entities import (
|
||||
UserPromptMessage,
|
||||
)
|
||||
from core.model_runtime.entities.message_entities import ImagePromptMessageContent, PromptMessageContentUnionTypes
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.prompt.entities.advanced_prompt_entities import ChatModelMessage, CompletionModelPromptTemplate, MemoryConfig
|
||||
from core.prompt.prompt_transform import PromptTransform
|
||||
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
|
||||
from core.workflow.file import file_manager
|
||||
from core.workflow.file.models import File
|
||||
from core.workflow.runtime import VariablePool
|
||||
|
||||
|
||||
|
||||
@@ -3,13 +3,13 @@ from typing import cast
|
||||
from core.app.entities.app_invoke_entities import (
|
||||
ModelConfigWithCredentialsEntity,
|
||||
)
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
PromptMessage,
|
||||
SystemPromptMessage,
|
||||
UserPromptMessage,
|
||||
)
|
||||
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.prompt.prompt_transform import PromptTransform
|
||||
|
||||
|
||||
@@ -47,7 +47,9 @@ class AgentHistoryPromptTransform(PromptTransform):
|
||||
model_type_instance = cast(LargeLanguageModel, model_type_instance)
|
||||
|
||||
curr_message_tokens = model_type_instance.get_num_tokens(
|
||||
self.memory.model_instance.model, self.memory.model_instance.credentials, self.history_messages
|
||||
self.model_config.model,
|
||||
self.model_config.credentials,
|
||||
self.history_messages,
|
||||
)
|
||||
if curr_message_tokens <= max_token_limit:
|
||||
return self.history_messages
|
||||
@@ -63,7 +65,9 @@ class AgentHistoryPromptTransform(PromptTransform):
|
||||
# a message is start with UserPromptMessage
|
||||
if isinstance(prompt_message, UserPromptMessage):
|
||||
curr_message_tokens = model_type_instance.get_num_tokens(
|
||||
self.memory.model_instance.model, self.memory.model_instance.credentials, prompt_messages
|
||||
self.model_config.model,
|
||||
self.model_config.credentials,
|
||||
prompt_messages,
|
||||
)
|
||||
# if current message token is overflow, drop all the prompts in current message and break
|
||||
if curr_message_tokens > max_token_limit:
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from typing import Any
|
||||
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_manager import ModelInstance
|
||||
from core.model_runtime.entities.message_entities import PromptMessage
|
||||
from core.model_runtime.entities.model_entities import ModelPropertyKey
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
|
||||
|
||||
|
||||
|
||||
@@ -6,8 +6,6 @@ from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from core.app.app_config.entities import PromptTemplateEntity
|
||||
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
|
||||
from core.file import file_manager
|
||||
from core.memory.token_buffer_memory import TokenBufferMemory
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
ImagePromptMessageContent,
|
||||
PromptMessage,
|
||||
@@ -16,13 +14,15 @@ from core.model_runtime.entities.message_entities import (
|
||||
TextPromptMessageContent,
|
||||
UserPromptMessage,
|
||||
)
|
||||
from core.model_runtime.token_buffer_memory import TokenBufferMemory
|
||||
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
|
||||
from core.prompt.prompt_transform import PromptTransform
|
||||
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
|
||||
from core.workflow.file import file_manager
|
||||
from models.model import AppMode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.file.models import File
|
||||
from core.workflow.file.models import File
|
||||
|
||||
|
||||
class ModelMode(StrEnum):
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
from typing import Any
|
||||
import re
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, model_validator
|
||||
from pyobvector import VECTOR, ObVecClient, l2_distance # type: ignore
|
||||
from pyobvector import VECTOR, ObVecClient, cosine_distance, inner_product, l2_distance # type: ignore
|
||||
from sqlalchemy import JSON, Column, String
|
||||
from sqlalchemy.dialects.mysql import LONGTEXT
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from configs import dify_config
|
||||
from core.rag.datasource.vdb.vector_base import BaseVector
|
||||
@@ -19,10 +20,14 @@ from models.dataset import Dataset
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_OCEANBASE_HNSW_BUILD_PARAM = {"M": 16, "efConstruction": 256}
|
||||
DEFAULT_OCEANBASE_HNSW_SEARCH_PARAM = {"efSearch": 64}
|
||||
OCEANBASE_SUPPORTED_VECTOR_INDEX_TYPE = "HNSW"
|
||||
DEFAULT_OCEANBASE_VECTOR_METRIC_TYPE = "l2"
|
||||
_VALID_TABLE_NAME_RE = re.compile(r"^[a-zA-Z0-9_]+$")
|
||||
|
||||
_DISTANCE_FUNC_MAP = {
|
||||
"l2": l2_distance,
|
||||
"cosine": cosine_distance,
|
||||
"inner_product": inner_product,
|
||||
}
|
||||
|
||||
|
||||
class OceanBaseVectorConfig(BaseModel):
|
||||
@@ -32,6 +37,14 @@ class OceanBaseVectorConfig(BaseModel):
|
||||
password: str
|
||||
database: str
|
||||
enable_hybrid_search: bool = False
|
||||
batch_size: int = 100
|
||||
metric_type: Literal["l2", "cosine", "inner_product"] = "l2"
|
||||
hnsw_m: int = 16
|
||||
hnsw_ef_construction: int = 256
|
||||
hnsw_ef_search: int = -1
|
||||
pool_size: int = 5
|
||||
max_overflow: int = 10
|
||||
hnsw_refresh_threshold: int = 1000
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
@@ -49,14 +62,23 @@ class OceanBaseVectorConfig(BaseModel):
|
||||
|
||||
class OceanBaseVector(BaseVector):
|
||||
def __init__(self, collection_name: str, config: OceanBaseVectorConfig):
|
||||
if not _VALID_TABLE_NAME_RE.match(collection_name):
|
||||
raise ValueError(
|
||||
f"Invalid collection name '{collection_name}': "
|
||||
"only alphanumeric characters and underscores are allowed."
|
||||
)
|
||||
super().__init__(collection_name)
|
||||
self._config = config
|
||||
self._hnsw_ef_search = -1
|
||||
self._hnsw_ef_search = self._config.hnsw_ef_search
|
||||
self._client = ObVecClient(
|
||||
uri=f"{self._config.host}:{self._config.port}",
|
||||
user=self._config.user,
|
||||
password=self._config.password,
|
||||
db_name=self._config.database,
|
||||
pool_size=self._config.pool_size,
|
||||
max_overflow=self._config.max_overflow,
|
||||
pool_recycle=3600,
|
||||
pool_pre_ping=True,
|
||||
)
|
||||
self._fields: list[str] = [] # List of fields in the collection
|
||||
if self._client.check_table_exists(collection_name):
|
||||
@@ -136,8 +158,8 @@ class OceanBaseVector(BaseVector):
|
||||
field_name="vector",
|
||||
index_type=OCEANBASE_SUPPORTED_VECTOR_INDEX_TYPE,
|
||||
index_name="vector_index",
|
||||
metric_type=DEFAULT_OCEANBASE_VECTOR_METRIC_TYPE,
|
||||
params=DEFAULT_OCEANBASE_HNSW_BUILD_PARAM,
|
||||
metric_type=self._config.metric_type,
|
||||
params={"M": self._config.hnsw_m, "efConstruction": self._config.hnsw_ef_construction},
|
||||
)
|
||||
|
||||
self._client.create_table_with_index_params(
|
||||
@@ -178,6 +200,17 @@ class OceanBaseVector(BaseVector):
|
||||
else:
|
||||
logger.debug("DEBUG: Hybrid search is NOT enabled for '%s'", self._collection_name)
|
||||
|
||||
try:
|
||||
self._client.perform_raw_text_sql(
|
||||
f"CREATE INDEX IF NOT EXISTS idx_metadata_doc_id ON `{self._collection_name}` "
|
||||
f"((CAST(metadata->>'$.document_id' AS CHAR(64))))"
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
logger.warning(
|
||||
"Failed to create metadata functional index on '%s'; metadata queries may be slow without it.",
|
||||
self._collection_name,
|
||||
)
|
||||
|
||||
self._client.refresh_metadata([self._collection_name])
|
||||
self._load_collection_fields()
|
||||
redis_client.set(collection_exist_cache_key, 1, ex=3600)
|
||||
@@ -205,24 +238,49 @@ class OceanBaseVector(BaseVector):
|
||||
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
ids = self._get_uuids(documents)
|
||||
for id, doc, emb in zip(ids, documents, embeddings):
|
||||
batch_size = self._config.batch_size
|
||||
total = len(documents)
|
||||
|
||||
all_data = [
|
||||
{
|
||||
"id": doc_id,
|
||||
"vector": emb,
|
||||
"text": doc.page_content,
|
||||
"metadata": doc.metadata,
|
||||
}
|
||||
for doc_id, doc, emb in zip(ids, documents, embeddings)
|
||||
]
|
||||
|
||||
for start in range(0, total, batch_size):
|
||||
batch = all_data[start : start + batch_size]
|
||||
try:
|
||||
self._client.insert(
|
||||
table_name=self._collection_name,
|
||||
data={
|
||||
"id": id,
|
||||
"vector": emb,
|
||||
"text": doc.page_content,
|
||||
"metadata": doc.metadata,
|
||||
},
|
||||
data=batch,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
"Failed to insert document with id '%s' in collection '%s'",
|
||||
id,
|
||||
"Failed to insert batch [%d:%d] into collection '%s'",
|
||||
start,
|
||||
start + len(batch),
|
||||
self._collection_name,
|
||||
)
|
||||
raise Exception(
|
||||
f"Failed to insert batch [{start}:{start + len(batch)}] into collection '{self._collection_name}'"
|
||||
) from e
|
||||
|
||||
if self._config.hnsw_refresh_threshold > 0 and total >= self._config.hnsw_refresh_threshold:
|
||||
try:
|
||||
self._client.refresh_index(
|
||||
table_name=self._collection_name,
|
||||
index_name="vector_index",
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
logger.warning(
|
||||
"Failed to refresh HNSW index after inserting %d documents into '%s'",
|
||||
total,
|
||||
self._collection_name,
|
||||
)
|
||||
raise Exception(f"Failed to insert document with id '{id}'") from e
|
||||
|
||||
def text_exists(self, id: str) -> bool:
|
||||
try:
|
||||
@@ -412,7 +470,7 @@ class OceanBaseVector(BaseVector):
|
||||
vec_column_name="vector",
|
||||
vec_data=query_vector,
|
||||
topk=topk,
|
||||
distance_func=l2_distance,
|
||||
distance_func=self._get_distance_func(),
|
||||
output_column_names=["text", "metadata"],
|
||||
with_dist=True,
|
||||
where_clause=_where_clause,
|
||||
@@ -424,14 +482,31 @@ class OceanBaseVector(BaseVector):
|
||||
)
|
||||
raise Exception(f"Vector search failed for collection '{self._collection_name}'") from e
|
||||
|
||||
# Convert distance to score and prepare results for processing
|
||||
results = []
|
||||
for _text, metadata_str, distance in cur:
|
||||
score = 1 - distance / math.sqrt(2)
|
||||
score = self._distance_to_score(distance)
|
||||
results.append((_text, metadata_str, score))
|
||||
|
||||
return self._process_search_results(results, score_threshold=score_threshold)
|
||||
|
||||
def _get_distance_func(self):
|
||||
func = _DISTANCE_FUNC_MAP.get(self._config.metric_type)
|
||||
if func is None:
|
||||
raise ValueError(
|
||||
f"Unsupported metric_type '{self._config.metric_type}'. Supported: {', '.join(_DISTANCE_FUNC_MAP)}"
|
||||
)
|
||||
return func
|
||||
|
||||
def _distance_to_score(self, distance: float) -> float:
|
||||
metric = self._config.metric_type
|
||||
if metric == "l2":
|
||||
return 1.0 / (1.0 + distance)
|
||||
elif metric == "cosine":
|
||||
return 1.0 - distance
|
||||
elif metric == "inner_product":
|
||||
return -distance
|
||||
raise ValueError(f"Unsupported metric_type '{metric}'")
|
||||
|
||||
def delete(self):
|
||||
try:
|
||||
self._client.drop_table_if_exist(self._collection_name)
|
||||
@@ -464,5 +539,13 @@ class OceanBaseVectorFactory(AbstractVectorFactory):
|
||||
password=(dify_config.OCEANBASE_VECTOR_PASSWORD or ""),
|
||||
database=dify_config.OCEANBASE_VECTOR_DATABASE or "",
|
||||
enable_hybrid_search=dify_config.OCEANBASE_ENABLE_HYBRID_SEARCH or False,
|
||||
batch_size=dify_config.OCEANBASE_VECTOR_BATCH_SIZE,
|
||||
metric_type=dify_config.OCEANBASE_VECTOR_METRIC_TYPE,
|
||||
hnsw_m=dify_config.OCEANBASE_HNSW_M,
|
||||
hnsw_ef_construction=dify_config.OCEANBASE_HNSW_EF_CONSTRUCTION,
|
||||
hnsw_ef_search=dify_config.OCEANBASE_HNSW_EF_SEARCH,
|
||||
pool_size=dify_config.OCEANBASE_VECTOR_POOL_SIZE,
|
||||
max_overflow=dify_config.OCEANBASE_VECTOR_MAX_OVERFLOW,
|
||||
hnsw_refresh_threshold=dify_config.OCEANBASE_HNSW_REFRESH_THRESHOLD,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -15,11 +15,11 @@ class BaseVector(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs) -> list[str] | None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
|
||||
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs) -> list[str]:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
@@ -27,14 +27,14 @@ class BaseVector(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def delete_by_ids(self, ids: list[str]):
|
||||
def delete_by_ids(self, ids: list[str]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_ids_by_metadata_field(self, key: str, value: str):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def delete_by_metadata_field(self, key: str, value: str):
|
||||
def delete_by_metadata_field(self, key: str, value: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
@@ -46,7 +46,7 @@ class BaseVector(ABC):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def delete(self):
|
||||
def delete(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def _filter_duplicate_texts(self, texts: list[Document]) -> list[Document]:
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user