mirror of
https://github.com/langgenius/dify.git
synced 2026-04-05 16:26:25 +08:00
Signed-off-by: -LAN- <laipz8200@outlook.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: WH-2099 <wh2099@pm.me>
92 lines
3.3 KiB
Python
92 lines
3.3 KiB
Python
"""Remote file metadata helpers used by workflow file normalization.
|
|
|
|
These helpers are part of the ``factories.file_factory`` package surface
|
|
because both workflow builders and tests rely on the same RFC5987 filename
|
|
parsing and HEAD-response normalization rules.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import urllib.parse
|
|
import uuid
|
|
|
|
import httpx
|
|
from werkzeug.http import parse_options_header
|
|
|
|
from core.helper import ssrf_proxy
|
|
|
|
|
|
def extract_filename(url_path: str, content_disposition: str | None) -> str | None:
|
|
"""Extract a safe filename from Content-Disposition or the request URL path."""
|
|
filename: str | None = None
|
|
if content_disposition:
|
|
filename_star_match = re.search(r"filename\*=([^;]+)", content_disposition)
|
|
if filename_star_match:
|
|
raw_star = filename_star_match.group(1).strip()
|
|
raw_star = raw_star.removesuffix('"')
|
|
try:
|
|
parts = raw_star.split("'", 2)
|
|
charset = (parts[0] or "utf-8").lower() if len(parts) >= 1 else "utf-8"
|
|
value = parts[2] if len(parts) == 3 else parts[-1]
|
|
filename = urllib.parse.unquote(value, encoding=charset, errors="replace")
|
|
except Exception:
|
|
if "''" in raw_star:
|
|
filename = urllib.parse.unquote(raw_star.split("''")[-1])
|
|
else:
|
|
filename = urllib.parse.unquote(raw_star)
|
|
|
|
if not filename:
|
|
_, params = parse_options_header(content_disposition)
|
|
raw = params.get("filename")
|
|
if raw:
|
|
if len(raw) >= 2 and raw[0] == raw[-1] == '"':
|
|
raw = raw[1:-1]
|
|
filename = urllib.parse.unquote(raw)
|
|
|
|
if not filename:
|
|
candidate = os.path.basename(url_path)
|
|
filename = urllib.parse.unquote(candidate) if candidate else None
|
|
|
|
if filename:
|
|
filename = os.path.basename(filename)
|
|
if not filename or not filename.strip():
|
|
filename = None
|
|
|
|
return filename or None
|
|
|
|
|
|
def _guess_mime_type(filename: str) -> str:
|
|
guessed_mime, _ = mimetypes.guess_type(filename)
|
|
return guessed_mime or ""
|
|
|
|
|
|
def get_remote_file_info(url: str) -> tuple[str, str, int]:
    """Resolve remote file metadata with SSRF-safe HEAD probing.

    The URL path supplies the initial filename and MIME-type guess. When the
    HEAD request (sent via ``ssrf_proxy.head`` with ``follow_redirects=True``)
    returns 200, the response headers refine them: ``Content-Disposition`` may
    supply a filename, ``Content-Length`` the size, and ``Content-Type`` the
    MIME type when the filename did not reveal one.

    Args:
        url: Remote file URL to probe.

    Returns:
        A ``(mime_type, filename, file_size)`` tuple. ``mime_type`` may be an
        empty string, ``filename`` falls back to a random hex name, and
        ``file_size`` is ``-1`` when the size is unknown.
    """
    file_size = -1
    url_path = urllib.parse.urlparse(url).path
    filename = os.path.basename(url_path)
    mime_type = _guess_mime_type(filename)

    resp = ssrf_proxy.head(url, follow_redirects=True)
    if resp.status_code == httpx.codes.OK:
        content_disposition = resp.headers.get("Content-Disposition")
        extracted_filename = extract_filename(url_path, content_disposition)
        if extracted_filename:
            filename = extracted_filename
            mime_type = _guess_mime_type(filename)

        content_length = resp.headers.get("Content-Length")
        if content_length is not None:
            try:
                file_size = int(content_length)
            except ValueError:
                # The header comes from an untrusted server; a malformed value
                # means "size unknown" rather than a failed probe.
                file_size = -1

        if not mime_type:
            # Drop parameters such as "; charset=utf-8" from the Content-Type.
            mime_type = resp.headers.get("Content-Type", "").split(";")[0].strip()

    if not filename:
        # No usable name anywhere: synthesize one with an extension that
        # matches the MIME type when possible.
        extension = mimetypes.guess_extension(mime_type) or ".bin"
        filename = f"{uuid.uuid4().hex}{extension}"
        if not mime_type:
            mime_type = _guess_mime_type(filename)

    return mime_type, filename, file_size
|