fix: add tenant/dataset ownership checks to prevent IDOR vulnerabilities (#34436)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Sedo
2026-04-02 13:45:20 +08:00
committed by GitHub
parent 8f9dbf269e
commit 43c48ba4d7
7 changed files with 18 additions and 12 deletions

View File

@@ -173,8 +173,11 @@ class ExternalApiTemplateApi(Resource):
@login_required
@account_initialization_required
def get(self, external_knowledge_api_id):
_, current_tenant_id = current_account_with_tenant()
external_knowledge_api_id = str(external_knowledge_api_id)
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(external_knowledge_api_id)
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(
external_knowledge_api_id, current_tenant_id
)
if external_knowledge_api is None:
raise NotFound("API template not found.")

View File

@@ -274,7 +274,9 @@ class DatasetService:
db.session.flush()
if provider == "external" and external_knowledge_api_id:
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(external_knowledge_api_id)
external_knowledge_api = ExternalDatasetService.get_external_knowledge_api(
external_knowledge_api_id, tenant_id
)
if not external_knowledge_api:
raise ValueError("External API template not found.")
if external_knowledge_id is None:

View File

@@ -102,9 +102,9 @@ class ExternalDatasetService:
raise ValueError(f"Forbidden: Authorization failed with api_key: {api_key}")
@staticmethod
def get_external_knowledge_api(external_knowledge_api_id: str) -> ExternalKnowledgeApis:
def get_external_knowledge_api(external_knowledge_api_id: str, tenant_id: str) -> ExternalKnowledgeApis:
external_knowledge_api: ExternalKnowledgeApis | None = (
db.session.query(ExternalKnowledgeApis).filter_by(id=external_knowledge_api_id).first()
db.session.query(ExternalKnowledgeApis).filter_by(id=external_knowledge_api_id, tenant_id=tenant_id).first()
)
if external_knowledge_api is None:
raise ValueError("api template not found")

View File

@@ -65,7 +65,7 @@ class MetadataService:
raise ValueError("Metadata name already exists in Built-in fields.")
try:
MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id, dataset_id=dataset_id).first()
if metadata is None:
raise ValueError("Metadata not found.")
old_name = metadata.name
@@ -101,7 +101,7 @@ class MetadataService:
lock_key = f"dataset_metadata_lock_{dataset_id}"
try:
MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)
metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id).first()
metadata = db.session.query(DatasetMetadata).filter_by(id=metadata_id, dataset_id=dataset_id).first()
if metadata is None:
raise ValueError("Metadata not found.")
db.session.delete(metadata)

View File

@@ -411,7 +411,7 @@ class BuiltinToolManageService:
"""
with Session(db.engine) as session:
# get provider
target_provider = session.query(BuiltinToolProvider).filter_by(id=id).first()
target_provider = session.query(BuiltinToolProvider).filter_by(id=id, tenant_id=tenant_id).first()
if target_provider is None:
raise ValueError("provider not found")

View File

@@ -294,7 +294,7 @@ class TestExternalDatasetServiceCrudExternalKnowledgeApi:
api = Mock(spec=ExternalKnowledgeApis)
mock_db_session.query.return_value.filter_by.return_value.first.return_value = api
result = ExternalDatasetService.get_external_knowledge_api("api-id")
result = ExternalDatasetService.get_external_knowledge_api("api-id", "tenant-id")
assert result is api
def test_get_external_knowledge_api_not_found_raises(self, mock_db_session: MagicMock):
@@ -305,7 +305,7 @@ class TestExternalDatasetServiceCrudExternalKnowledgeApi:
mock_db_session.query.return_value.filter_by.return_value.first.return_value = None
with pytest.raises(ValueError, match="api template not found"):
ExternalDatasetService.get_external_knowledge_api("missing-id")
ExternalDatasetService.get_external_knowledge_api("missing-id", "tenant-id")
def test_update_external_knowledge_api_success_with_hidden_api_key(self, mock_db_session: MagicMock):
"""

View File

@@ -805,11 +805,12 @@ class TestExternalDatasetServiceGetAPI:
mock_query.first.return_value = expected_api
# Act
result = ExternalDatasetService.get_external_knowledge_api(api_id)
tenant_id = "tenant-123"
result = ExternalDatasetService.get_external_knowledge_api(api_id, tenant_id)
# Assert
assert result.id == api_id
mock_query.filter_by.assert_called_once_with(id=api_id)
mock_query.filter_by.assert_called_once_with(id=api_id, tenant_id=tenant_id)
@patch("services.external_knowledge_service.db")
def test_get_external_knowledge_api_not_found(self, mock_db, factory):
@@ -822,7 +823,7 @@ class TestExternalDatasetServiceGetAPI:
# Act & Assert
with pytest.raises(ValueError, match="api template not found"):
ExternalDatasetService.get_external_knowledge_api("nonexistent-id")
ExternalDatasetService.get_external_knowledge_api("nonexistent-id", "tenant-123")
class TestExternalDatasetServiceUpdateAPI: