feat: add cfspider library source code

This commit is contained in:
test01
2026-01-06 11:14:04 +08:00
parent 17b26c4fb4
commit 26c5c84caf
15 changed files with 4919 additions and 3 deletions

3
.gitignore vendored
View File

@@ -2,9 +2,6 @@ pages/
node_modules/ node_modules/
wrangler.toml wrangler.toml
.wrangler/ .wrangler/
cfspider/
setup.py
pyproject.toml
dist/ dist/
build/ build/
*.egg-info/ *.egg-info/

230
cfspider/__init__.py Normal file
View File

@@ -0,0 +1,230 @@
"""
CFspider - Cloudflare 代理 IP 池 Python 库
一个基于 Cloudflare Workers 的代理 IP 池库,提供:
- 同步/异步 HTTP 请求(兼容 requests/httpx
- TLS 指纹模拟(基于 curl_cffi支持 25+ 浏览器指纹)
- 隐身模式(自动添加完整浏览器请求头,避免反爬检测)
- 浏览器自动化(基于 Playwright支持 VLESS 代理)
- IP 地图可视化(生成 Cyberpunk 风格的地图)
- 网页镜像(保存网页到本地,自动重写资源链接)
快速开始:
>>> import cfspider
>>>
>>> # 基本 GET 请求(无代理)
>>> response = cfspider.get("https://httpbin.org/ip")
>>> print(response.json())
>>>
>>> # 使用 Cloudflare Workers 代理
>>> response = cfspider.get(
... "https://httpbin.org/ip",
... cf_proxies="https://your-workers.dev"
... )
>>> print(response.cf_colo) # Cloudflare 节点代码
>>>
>>> # 启用隐身模式(自动添加 15+ 浏览器请求头)
>>> response = cfspider.get(
... "https://example.com",
... stealth=True,
... stealth_browser='chrome'
... )
>>>
>>> # TLS 指纹模拟
>>> response = cfspider.get(
... "https://example.com",
... impersonate="chrome131"
... )
版本信息:
- 版本号: 1.7.0
- 协议: Apache License 2.0
- 文档: https://spider.violetteam.cloud
依赖关系:
必需requests
可选:
- httpx[http2]: HTTP/2 和异步请求支持
- curl_cffi: TLS 指纹模拟
- playwright: 浏览器自动化
- beautifulsoup4: 网页镜像
"""
from .api import (
get, post, put, delete, head, options, patch, request,
clear_map_records, get_map_collector
)
from .session import Session
from .cli import install_browser
# IP 地图可视化
from .ip_map import (
IPMapCollector, generate_map_html, add_ip_record,
get_collector as get_ip_collector, clear_records as clear_ip_records,
COLO_COORDINATES
)
# 网页镜像
from .mirror import mirror, MirrorResult, WebMirror
# 异步 API基于 httpx
from .async_api import (
aget, apost, aput, adelete, ahead, aoptions, apatch,
arequest, astream,
AsyncCFSpiderResponse, AsyncStreamResponse
)
from .async_session import AsyncSession
# TLS 指纹模拟 API基于 curl_cffi
from .impersonate import (
impersonate_get, impersonate_post, impersonate_put,
impersonate_delete, impersonate_head, impersonate_options,
impersonate_patch, impersonate_request,
ImpersonateSession, ImpersonateResponse,
get_supported_browsers, SUPPORTED_BROWSERS
)
# 隐身模式(反爬虫规避)
from .stealth import (
StealthSession,
get_stealth_headers, get_random_browser_headers,
random_delay, get_referer, update_sec_fetch_headers,
BROWSER_PROFILES, SUPPORTED_BROWSERS as STEALTH_BROWSERS,
CHROME_HEADERS, FIREFOX_HEADERS, SAFARI_HEADERS, EDGE_HEADERS, CHROME_MOBILE_HEADERS
)
# Lazy Browser factory: avoids a hard dependency on playwright at import time.
def Browser(cf_proxies=None, headless=True, timeout=30, vless_uuid=None):
    """
    Create a browser instance.

    Args:
        cf_proxies: Proxy address; the following formats are supported:
            - VLESS link: "vless://uuid@host:port?path=/xxx#name" (recommended)
            - HTTP proxy: "http://ip:port" or "ip:port"
            - SOCKS5 proxy: "socks5://ip:port"
            - edgetunnel domain: "v2.example.com" (requires vless_uuid)
            If omitted, the local network is used directly.
        headless: Run in headless mode (default True).
        timeout: Request timeout in seconds (default 30).
        vless_uuid: VLESS UUID; only needed when a bare domain is given.
            Not required when a full VLESS link is used.

    Returns:
        Browser: a browser instance.

    Example:
        >>> import cfspider
        >>> # Full VLESS link (recommended; no vless_uuid needed)
        >>> browser = cfspider.Browser(
        ...     cf_proxies="vless://uuid@v2.example.com:443?path=/"
        ... )
        >>> html = browser.html("https://example.com")
        >>> browser.close()
        >>>
        >>> # Domain + UUID (legacy style)
        >>> browser = cfspider.Browser(
        ...     cf_proxies="v2.example.com",
        ...     vless_uuid="your-vless-uuid"
        ... )
        >>>
        >>> # Direct use (no proxy)
        >>> browser = cfspider.Browser()
    """
    # Imported here so the package works without playwright installed.
    from .browser import Browser as _Browser
    return _Browser(cf_proxies, headless, timeout, vless_uuid)
def parse_vless_link(vless_link):
    """
    Parse a VLESS link.

    Args:
        vless_link: VLESS link string, e.g. "vless://uuid@host:port?path=/xxx#name"

    Returns:
        dict: dictionary with keys uuid, host, port, path; None on parse failure.

    Example:
        >>> import cfspider
        >>> info = cfspider.parse_vless_link("vless://abc123@v2.example.com:443?path=/ws#proxy")
        >>> print(info)
        {'uuid': 'abc123', 'host': 'v2.example.com', 'port': 443, 'path': '/ws'}
    """
    # Imported lazily to keep package import light.
    from .browser import parse_vless_link as _parse
    return _parse(vless_link)
class CFSpiderError(Exception):
    """Base exception for CFspider.

    Every CFspider-specific exception derives from this class, so a single
    ``except cfspider.CFSpiderError`` clause catches any library failure.

    Example:
        >>> try:
        ...     response = cfspider.get("https://invalid-url")
        ... except cfspider.CFSpiderError as e:
        ...     print(f"Request failed: {e}")
    """
class BrowserNotInstalledError(CFSpiderError):
    """Raised when browser mode is requested but Chromium is not installed.

    Fix:
        >>> import cfspider
        >>> cfspider.install_browser()  # installs Chromium automatically

    Or from the command line:
        $ cfspider install
    """
class PlaywrightNotInstalledError(CFSpiderError):
    """Raised when browser mode is requested but the Playwright library is missing.

    Fix:
        $ pip install playwright
    """
__version__ = "1.7.3"
__all__ = [
# 同步 API (requests)
"get", "post", "put", "delete", "head", "options", "patch", "request",
"Session", "Browser", "install_browser", "parse_vless_link",
"CFSpiderError", "BrowserNotInstalledError", "PlaywrightNotInstalledError",
# 异步 API (httpx)
"aget", "apost", "aput", "adelete", "ahead", "aoptions", "apatch",
"arequest", "astream",
"AsyncSession", "AsyncCFSpiderResponse", "AsyncStreamResponse",
# TLS 指纹模拟 API (curl_cffi)
"impersonate_get", "impersonate_post", "impersonate_put",
"impersonate_delete", "impersonate_head", "impersonate_options",
"impersonate_patch", "impersonate_request",
"ImpersonateSession", "ImpersonateResponse",
"get_supported_browsers", "SUPPORTED_BROWSERS",
# 隐身模式(反爬虫规避)
"StealthSession",
"get_stealth_headers", "get_random_browser_headers",
"random_delay", "get_referer", "update_sec_fetch_headers",
"BROWSER_PROFILES", "STEALTH_BROWSERS",
"CHROME_HEADERS", "FIREFOX_HEADERS", "SAFARI_HEADERS", "EDGE_HEADERS", "CHROME_MOBILE_HEADERS",
# IP 地图可视化
"IPMapCollector", "generate_map_html", "add_ip_record",
"get_ip_collector", "clear_ip_records", "COLO_COORDINATES",
"clear_map_records", "get_map_collector",
# 网页镜像
"mirror", "MirrorResult", "WebMirror"
]

655
cfspider/api.py Normal file
View File

@@ -0,0 +1,655 @@
"""
CFspider 核心 API 模块
提供同步 HTTP 请求功能,支持:
- 通过 Cloudflare Workers 代理请求
- TLS 指纹模拟 (curl_cffi)
- HTTP/2 支持 (httpx)
- 隐身模式(完整浏览器请求头)
- IP 地图可视化
"""
import requests
import time
from urllib.parse import urlencode, quote
from typing import Optional, Any
# 延迟导入 IP 地图模块
from . import ip_map
# 延迟导入 httpx仅在需要 HTTP/2 时使用
_httpx = None
def _get_httpx():
"""延迟加载 httpx 模块"""
global _httpx
if _httpx is None:
try:
import httpx
_httpx = httpx
except ImportError:
raise ImportError(
"httpx is required for HTTP/2 support. "
"Install it with: pip install httpx[http2]"
)
return _httpx
# 延迟导入 curl_cffi仅在需要 TLS 指纹时使用
_curl_cffi = None
def _get_curl_cffi():
"""延迟加载 curl_cffi 模块"""
global _curl_cffi
if _curl_cffi is None:
try:
from curl_cffi import requests as curl_requests
_curl_cffi = curl_requests
except ImportError:
raise ImportError(
"curl_cffi is required for TLS fingerprint impersonation. "
"Install it with: pip install curl_cffi"
)
return _curl_cffi
class CFSpiderResponse:
    """
    CFspider response object.

    Thin wrapper around a raw requests/httpx/curl_cffi response that keeps
    the familiar ``requests.Response``-like surface while adding the
    Cloudflare-specific fields observed on Workers-proxied requests.

    Attributes:
        cf_colo (str): Cloudflare data-center code (e.g. NRT, SIN, LAX);
            set when the request went through a Workers proxy.
        cf_ray (str): Cloudflare Ray ID, a per-request identifier useful
            for debugging/tracing.

    Example:
        >>> response = cfspider.get("https://httpbin.org/ip", cf_proxies="...")
        >>> response.status_code
        200
        >>> response.cf_colo
        'NRT'
        >>> response.json()
        {...}
    """

    def __init__(self, response, cf_colo=None, cf_ray=None):
        """Wrap a raw response.

        Args:
            response: underlying requests/httpx/curl_cffi response object.
            cf_colo: Cloudflare data-center code read from the response headers.
            cf_ray: Cloudflare Ray ID read from the response headers.
        """
        self._response = response
        self.cf_colo = cf_colo
        self.cf_ray = cf_ray

    @property
    def text(self) -> str:
        """Decoded response body."""
        return self._response.text

    @property
    def content(self) -> bytes:
        """Raw response body bytes."""
        return self._response.content

    @property
    def status_code(self) -> int:
        """HTTP status code."""
        return self._response.status_code

    @property
    def headers(self):
        """Response header mapping."""
        return self._response.headers

    @property
    def cookies(self):
        """Response cookies."""
        return self._response.cookies

    @property
    def url(self):
        """Final URL after redirects (type follows the underlying backend)."""
        return self._response.url

    @property
    def encoding(self) -> Optional[str]:
        """Response text encoding."""
        return self._response.encoding

    @encoding.setter
    def encoding(self, value: str):
        """Override the response text encoding."""
        self._response.encoding = value

    def json(self, **kwargs) -> Any:
        """Parse the body as JSON.

        Args:
            **kwargs: forwarded to the backend's JSON parser.

        Returns:
            The decoded JSON value (dict or list).

        Raises:
            JSONDecodeError: if the body is not valid JSON.
        """
        return self._response.json(**kwargs)

    def raise_for_status(self):
        """Raise the backend's HTTPError for non-2xx status codes."""
        self._response.raise_for_status()
def request(method, url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
            map_output=False, map_file="cfspider_map.html",
            stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """
    Send an HTTP request.

    CFspider's core entry point; every convenience wrapper (get/post/...)
    funnels into it. Supports multiple proxy modes and anti-bot features.

    Args:
        method (str): HTTP method (GET, POST, PUT, DELETE, HEAD, OPTIONS, PATCH).
        url (str): Target URL, including the scheme (https://...).
        cf_proxies (str, optional): Proxy address; meaning depends on cf_workers:
            - cf_workers=True: a CFspider Workers endpoint
              (e.g. "https://your-workers.dev").
            - cf_workers=False: an ordinary HTTP/SOCKS5 proxy
              (e.g. "http://127.0.0.1:8080").
            - omitted: the target URL is requested directly, no proxy.
        cf_workers (bool): Whether cf_proxies is a CFspider Workers API
            endpoint (default True).
        http2 (bool): Use HTTP/2 via the httpx backend (default False).
            Mutually exclusive with ``impersonate``.
        impersonate (str, optional): TLS fingerprint to mimic
            (e.g. chrome131, chrome124, safari18_0, firefox133, edge101);
            routes the request through curl_cffi. Full list via
            cfspider.get_supported_browsers().
        map_output (bool): Generate an interactive IP-map HTML file after
            the request completes (default False).
        map_file (str): Map output file name (default "cfspider_map.html").
        stealth (bool): Add 15+ real-browser request headers (User-Agent,
            Accept, Accept-Language, Sec-Fetch-*, Sec-CH-UA, ...)
            (default False).
        stealth_browser (str): Browser profile for stealth mode:
            chrome, firefox, safari, edge or chrome_mobile (default 'chrome').
        delay (tuple, optional): (min, max) random sleep in seconds before
            the request, to mimic human pacing.
        **kwargs: Remaining requests-compatible arguments: params, headers,
            data, json, cookies, timeout (default 30), allow_redirects,
            verify, ...

    Returns:
        CFSpiderResponse: wrapper exposing text / content / json() /
        status_code / headers, plus cf_colo and cf_ray when the Workers
        proxy is used.

    Raises:
        ValueError: if http2 and impersonate are both enabled (they use
            different backends: httpx vs curl_cffi).
        ImportError: if a required optional dependency is missing
            (httpx[http2] for http2=True, curl_cffi for impersonate).
        requests.RequestException: on network failure.

    Notes:
        - Header precedence with stealth=True: user-supplied headers
          override the generated stealth headers.
        - With the Workers proxy, custom headers are forwarded via
          X-CFSpider-Header-* request headers.
    """
    # The docstring has always promised this ValueError; enforce it instead
    # of silently preferring the curl_cffi path when both flags are set.
    if http2 and impersonate:
        raise ValueError(
            "http2 and impersonate cannot be used together: "
            "HTTP/2 uses the httpx backend while TLS impersonation uses curl_cffi"
        )
    # Optional human-like pacing before the request goes out.
    if delay:
        from .stealth import random_delay
        random_delay(delay[0], delay[1])
    params = kwargs.pop("params", None)
    headers = kwargs.pop("headers", {})
    # Stealth mode: merge generated browser headers under the user's headers
    # (user-supplied values win).
    if stealth:
        from .stealth import get_stealth_headers
        stealth_headers = get_stealth_headers(stealth_browser)
        final_headers = stealth_headers.copy()
        final_headers.update(headers)
        headers = final_headers
    data = kwargs.pop("data", None)
    json_data = kwargs.pop("json", None)
    cookies = kwargs.pop("cookies", None)
    timeout = kwargs.pop("timeout", 30)
    # Start of timing window used for the map's response-time statistic.
    start_time = time.time()
    # TLS fingerprint path (curl_cffi backend).
    if impersonate:
        response = _request_impersonate(
            method, url, cf_proxies, cf_workers, impersonate,
            params=params, headers=headers, data=data,
            json_data=json_data, cookies=cookies, timeout=timeout,
            **kwargs
        )
        _handle_map_output(response, url, start_time, map_output, map_file)
        return response
    # HTTP/2 path (httpx backend).
    if http2:
        response = _request_httpx(
            method, url, cf_proxies, cf_workers,
            params=params, headers=headers, data=data,
            json_data=json_data, cookies=cookies, timeout=timeout,
            **kwargs
        )
        _handle_map_output(response, url, start_time, map_output, map_file)
        return response
    # No proxy configured: plain requests call.
    if not cf_proxies:
        resp = requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            **kwargs
        )
        response = CFSpiderResponse(resp)
        _handle_map_output(response, url, start_time, map_output, map_file)
        return response
    # cf_workers=False: cf_proxies is an ordinary forward proxy.
    if not cf_workers:
        proxy_url = cf_proxies
        if not proxy_url.startswith(('http://', 'https://', 'socks5://')):
            # Bare "host:port" defaults to an HTTP proxy.
            proxy_url = f"http://{proxy_url}"
        proxies = {
            "http": proxy_url,
            "https": proxy_url
        }
        resp = requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            proxies=proxies,
            **kwargs
        )
        response = CFSpiderResponse(resp)
        _handle_map_output(response, url, start_time, map_output, map_file)
        return response
    # cf_workers=True: forward through the CFspider Workers API.
    cf_proxies_url = cf_proxies.rstrip("/")
    if not cf_proxies_url.startswith(('http://', 'https://')):
        cf_proxies_url = f"https://{cf_proxies_url}"
    target_url = url
    if params:
        target_url = f"{url}?{urlencode(params)}"
    proxy_url = f"{cf_proxies_url}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    # Custom headers/cookies are tunnelled via X-CFSpider-Header-* so the
    # Worker can replay them against the target.
    request_headers = {}
    if headers:
        for key, value in headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
    if cookies:
        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
        request_headers["X-CFSpider-Header-Cookie"] = cookie_str
    resp = requests.post(
        proxy_url,
        headers=request_headers,
        data=data,
        json=json_data,
        timeout=timeout,
        **kwargs
    )
    cf_colo = resp.headers.get("X-CF-Colo")
    cf_ray = resp.headers.get("CF-Ray")
    response = CFSpiderResponse(resp, cf_colo=cf_colo, cf_ray=cf_ray)
    _handle_map_output(response, url, start_time, map_output, map_file)
    return response
def _handle_map_output(response, url, start_time, map_output, map_file):
"""处理 IP 地图输出"""
if not map_output:
return
# 计算响应时间
response_time = (time.time() - start_time) * 1000 # 毫秒
# 收集 IP 记录
ip_map.add_ip_record(
url=url,
ip=None, # 无法直接获取 IP但有 cf_colo
cf_colo=getattr(response, 'cf_colo', None),
cf_ray=getattr(response, 'cf_ray', None),
status_code=response.status_code,
response_time=response_time
)
# 生成地图 HTML
ip_map.generate_map_html(output_file=map_file)
def _request_impersonate(method, url, cf_proxies, cf_workers, impersonate,
                         params=None, headers=None, data=None, json_data=None,
                         cookies=None, timeout=30, **kwargs):
    """Issue the request via curl_cffi so the TLS handshake mimics a real browser."""
    curl_requests = _get_curl_cffi()
    # Case 1: no proxy at all — hit the target directly.
    if not cf_proxies:
        raw = curl_requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            impersonate=impersonate,
            **kwargs
        )
        return CFSpiderResponse(raw)
    # Case 2: ordinary forward proxy (cf_workers disabled).
    if not cf_workers:
        forward = cf_proxies
        if not forward.startswith(('http://', 'https://', 'socks5://')):
            forward = f"http://{forward}"  # bare host:port defaults to HTTP
        raw = curl_requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            impersonate=impersonate,
            proxies={"http": forward, "https": forward},
            **kwargs
        )
        return CFSpiderResponse(raw)
    # Case 3: route through the CFspider Workers API.
    base = cf_proxies.rstrip("/")
    if not base.startswith(('http://', 'https://')):
        base = f"https://{base}"
    target_url = url if not params else f"{url}?{urlencode(params)}"
    endpoint = f"{base}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    # Headers/cookies are tunnelled via X-CFSpider-Header-* for the Worker.
    forwarded = {}
    if headers:
        for name, value in headers.items():
            forwarded[f"X-CFSpider-Header-{name}"] = value
    if cookies:
        forwarded["X-CFSpider-Header-Cookie"] = "; ".join(
            f"{k}={v}" for k, v in cookies.items()
        )
    raw = curl_requests.post(
        endpoint,
        headers=forwarded,
        data=data,
        json=json_data,
        timeout=timeout,
        impersonate=impersonate,
        **kwargs
    )
    return CFSpiderResponse(
        raw,
        cf_colo=raw.headers.get("X-CF-Colo"),
        cf_ray=raw.headers.get("CF-Ray"),
    )
def _request_httpx(method, url, cf_proxies, cf_workers, params=None, headers=None,
                   data=None, json_data=None, cookies=None, timeout=30, **kwargs):
    """Issue the request via httpx with HTTP/2 enabled."""
    httpx = _get_httpx()
    # Case 1: no proxy — direct request.
    if not cf_proxies:
        with httpx.Client(http2=True, timeout=timeout) as client:
            raw = client.request(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            )
            return CFSpiderResponse(raw)
    # Case 2: ordinary forward proxy (cf_workers disabled).
    if not cf_workers:
        forward = cf_proxies
        if not forward.startswith(('http://', 'https://', 'socks5://')):
            forward = f"http://{forward}"  # bare host:port defaults to HTTP
        with httpx.Client(http2=True, timeout=timeout, proxy=forward) as client:
            raw = client.request(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            )
            return CFSpiderResponse(raw)
    # Case 3: route through the CFspider Workers API.
    base = cf_proxies.rstrip("/")
    if not base.startswith(('http://', 'https://')):
        base = f"https://{base}"
    target_url = url if not params else f"{url}?{urlencode(params)}"
    endpoint = f"{base}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    # Headers/cookies are tunnelled via X-CFSpider-Header-* for the Worker.
    forwarded = {}
    if headers:
        for name, value in headers.items():
            forwarded[f"X-CFSpider-Header-{name}"] = value
    if cookies:
        forwarded["X-CFSpider-Header-Cookie"] = "; ".join(
            f"{k}={v}" for k, v in cookies.items()
        )
    with httpx.Client(http2=True, timeout=timeout) as client:
        raw = client.post(
            endpoint,
            headers=forwarded,
            data=data,
            json=json_data,
            **kwargs
        )
        return CFSpiderResponse(
            raw,
            cf_colo=raw.headers.get("X-CF-Colo"),
            cf_ray=raw.headers.get("CF-Ray"),
        )
def get(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
        map_output=False, map_file="cfspider_map.html",
        stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """
    Send a GET request.

    Args:
        url: Target URL.
        cf_proxies: Proxy address.
        cf_workers: Use the Workers API (default True).
        http2: Enable HTTP/2.
        impersonate: TLS fingerprint (e.g. "chrome131", "safari18_0", "firefox133").
        map_output: Generate an IP map HTML file.
        map_file: Map output file name.
        stealth: Enable stealth mode (full browser request headers).
        stealth_browser: Stealth browser profile (chrome/firefox/safari/edge/chrome_mobile).
        delay: Random pre-request delay range, e.g. (1, 3).

    See :func:`request` for full details.
    """
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("GET", url, **kwargs)
def post(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
         map_output=False, map_file="cfspider_map.html",
         stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send a POST request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("POST", url, **kwargs)
def put(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
        map_output=False, map_file="cfspider_map.html",
        stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send a PUT request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("PUT", url, **kwargs)
def delete(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
           map_output=False, map_file="cfspider_map.html",
           stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send a DELETE request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("DELETE", url, **kwargs)
def head(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
         map_output=False, map_file="cfspider_map.html",
         stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send a HEAD request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("HEAD", url, **kwargs)
def options(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
            map_output=False, map_file="cfspider_map.html",
            stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send an OPTIONS request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("OPTIONS", url, **kwargs)
def patch(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
          map_output=False, map_file="cfspider_map.html",
          stealth=False, stealth_browser='chrome', delay=None, **kwargs):
    """Send a PATCH request; see :func:`request` for parameter details."""
    kwargs.update(
        cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2,
        impersonate=impersonate, map_output=map_output, map_file=map_file,
        stealth=stealth, stealth_browser=stealth_browser, delay=delay,
    )
    return request("PATCH", url, **kwargs)
def clear_map_records():
    """Drop every record collected so far for the IP map."""
    ip_map.clear_records()
def get_map_collector():
    """Return the shared IP-map collector instance."""
    return ip_map.get_collector()

412
cfspider/async_api.py Normal file
View File

@@ -0,0 +1,412 @@
"""
CFspider 异步 API 模块
基于 httpx 实现,提供:
- 异步 HTTP 请求async/await
- HTTP/2 协议支持
- 流式响应(大文件下载)
- 并发请求控制
使用前需要安装 httpx
pip install httpx[http2]
快速开始:
>>> import cfspider
>>> import asyncio
>>>
>>> async def main():
... # 异步 GET 请求
... response = await cfspider.aget("https://httpbin.org/ip")
... print(response.json())
...
... # 并发请求
... urls = ["https://httpbin.org/ip"] * 5
... tasks = [cfspider.aget(url, cf_proxies="...") for url in urls]
... responses = await asyncio.gather(*tasks)
>>>
>>> asyncio.run(main())
性能对比:
- 同步请求 10 个 URL约 10 秒(串行)
- 异步请求 10 个 URL约 1 秒(并发)
"""
import httpx
from urllib.parse import urlencode, quote
from typing import Optional, Dict, Any, AsyncIterator
from contextlib import asynccontextmanager
class AsyncCFSpiderResponse:
    """
    Asynchronous response wrapper.

    Wraps an httpx.Response and mirrors the synchronous CFSpiderResponse
    interface, adding async iteration helpers for streaming consumption.

    Attributes:
        cf_colo (str): Cloudflare data-center code (when routed via Workers).
        cf_ray (str): Cloudflare Ray ID.

    Example:
        >>> response = await cfspider.aget("https://httpbin.org/ip")
        >>> print(response.http_version)  # HTTP/2
        >>> data = response.json()
    """

    def __init__(self, response: httpx.Response, cf_colo: Optional[str] = None, cf_ray: Optional[str] = None):
        self._response = response
        self.cf_colo = cf_colo
        self.cf_ray = cf_ray

    @property
    def text(self) -> str:
        """Decoded response body."""
        return self._response.text

    @property
    def content(self) -> bytes:
        """Raw response body bytes."""
        return self._response.content

    @property
    def status_code(self) -> int:
        """HTTP status code."""
        return self._response.status_code

    @property
    def headers(self) -> httpx.Headers:
        """Response headers."""
        return self._response.headers

    @property
    def cookies(self) -> httpx.Cookies:
        """Response cookies."""
        return self._response.cookies

    @property
    def url(self) -> httpx.URL:
        """Final URL after redirects."""
        return self._response.url

    @property
    def encoding(self) -> Optional[str]:
        """Response text encoding."""
        return self._response.encoding

    @property
    def http_version(self) -> str:
        """Negotiated protocol version (e.g. HTTP/1.1 or HTTP/2)."""
        return self._response.http_version

    def json(self, **kwargs) -> Any:
        """Parse the body as JSON."""
        return self._response.json(**kwargs)

    def raise_for_status(self) -> None:
        """Raise httpx's HTTPStatusError for non-2xx status codes."""
        self._response.raise_for_status()

    async def aiter_bytes(self, chunk_size: Optional[int] = None) -> AsyncIterator[bytes]:
        """Yield the body as byte chunks."""
        async for piece in self._response.aiter_bytes(chunk_size):
            yield piece

    async def aiter_text(self, chunk_size: Optional[int] = None) -> AsyncIterator[str]:
        """Yield the body as decoded text chunks."""
        async for piece in self._response.aiter_text(chunk_size):
            yield piece

    async def aiter_lines(self) -> AsyncIterator[str]:
        """Yield the body line by line."""
        async for row in self._response.aiter_lines():
            yield row
class AsyncStreamResponse:
    """Streaming response wrapper for large downloads.

    Unlike AsyncCFSpiderResponse the body is not pre-loaded: consume it
    through the async iterators or aread(), then release it with aclose().
    """

    def __init__(self, response: httpx.Response, cf_colo: Optional[str] = None, cf_ray: Optional[str] = None):
        self._response = response
        self.cf_colo = cf_colo  # Cloudflare data-center code, if proxied
        self.cf_ray = cf_ray    # Cloudflare Ray ID, if proxied

    @property
    def status_code(self) -> int:
        """HTTP status code."""
        return self._response.status_code

    @property
    def headers(self) -> httpx.Headers:
        """Response headers."""
        return self._response.headers

    @property
    def http_version(self) -> str:
        """Negotiated protocol version."""
        return self._response.http_version

    async def aiter_bytes(self, chunk_size: Optional[int] = None) -> AsyncIterator[bytes]:
        """Yield the body as byte chunks."""
        async for piece in self._response.aiter_bytes(chunk_size):
            yield piece

    async def aiter_text(self, chunk_size: Optional[int] = None) -> AsyncIterator[str]:
        """Yield the body as decoded text chunks."""
        async for piece in self._response.aiter_text(chunk_size):
            yield piece

    async def aiter_lines(self) -> AsyncIterator[str]:
        """Yield the body line by line."""
        async for row in self._response.aiter_lines():
            yield row

    async def aread(self) -> bytes:
        """Load and return the entire body."""
        return await self._response.aread()

    async def aclose(self) -> None:
        """Close the underlying response."""
        await self._response.aclose()
async def arequest(
    method: str,
    url: str,
    cf_proxies: Optional[str] = None,
    cf_workers: bool = True,
    http2: bool = True,
    **kwargs
) -> AsyncCFSpiderResponse:
    """
    Send an asynchronous HTTP request.

    Args:
        method: HTTP method.
        url: Target URL.
        cf_proxies: Proxy address (optional).
            - cf_workers=True: a CFspider Workers endpoint.
            - cf_workers=False: an ordinary proxy address.
        cf_workers: Use the CFspider Workers API (default True).
        http2: Enable HTTP/2 (default True).
        **kwargs: Remaining httpx-compatible arguments.

    Returns:
        AsyncCFSpiderResponse: the async response wrapper.
    """
    params = kwargs.pop("params", None)
    headers = kwargs.pop("headers", {})
    data = kwargs.pop("data", None)
    json_data = kwargs.pop("json", None)
    cookies = kwargs.pop("cookies", None)
    timeout = kwargs.pop("timeout", 30)
    # Case 1: no proxy — direct request.
    if not cf_proxies:
        async with httpx.AsyncClient(http2=http2, timeout=timeout) as client:
            raw = await client.request(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            )
            return AsyncCFSpiderResponse(raw)
    # Case 2: ordinary forward proxy (cf_workers disabled).
    if not cf_workers:
        forward = cf_proxies
        if not forward.startswith(('http://', 'https://', 'socks5://')):
            forward = f"http://{forward}"  # bare host:port defaults to HTTP
        async with httpx.AsyncClient(http2=http2, timeout=timeout, proxy=forward) as client:
            raw = await client.request(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            )
            return AsyncCFSpiderResponse(raw)
    # Case 3: route through the CFspider Workers API.
    base = cf_proxies.rstrip("/")
    if not base.startswith(('http://', 'https://')):
        base = f"https://{base}"
    target_url = url if not params else f"{url}?{urlencode(params)}"
    endpoint = f"{base}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    # Headers/cookies are tunnelled via X-CFSpider-Header-* for the Worker.
    forwarded = {}
    if headers:
        for name, value in headers.items():
            forwarded[f"X-CFSpider-Header-{name}"] = value
    if cookies:
        forwarded["X-CFSpider-Header-Cookie"] = "; ".join(
            f"{k}={v}" for k, v in cookies.items()
        )
    async with httpx.AsyncClient(http2=http2, timeout=timeout) as client:
        raw = await client.post(
            endpoint,
            headers=forwarded,
            data=data,
            json=json_data,
            **kwargs
        )
        return AsyncCFSpiderResponse(
            raw,
            cf_colo=raw.headers.get("X-CF-Colo"),
            cf_ray=raw.headers.get("CF-Ray"),
        )
@asynccontextmanager
async def astream(
    method: str,
    url: str,
    cf_proxies: Optional[str] = None,
    cf_workers: bool = True,
    http2: bool = True,
    **kwargs
) -> AsyncIterator[AsyncStreamResponse]:
    """
    Streaming-request context manager.

    The response body is not loaded into memory; consume it through the
    yielded AsyncStreamResponse while the underlying client stays open.

    Args:
        method: HTTP method.
        url: Target URL.
        cf_proxies: Proxy address (optional).
        cf_workers: Use the CFspider Workers API (default True).
        http2: Enable HTTP/2 (default True).
        **kwargs: Remaining httpx-compatible arguments.

    Yields:
        AsyncStreamResponse: the streaming response object.

    Example:
        async with cfspider.astream("GET", url) as response:
            async for chunk in response.aiter_bytes():
                process(chunk)
    """
    params = kwargs.pop("params", None)
    headers = kwargs.pop("headers", {})
    data = kwargs.pop("data", None)
    json_data = kwargs.pop("json", None)
    cookies = kwargs.pop("cookies", None)
    timeout = kwargs.pop("timeout", 30)
    # Case 1: no proxy configured — stream directly from the target.
    if not cf_proxies:
        async with httpx.AsyncClient(http2=http2, timeout=timeout) as client:
            async with client.stream(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            ) as response:
                yield AsyncStreamResponse(response)
        return
    # Case 2: cf_workers=False — stream through an ordinary forward proxy.
    if not cf_workers:
        proxy_url = cf_proxies
        if not proxy_url.startswith(('http://', 'https://', 'socks5://')):
            # Bare "host:port" defaults to an HTTP proxy.
            proxy_url = f"http://{proxy_url}"
        async with httpx.AsyncClient(http2=http2, timeout=timeout, proxy=proxy_url) as client:
            async with client.stream(
                method,
                url,
                params=params,
                headers=headers,
                data=data,
                json=json_data,
                cookies=cookies,
                **kwargs
            ) as response:
                yield AsyncStreamResponse(response)
        return
    # Case 3: cf_workers=True — stream through the CFspider Workers API.
    cf_proxies_url = cf_proxies.rstrip("/")
    if not cf_proxies_url.startswith(('http://', 'https://')):
        cf_proxies_url = f"https://{cf_proxies_url}"
    target_url = url
    if params:
        target_url = f"{url}?{urlencode(params)}"
    proxy_endpoint = f"{cf_proxies_url}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    # Custom headers/cookies are tunnelled via X-CFSpider-Header-* so the
    # Worker can replay them against the target.
    request_headers = {}
    if headers:
        for key, value in headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
    if cookies:
        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
        request_headers["X-CFSpider-Header-Cookie"] = cookie_str
    async with httpx.AsyncClient(http2=http2, timeout=timeout) as client:
        async with client.stream(
            "POST",
            proxy_endpoint,
            headers=request_headers,
            data=data,
            json=json_data,
            **kwargs
        ) as response:
            cf_colo = response.headers.get("X-CF-Colo")
            cf_ray = response.headers.get("CF-Ray")
            yield AsyncStreamResponse(response, cf_colo=cf_colo, cf_ray=cf_ray)
# Convenience wrappers
async def aget(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Async GET request; delegates to :func:`arequest`."""
    return await arequest(
        "GET", url, cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2, **kwargs
    )
async def apost(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Async POST request; delegates to :func:`arequest`."""
    return await arequest(
        "POST", url, cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2, **kwargs
    )
async def aput(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Async PUT request; delegates to :func:`arequest`."""
    return await arequest(
        "PUT", url, cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2, **kwargs
    )
async def adelete(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Async DELETE request; delegates to :func:`arequest`."""
    return await arequest(
        "DELETE", url, cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2, **kwargs
    )
async def ahead(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Async HEAD request; delegates to :func:`arequest`."""
    return await arequest(
        "HEAD", url, cf_proxies=cf_proxies, cf_workers=cf_workers, http2=http2, **kwargs
    )
async def aoptions(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Issue an asynchronous OPTIONS request (thin wrapper around ``arequest``)."""
    return await arequest(
        "OPTIONS",
        url,
        cf_proxies=cf_proxies,
        cf_workers=cf_workers,
        http2=http2,
        **kwargs,
    )
async def apatch(url: str, cf_proxies: Optional[str] = None, cf_workers: bool = True, http2: bool = True, **kwargs) -> AsyncCFSpiderResponse:
    """Issue an asynchronous PATCH request (thin wrapper around ``arequest``)."""
    return await arequest(
        "PATCH",
        url,
        cf_proxies=cf_proxies,
        cf_workers=cf_workers,
        http2=http2,
        **kwargs,
    )

276
cfspider/async_session.py Normal file
View File

@@ -0,0 +1,276 @@
"""
CFspider 异步会话模块
基于 httpx 实现,提供可复用的异步 HTTP 客户端,支持 HTTP/2 和连接池。
"""
import httpx
from urllib.parse import urlencode, quote
from typing import Optional, Dict, Any, AsyncIterator
from contextlib import asynccontextmanager
from .async_api import AsyncCFSpiderResponse, AsyncStreamResponse
class AsyncSession:
    """
    Asynchronous session class.
    Wraps a reusable httpx.AsyncClient with HTTP/2 support and connection
    pooling. Requests are either sent directly or routed through a CFspider
    Workers proxy endpoint, depending on ``cf_proxies``/``cf_workers``.
    Example:
        async with cfspider.AsyncSession(cf_proxies="workers.dev") as session:
            r1 = await session.get("https://example.com")
            r2 = await session.post("https://example.com", json={"key": "value"})
    """
    def __init__(
        self,
        cf_proxies: Optional[str] = None,
        cf_workers: bool = True,
        http2: bool = True,
        timeout: float = 30,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        **kwargs
    ):
        """
        Initialize the asynchronous session.
        Args:
            cf_proxies: Proxy address (optional).
            cf_workers: Whether to use the CFspider Workers API (default True).
                When False, ``cf_proxies`` is treated as an ordinary proxy URL.
            http2: Whether to enable HTTP/2 (default True).
            timeout: Default timeout in seconds.
            headers: Default request headers.
            cookies: Default cookies.
            **kwargs: Extra arguments forwarded to httpx.AsyncClient.
        """
        self.cf_proxies = cf_proxies
        self.cf_workers = cf_workers
        self.http2 = http2
        self.timeout = timeout
        self.headers = headers or {}
        self.cookies = cookies or {}
        self._client_kwargs = kwargs
        # Created lazily by _ensure_client() so the constructor stays sync.
        self._client: Optional[httpx.AsyncClient] = None
    async def __aenter__(self) -> "AsyncSession":
        """Enter the async context manager; eagerly creates the client."""
        await self._ensure_client()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the async context manager; closes the client."""
        await self.close()
    async def _ensure_client(self) -> None:
        """Create the underlying httpx.AsyncClient on first use."""
        if self._client is None:
            # Plain (non-Workers) proxy mode: normalize to a scheme-prefixed URL.
            proxy = None
            if self.cf_proxies and not self.cf_workers:
                proxy = self.cf_proxies
                if not proxy.startswith(('http://', 'https://', 'socks5://')):
                    proxy = f"http://{proxy}"
            self._client = httpx.AsyncClient(
                http2=self.http2,
                timeout=self.timeout,
                proxy=proxy,
                headers=self.headers,
                cookies=self.cookies,
                **self._client_kwargs
            )
    async def close(self) -> None:
        """Close the session and release the underlying client."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None
    async def request(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> AsyncCFSpiderResponse:
        """
        Send a request.
        Args:
            method: HTTP method.
            url: Target URL.
            **kwargs: Request parameters (params/headers/data/json/cookies/timeout
                plus anything httpx accepts).
        Returns:
            AsyncCFSpiderResponse: The asynchronous response object.
        """
        await self._ensure_client()
        params = kwargs.pop("params", None)
        headers = kwargs.pop("headers", {})
        data = kwargs.pop("data", None)
        json_data = kwargs.pop("json", None)
        cookies = kwargs.pop("cookies", None)
        timeout = kwargs.pop("timeout", None)
        # Merge session-level and per-request headers (per-request wins).
        merged_headers = {**self.headers, **headers}
        # No cf_proxies, or Workers API disabled: send the request directly.
        if not self.cf_proxies or not self.cf_workers:
            response = await self._client.request(
                method,
                url,
                params=params,
                headers=merged_headers,
                data=data,
                json=json_data,
                cookies=cookies,
                timeout=timeout,
                **kwargs
            )
            return AsyncCFSpiderResponse(response)
        # Route through the CFspider Workers API proxy: the target URL and
        # method travel in the query string; real headers are forwarded with
        # an X-CFSpider-Header- prefix; the outer request is always a POST.
        cf_proxies_url = self.cf_proxies.rstrip("/")
        if not cf_proxies_url.startswith(('http://', 'https://')):
            cf_proxies_url = f"https://{cf_proxies_url}"
        target_url = url
        if params:
            target_url = f"{url}?{urlencode(params)}"
        proxy_url = f"{cf_proxies_url}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
        request_headers = {}
        for key, value in merged_headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
        all_cookies = {**self.cookies, **(cookies or {})}
        if all_cookies:
            cookie_str = "; ".join([f"{k}={v}" for k, v in all_cookies.items()])
            request_headers["X-CFSpider-Header-Cookie"] = cookie_str
        # NOTE(review): the client was built with headers=self.headers, so the
        # session headers are also sent verbatim to the worker itself — confirm
        # this is intended.
        response = await self._client.post(
            proxy_url,
            headers=request_headers,
            data=data,
            json=json_data,
            timeout=timeout,
            **kwargs
        )
        cf_colo = response.headers.get("X-CF-Colo")
        cf_ray = response.headers.get("CF-Ray")
        return AsyncCFSpiderResponse(response, cf_colo=cf_colo, cf_ray=cf_ray)
    @asynccontextmanager
    async def stream(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> AsyncIterator[AsyncStreamResponse]:
        """
        Send a streaming request.
        Args:
            method: HTTP method.
            url: Target URL.
            **kwargs: Request parameters.
        Yields:
            AsyncStreamResponse: The streaming response object.
        """
        await self._ensure_client()
        params = kwargs.pop("params", None)
        headers = kwargs.pop("headers", {})
        data = kwargs.pop("data", None)
        json_data = kwargs.pop("json", None)
        cookies = kwargs.pop("cookies", None)
        timeout = kwargs.pop("timeout", None)
        merged_headers = {**self.headers, **headers}
        # No cf_proxies, or Workers API disabled: stream directly.
        if not self.cf_proxies or not self.cf_workers:
            async with self._client.stream(
                method,
                url,
                params=params,
                headers=merged_headers,
                data=data,
                json=json_data,
                cookies=cookies,
                timeout=timeout,
                **kwargs
            ) as response:
                yield AsyncStreamResponse(response)
            return
        # Route through the CFspider Workers API proxy (same envelope as
        # request(): URL/method in the query string, headers prefixed).
        cf_proxies_url = self.cf_proxies.rstrip("/")
        if not cf_proxies_url.startswith(('http://', 'https://')):
            cf_proxies_url = f"https://{cf_proxies_url}"
        target_url = url
        if params:
            target_url = f"{url}?{urlencode(params)}"
        proxy_url = f"{cf_proxies_url}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
        request_headers = {}
        for key, value in merged_headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
        all_cookies = {**self.cookies, **(cookies or {})}
        if all_cookies:
            cookie_str = "; ".join([f"{k}={v}" for k, v in all_cookies.items()])
            request_headers["X-CFSpider-Header-Cookie"] = cookie_str
        async with self._client.stream(
            "POST",
            proxy_url,
            headers=request_headers,
            data=data,
            json=json_data,
            timeout=timeout,
            **kwargs
        ) as response:
            cf_colo = response.headers.get("X-CF-Colo")
            cf_ray = response.headers.get("CF-Ray")
            yield AsyncStreamResponse(response, cf_colo=cf_colo, cf_ray=cf_ray)
    async def get(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async GET request."""
        return await self.request("GET", url, **kwargs)
    async def post(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async POST request."""
        return await self.request("POST", url, **kwargs)
    async def put(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async PUT request."""
        return await self.request("PUT", url, **kwargs)
    async def delete(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async DELETE request."""
        return await self.request("DELETE", url, **kwargs)
    async def head(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async HEAD request."""
        return await self.request("HEAD", url, **kwargs)
    async def options(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async OPTIONS request."""
        return await self.request("OPTIONS", url, **kwargs)
    async def patch(self, url: str, **kwargs) -> AsyncCFSpiderResponse:
        """Async PATCH request."""
        return await self.request("PATCH", url, **kwargs)

335
cfspider/browser.py Normal file
View File

@@ -0,0 +1,335 @@
"""
CFspider 浏览器模块
基于 Playwright 封装,支持通过 Cloudflare Workers 代理浏览器流量
"""
from urllib.parse import urlparse, parse_qs, unquote
from .vless_client import LocalVlessProxy
def parse_vless_link(vless_link):
    """Parse a VLESS share link into its components.

    Supported formats:
        vless://uuid@host:port?type=ws&path=/xxx#name
        vless://uuid@host:port?path=%2Fxxx
        vless://uuid@host:port

    Args:
        vless_link: The VLESS link string.

    Returns:
        dict: keys ``uuid``, ``host``, ``port`` and ``path``;
        ``None`` when the link cannot be parsed.
    """
    if not vless_link or not vless_link.startswith('vless://'):
        return None
    try:
        # Drop the scheme, the "#name" fragment, then split off the query.
        body = vless_link[len('vless://'):]
        body = body.split('#', 1)[0]
        body, _, query = body.partition('?')
        # "uuid@host[:port]" is mandatory.
        if '@' not in body:
            return None
        uuid, _, endpoint = body.partition('@')
        host, sep, port_text = endpoint.rpartition(':')
        if sep:
            port = int(port_text)
        else:
            host = endpoint
            port = 443
        # Only the websocket path is taken from the query (percent-decoded).
        path = "/"
        if query:
            query_params = parse_qs(query)
            if 'path' in query_params:
                path = unquote(query_params['path'][0])
        return {
            'uuid': uuid,
            'host': host,
            'port': port,
            'path': path
        }
    except Exception:
        # Any malformed component (e.g. a non-numeric port) yields None.
        return None
try:
from playwright.sync_api import sync_playwright, Page, Browser as PlaywrightBrowser
PLAYWRIGHT_AVAILABLE = True
except ImportError:
PLAYWRIGHT_AVAILABLE = False
Page = None
PlaywrightBrowser = None
class BrowserNotInstalledError(Exception):
    """Raised when the Chromium browser binary is not installed."""
    pass
class PlaywrightNotInstalledError(Exception):
    """Raised when the Playwright package itself is not installed."""
    pass
class Browser:
    """
    CFspider browser class.

    Wraps Playwright's sync API and can route browser traffic through a
    Cloudflare Workers (edgetunnel) proxy via a local VLESS bridge.

    Example:
        >>> import cfspider
        >>> # Through an edgetunnel Workers proxy
        >>> browser = cfspider.Browser(cf_proxies="wss://v2.kami666.xyz")
        >>> html = browser.html("https://example.com")
        >>> browser.close()
        >>>
        >>> # Direct (no proxy)
        >>> browser = cfspider.Browser()
        >>> html = browser.html("https://example.com")
        >>> browser.close()
    """
    def __init__(self, cf_proxies=None, headless=True, timeout=30, vless_uuid=None):
        """
        Initialize the browser.

        Args:
            cf_proxies: Proxy address (optional). Supported formats:
                - VLESS link: "vless://uuid@host:port?path=/xxx#name" (recommended)
                - HTTP proxy: "http://ip:port" or "ip:port"
                - SOCKS5 proxy: "socks5://ip:port"
                - edgetunnel domain: "v2.example.com" (requires vless_uuid)
                When omitted, the local network is used directly.
            headless: Run in headless mode, default True.
            timeout: Default per-operation timeout in seconds, default 30.
            vless_uuid: VLESS UUID (optional); only needed with the bare-domain
                form. A full VLESS link already carries the UUID.

        Raises:
            PlaywrightNotInstalledError: When Playwright is not importable.
            BrowserNotInstalledError: When the Chromium binary is missing.
        """
        if not PLAYWRIGHT_AVAILABLE:
            raise PlaywrightNotInstalledError(
                "Playwright 未安装,请运行: pip install cfspider[browser]"
            )
        self.cf_proxies = cf_proxies
        self.headless = headless
        self.timeout = timeout
        self._vless_proxy = None
        # Resolve cf_proxies into a proxy URL Playwright understands.
        proxy_url = None
        if cf_proxies:
            # 1. Full VLESS link: bridge it through a local HTTP proxy.
            vless_info = parse_vless_link(cf_proxies)
            if vless_info:
                ws_url = f"wss://{vless_info['host']}{vless_info['path']}"
                self._vless_proxy = LocalVlessProxy(ws_url, vless_info['uuid'])
                port = self._vless_proxy.start()
                proxy_url = f"http://127.0.0.1:{port}"
            # 2. Explicit HTTP/SOCKS5 proxy URL: use as-is.
            elif cf_proxies.startswith('http://') or cf_proxies.startswith('https://') or cf_proxies.startswith('socks5://'):
                proxy_url = cf_proxies
            # 3. Bare "ip:port": assume an HTTP proxy.
            elif ':' in cf_proxies and cf_proxies.replace('.', '').replace(':', '').isdigit():
                proxy_url = f"http://{cf_proxies}"
            # 4. Domain + UUID (legacy form): build the websocket URL and
            #    bridge through a local VLESS proxy.
            elif vless_uuid:
                hostname = cf_proxies.replace('https://', '').replace('http://', '').replace('wss://', '').replace('ws://', '').split('/')[0]
                ws_url = f'wss://{hostname}/{vless_uuid}'
                self._vless_proxy = LocalVlessProxy(ws_url, vless_uuid)
                port = self._vless_proxy.start()
                proxy_url = f"http://127.0.0.1:{port}"
            # 5. Fallback: treat the value as an HTTP proxy host.
            else:
                proxy_url = f"http://{cf_proxies}"
        # Start Playwright and launch Chromium.
        self._playwright = sync_playwright().start()
        launch_options = {"headless": headless}
        if proxy_url:
            launch_options["proxy"] = {"server": proxy_url}
        try:
            self._browser = self._playwright.chromium.launch(**launch_options)
        except Exception as e:
            # Roll back the local proxy and Playwright before propagating.
            if self._vless_proxy:
                self._vless_proxy.stop()
            self._playwright.stop()
            if "Executable doesn't exist" in str(e):
                raise BrowserNotInstalledError(
                    "Chromium 浏览器未安装,请运行: cfspider install"
                )
            raise
        # Default context. ignore_https_errors=True — NOTE(review): presumably
        # to tolerate certificates presented through the proxy tunnel; confirm.
        self._context = self._browser.new_context(
            ignore_https_errors=True
        )
        self._context.set_default_timeout(timeout * 1000)
    def get(self, url):
        """
        Open a page and return the Page object.

        The caller owns the returned page and is responsible for closing it.

        Args:
            url: Target URL.

        Returns:
            Page: A Playwright Page object for further automation.
        """
        page = self._context.new_page()
        page.goto(url, wait_until="networkidle")
        return page
    def html(self, url, wait_until="domcontentloaded"):
        """
        Return the rendered HTML of a page.

        Args:
            url: Target URL.
            wait_until: Wait strategy: "load", "domcontentloaded" or "networkidle".

        Returns:
            str: The rendered HTML content.
        """
        page = self._context.new_page()
        try:
            page.goto(url, wait_until=wait_until)
            return page.content()
        finally:
            page.close()
    def screenshot(self, url, path=None, full_page=False):
        """
        Take a screenshot of a page.

        Args:
            url: Target URL.
            path: Output path, e.g. "screenshot.png".
            full_page: Capture the whole page, default False.

        Returns:
            bytes: The screenshot image data.
        """
        page = self._context.new_page()
        try:
            page.goto(url, wait_until="networkidle")
            return page.screenshot(path=path, full_page=full_page)
        finally:
            page.close()
    def pdf(self, url, path=None):
        """
        Render a page to PDF.

        Args:
            url: Target URL.
            path: Output path, e.g. "page.pdf".

        Returns:
            bytes: The PDF data.

        Raises:
            ValueError: When the browser is not in headless mode
                (Chromium only supports PDF generation headless).
        """
        if not self.headless:
            raise ValueError("PDF 生成仅在无头模式 (headless=True) 下可用")
        page = self._context.new_page()
        try:
            page.goto(url, wait_until="networkidle")
            return page.pdf(path=path)
        finally:
            page.close()
    def execute_script(self, url, script):
        """
        Execute JavaScript in the context of a page.

        Args:
            url: Target URL.
            script: JavaScript source to evaluate.

        Returns:
            The evaluation result.
        """
        page = self._context.new_page()
        try:
            page.goto(url, wait_until="networkidle")
            return page.evaluate(script)
        finally:
            page.close()
    def new_page(self):
        """
        Create a new page in the default context.

        Returns:
            Page: A new Playwright Page object.
        """
        return self._context.new_page()
    def close(self):
        """Shut down the context, browser, Playwright and the local proxy.

        Cleanup is best-effort: each resource is released independently so a
        failure in one step does not prevent the others. Bare ``except:`` was
        narrowed to ``except Exception:`` so Ctrl-C (KeyboardInterrupt) and
        SystemExit still propagate during teardown.
        """
        try:
            self._context.close()
        except Exception:
            pass
        try:
            self._browser.close()
        except Exception:
            pass
        try:
            self._playwright.stop()
        except Exception:
            pass
        if self._vless_proxy:
            try:
                self._vless_proxy.stop()
            except Exception:
                pass
    def __enter__(self):
        """Support the ``with`` statement."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support the ``with`` statement; never suppresses exceptions."""
        self.close()
        return False

81
cfspider/cli.py Normal file
View File

@@ -0,0 +1,81 @@
"""
CFspider 命令行工具
"""
import sys
import subprocess
def install_browser():
    """
    Install the Chromium browser through Playwright's installer.

    Returns:
        bool: True when the installer process exited with code 0.

    Example:
        >>> import cfspider
        >>> cfspider.install_browser()
    """
    installer_cmd = [sys.executable, '-m', 'playwright', 'install', 'chromium']
    try:
        # Let the installer stream its own progress output to the console.
        completed = subprocess.run(installer_cmd, capture_output=False)
    except Exception as e:
        print(f"安装失败: {e}")
        return False
    return completed.returncode == 0
def main():
    """CLI entry point: dispatch on the first positional argument."""
    if len(sys.argv) < 2:
        print_help()
        return
    command = sys.argv[1].lower()
    if command == 'install':
        print("正在安装 Chromium 浏览器...")
        if not install_browser():
            print("安装失败,请检查网络连接或手动安装")
            sys.exit(1)
        print("安装完成!")
    elif command == 'version':
        from . import __version__
        print(f"cfspider {__version__}")
    elif command in ('help', '-h', '--help'):
        print_help()
    else:
        print(f"未知命令: {command}")
        print_help()
        sys.exit(1)
def print_help():
    """Print the CLI usage/help text to stdout."""
    print("""
CFspider - Cloudflare 代理 IP 池
用法:
cfspider <command>
命令:
install 安装 Chromium 浏览器(用于 Browser 功能)
version 显示版本号
help 显示帮助信息
示例:
cfspider install # 安装浏览器
cfspider version # 显示版本
更多信息请访问: https://github.com/violettoolssite/CFspider
""")
# Allow direct execution (python -m / script invocation).
if __name__ == '__main__':
    main()

385
cfspider/impersonate.py Normal file
View File

@@ -0,0 +1,385 @@
"""
CFspider TLS 指纹模拟模块
基于 curl_cffi 实现,可模拟各种浏览器的 TLS 指纹,绕过反爬检测。
"""
from urllib.parse import urlencode, quote
from typing import Optional, Dict, Any, List
# 延迟导入 curl_cffi
_curl_cffi = None
def _get_curl_cffi():
"""延迟加载 curl_cffi 模块"""
global _curl_cffi
if _curl_cffi is None:
try:
from curl_cffi import requests as curl_requests
_curl_cffi = curl_requests
except ImportError:
raise ImportError(
"curl_cffi is required for TLS fingerprint impersonation. "
"Install it with: pip install curl_cffi"
)
return _curl_cffi
# Browser fingerprint identifiers accepted by curl_cffi's `impersonate=`.
SUPPORTED_BROWSERS = [
    # Chrome
    "chrome99", "chrome100", "chrome101", "chrome104", "chrome107",
    "chrome110", "chrome116", "chrome119", "chrome120", "chrome123",
    "chrome124", "chrome131",
    # Chrome Android
    "chrome99_android", "chrome131_android",
    # Edge
    "edge99", "edge101",
    # Safari
    "safari15_3", "safari15_5", "safari17_0", "safari17_2_ios",
    "safari18_0", "safari18_0_ios",
    # Firefox
    "firefox102", "firefox109", "firefox133"
]
class ImpersonateResponse:
    """Response wrapper for TLS-fingerprint-impersonated requests.

    Exposes the usual requests-like accessors plus the Cloudflare metadata
    (``cf_colo``, ``cf_ray``) extracted by the caller.
    """
    def __init__(self, response, cf_colo: Optional[str] = None, cf_ray: Optional[str] = None):
        self._resp = response
        self.cf_colo = cf_colo
        self.cf_ray = cf_ray
    @property
    def status_code(self) -> int:
        """HTTP status code of the underlying response."""
        return self._resp.status_code
    @property
    def url(self) -> str:
        """Final URL of the response, as a string."""
        return str(self._resp.url)
    @property
    def text(self) -> str:
        """Decoded response body."""
        return self._resp.text
    @property
    def content(self) -> bytes:
        """Raw response body."""
        return self._resp.content
    @property
    def headers(self) -> Dict:
        """Response headers as a plain dict."""
        return dict(self._resp.headers)
    @property
    def cookies(self) -> Dict:
        """Response cookies as a plain dict."""
        return dict(self._resp.cookies)
    def json(self, **kwargs) -> Any:
        """Decode the body as JSON (arguments forwarded to the backend)."""
        return self._resp.json(**kwargs)
    def raise_for_status(self) -> None:
        """Raise the backend's HTTP error for 4xx/5xx responses."""
        self._resp.raise_for_status()
def impersonate_request(
    method: str,
    url: str,
    impersonate: str = "chrome131",
    cf_proxies: Optional[str] = None,
    cf_workers: bool = True,
    **kwargs
) -> ImpersonateResponse:
    """
    Send a request using TLS fingerprint impersonation.
    Args:
        method: HTTP method.
        url: Target URL.
        impersonate: Browser fingerprint (e.g. chrome131, safari18_0, firefox133).
        cf_proxies: Proxy address (optional).
        cf_workers: Whether to use the CFspider Workers API (default True).
        **kwargs: Extra request parameters.
    Returns:
        ImpersonateResponse: The response object.
    Raises:
        ValueError: If ``impersonate`` is not in SUPPORTED_BROWSERS.
        ImportError: If curl_cffi is not installed.
    Example:
        >>> response = cfspider.impersonate_get("https://example.com", impersonate="chrome131")
        >>> print(response.text)
    """
    curl_requests = _get_curl_cffi()
    params = kwargs.pop("params", None)
    headers = kwargs.pop("headers", {})
    data = kwargs.pop("data", None)
    json_data = kwargs.pop("json", None)
    cookies = kwargs.pop("cookies", None)
    timeout = kwargs.pop("timeout", 30)
    # Validate the requested fingerprint before doing any network work.
    if impersonate not in SUPPORTED_BROWSERS:
        raise ValueError(
            f"Unsupported browser: {impersonate}. "
            f"Supported browsers: {', '.join(SUPPORTED_BROWSERS[:10])}..."
        )
    # No proxy configured: send the request directly.
    if not cf_proxies:
        response = curl_requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            impersonate=impersonate,
            **kwargs
        )
        return ImpersonateResponse(response)
    # cf_workers=False: treat cf_proxies as an ordinary HTTP/SOCKS proxy.
    if not cf_workers:
        proxy_url = cf_proxies
        if not proxy_url.startswith(('http://', 'https://', 'socks5://')):
            proxy_url = f"http://{proxy_url}"
        response = curl_requests.request(
            method,
            url,
            params=params,
            headers=headers,
            data=data,
            json=json_data,
            cookies=cookies,
            timeout=timeout,
            impersonate=impersonate,
            proxies={"http": proxy_url, "https": proxy_url},
            **kwargs
        )
        return ImpersonateResponse(response)
    # cf_workers=True: route through the CFspider Workers API proxy. The target
    # URL/method travel in the query string, real headers are forwarded with an
    # X-CFSpider-Header- prefix, and the outer request is always a POST.
    cf_proxies = cf_proxies.rstrip("/")
    if not cf_proxies.startswith(('http://', 'https://')):
        cf_proxies = f"https://{cf_proxies}"
    target_url = url
    if params:
        target_url = f"{url}?{urlencode(params)}"
    proxy_url = f"{cf_proxies}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
    request_headers = {}
    if headers:
        for key, value in headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
    if cookies:
        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
        request_headers["X-CFSpider-Header-Cookie"] = cookie_str
    response = curl_requests.post(
        proxy_url,
        headers=request_headers,
        data=data,
        json=json_data,
        timeout=timeout,
        impersonate=impersonate,
        **kwargs
    )
    cf_colo = response.headers.get("X-CF-Colo")
    cf_ray = response.headers.get("CF-Ray")
    return ImpersonateResponse(response, cf_colo=cf_colo, cf_ray=cf_ray)
# Convenience wrappers
def impersonate_get(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a GET request with TLS fingerprint impersonation."""
    return impersonate_request(
        "GET", url, impersonate=impersonate, **kwargs
    )
def impersonate_post(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a POST request with TLS fingerprint impersonation."""
    return impersonate_request(
        "POST", url, impersonate=impersonate, **kwargs
    )
def impersonate_put(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a PUT request with TLS fingerprint impersonation."""
    return impersonate_request(
        "PUT", url, impersonate=impersonate, **kwargs
    )
def impersonate_delete(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a DELETE request with TLS fingerprint impersonation."""
    return impersonate_request(
        "DELETE", url, impersonate=impersonate, **kwargs
    )
def impersonate_head(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a HEAD request with TLS fingerprint impersonation."""
    return impersonate_request(
        "HEAD", url, impersonate=impersonate, **kwargs
    )
def impersonate_options(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send an OPTIONS request with TLS fingerprint impersonation."""
    return impersonate_request(
        "OPTIONS", url, impersonate=impersonate, **kwargs
    )
def impersonate_patch(url: str, impersonate: str = "chrome131", **kwargs) -> ImpersonateResponse:
    """Send a PATCH request with TLS fingerprint impersonation."""
    return impersonate_request(
        "PATCH", url, impersonate=impersonate, **kwargs
    )
class ImpersonateSession:
    """
    TLS fingerprint impersonation session.
    Wraps a reusable curl_cffi Session configured with a fixed browser
    fingerprint; requests go either directly or through the CFspider
    Workers proxy endpoint.
    Example:
        >>> with cfspider.ImpersonateSession(impersonate="chrome131") as session:
        >>>     r1 = session.get("https://example.com")
        >>>     r2 = session.post("https://api.example.com", json={"key": "value"})
    """
    def __init__(
        self,
        impersonate: str = "chrome131",
        cf_proxies: Optional[str] = None,
        cf_workers: bool = True,
        timeout: float = 30,
        headers: Optional[Dict[str, str]] = None,
        cookies: Optional[Dict[str, str]] = None,
        **kwargs
    ):
        """
        Initialize the impersonation session.
        Args:
            impersonate: Browser fingerprint (default chrome131).
            cf_proxies: Proxy address (optional).
            cf_workers: Whether to use the CFspider Workers API (default True).
            timeout: Default timeout in seconds.
            headers: Default request headers.
            cookies: Default cookies.
        Raises:
            ValueError: If ``impersonate`` is not in SUPPORTED_BROWSERS.
            ImportError: If curl_cffi is not installed.
        """
        curl_requests = _get_curl_cffi()
        if impersonate not in SUPPORTED_BROWSERS:
            raise ValueError(
                f"Unsupported browser: {impersonate}. "
                f"Supported browsers: {', '.join(SUPPORTED_BROWSERS[:10])}..."
            )
        self.impersonate = impersonate
        self.cf_proxies = cf_proxies
        self.cf_workers = cf_workers
        self.timeout = timeout
        self.headers = headers or {}
        self.cookies = cookies or {}
        self._session = curl_requests.Session(impersonate=impersonate)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    def close(self):
        """Close the underlying curl_cffi session."""
        if self._session:
            self._session.close()
    def request(self, method: str, url: str, **kwargs) -> ImpersonateResponse:
        """Send a request; session headers/cookies merge with per-request ones."""
        merged_headers = {**self.headers, **kwargs.pop("headers", {})}
        # NOTE(review): an explicit cookies=None argument would raise here —
        # the pop() default only covers a missing keyword; confirm if intended.
        merged_cookies = {**self.cookies, **kwargs.pop("cookies", {})}
        timeout = kwargs.pop("timeout", self.timeout)
        # No cf_proxies, or Workers API disabled: send the request directly.
        if not self.cf_proxies or not self.cf_workers:
            proxies = None
            if self.cf_proxies and not self.cf_workers:
                proxy_url = self.cf_proxies
                if not proxy_url.startswith(('http://', 'https://', 'socks5://')):
                    proxy_url = f"http://{proxy_url}"
                proxies = {"http": proxy_url, "https": proxy_url}
            response = self._session.request(
                method,
                url,
                headers=merged_headers,
                cookies=merged_cookies,
                timeout=timeout,
                proxies=proxies,
                **kwargs
            )
            return ImpersonateResponse(response)
        # Route through the CFspider Workers API proxy: target URL/method go in
        # the query string, real headers get an X-CFSpider-Header- prefix, and
        # the outer request is always a POST (data/json pass via **kwargs).
        cf_proxies_url = self.cf_proxies.rstrip("/")
        if not cf_proxies_url.startswith(('http://', 'https://')):
            cf_proxies_url = f"https://{cf_proxies_url}"
        params = kwargs.pop("params", None)
        target_url = url
        if params:
            target_url = f"{url}?{urlencode(params)}"
        proxy_url = f"{cf_proxies_url}/proxy?url={quote(target_url, safe='')}&method={method.upper()}"
        request_headers = {}
        for key, value in merged_headers.items():
            request_headers[f"X-CFSpider-Header-{key}"] = value
        if merged_cookies:
            cookie_str = "; ".join([f"{k}={v}" for k, v in merged_cookies.items()])
            request_headers["X-CFSpider-Header-Cookie"] = cookie_str
        response = self._session.post(
            proxy_url,
            headers=request_headers,
            timeout=timeout,
            **kwargs
        )
        cf_colo = response.headers.get("X-CF-Colo")
        cf_ray = response.headers.get("CF-Ray")
        return ImpersonateResponse(response, cf_colo=cf_colo, cf_ray=cf_ray)
    def get(self, url: str, **kwargs) -> ImpersonateResponse:
        """GET request."""
        return self.request("GET", url, **kwargs)
    def post(self, url: str, **kwargs) -> ImpersonateResponse:
        """POST request."""
        return self.request("POST", url, **kwargs)
    def put(self, url: str, **kwargs) -> ImpersonateResponse:
        """PUT request."""
        return self.request("PUT", url, **kwargs)
    def delete(self, url: str, **kwargs) -> ImpersonateResponse:
        """DELETE request."""
        return self.request("DELETE", url, **kwargs)
    def head(self, url: str, **kwargs) -> ImpersonateResponse:
        """HEAD request."""
        return self.request("HEAD", url, **kwargs)
    def options(self, url: str, **kwargs) -> ImpersonateResponse:
        """OPTIONS request."""
        return self.request("OPTIONS", url, **kwargs)
    def patch(self, url: str, **kwargs) -> ImpersonateResponse:
        """PATCH request."""
        return self.request("PATCH", url, **kwargs)
def get_supported_browsers() -> List[str]:
    """Return a fresh copy of the supported browser fingerprint identifiers."""
    return list(SUPPORTED_BROWSERS)

522
cfspider/ip_map.py Normal file
View File

@@ -0,0 +1,522 @@
"""
CFspider IP 地图可视化模块
生成包含代理 IP 地理位置的 HTML 地图文件,使用 MapLibre GL JS。
"""
import json
from typing import Optional, List, Dict, Any
from datetime import datetime
# Coordinates for the major Cloudflare colo (data-center) codes.
# Values: latitude/longitude plus localized city and country names
# (the Chinese display strings are rendered in the generated map HTML).
COLO_COORDINATES = {
    # Asia / Oceania
    "NRT": {"lat": 35.6762, "lng": 139.6503, "city": "东京", "country": "日本"},
    "HND": {"lat": 35.5494, "lng": 139.7798, "city": "东京羽田", "country": "日本"},
    "KIX": {"lat": 34.4347, "lng": 135.2441, "city": "大阪", "country": "日本"},
    "HKG": {"lat": 22.3080, "lng": 113.9185, "city": "香港", "country": "香港"},
    "SIN": {"lat": 1.3521, "lng": 103.8198, "city": "新加坡", "country": "新加坡"},
    "ICN": {"lat": 37.4602, "lng": 126.4407, "city": "首尔", "country": "韩国"},
    "TPE": {"lat": 25.0777, "lng": 121.2330, "city": "台北", "country": "台湾"},
    "BKK": {"lat": 13.6900, "lng": 100.7501, "city": "曼谷", "country": "泰国"},
    "KUL": {"lat": 2.7456, "lng": 101.7072, "city": "吉隆坡", "country": "马来西亚"},
    "BOM": {"lat": 19.0896, "lng": 72.8656, "city": "孟买", "country": "印度"},
    "DEL": {"lat": 28.5562, "lng": 77.1000, "city": "新德里", "country": "印度"},
    "SYD": {"lat": -33.9399, "lng": 151.1753, "city": "悉尼", "country": "澳大利亚"},
    "MEL": {"lat": -37.6690, "lng": 144.8410, "city": "墨尔本", "country": "澳大利亚"},
    # North America
    "SJC": {"lat": 37.3639, "lng": -121.9289, "city": "圣何塞", "country": "美国"},
    "LAX": {"lat": 33.9416, "lng": -118.4085, "city": "洛杉矶", "country": "美国"},
    "SEA": {"lat": 47.4502, "lng": -122.3088, "city": "西雅图", "country": "美国"},
    "DFW": {"lat": 32.8998, "lng": -97.0403, "city": "达拉斯", "country": "美国"},
    "ORD": {"lat": 41.9742, "lng": -87.9073, "city": "芝加哥", "country": "美国"},
    "IAD": {"lat": 38.9531, "lng": -77.4565, "city": "华盛顿", "country": "美国"},
    "EWR": {"lat": 40.6895, "lng": -74.1745, "city": "纽瓦克", "country": "美国"},
    "MIA": {"lat": 25.7959, "lng": -80.2870, "city": "迈阿密", "country": "美国"},
    "ATL": {"lat": 33.6407, "lng": -84.4277, "city": "亚特兰大", "country": "美国"},
    "YYZ": {"lat": 43.6777, "lng": -79.6248, "city": "多伦多", "country": "加拿大"},
    "YVR": {"lat": 49.1947, "lng": -123.1789, "city": "温哥华", "country": "加拿大"},
    # Europe
    "LHR": {"lat": 51.4700, "lng": -0.4543, "city": "伦敦", "country": "英国"},
    "CDG": {"lat": 49.0097, "lng": 2.5479, "city": "巴黎", "country": "法国"},
    "FRA": {"lat": 50.0379, "lng": 8.5622, "city": "法兰克福", "country": "德国"},
    "AMS": {"lat": 52.3105, "lng": 4.7683, "city": "阿姆斯特丹", "country": "荷兰"},
    "ZRH": {"lat": 47.4647, "lng": 8.5492, "city": "苏黎世", "country": "瑞士"},
    "MAD": {"lat": 40.4983, "lng": -3.5676, "city": "马德里", "country": "西班牙"},
    "MXP": {"lat": 45.6306, "lng": 8.7281, "city": "米兰", "country": "意大利"},
    "WAW": {"lat": 52.1672, "lng": 20.9679, "city": "华沙", "country": "波兰"},
    "ARN": {"lat": 59.6498, "lng": 17.9238, "city": "斯德哥尔摩", "country": "瑞典"},
    # South America
    "GRU": {"lat": -23.4356, "lng": -46.4731, "city": "圣保罗", "country": "巴西"},
    "EZE": {"lat": -34.8222, "lng": -58.5358, "city": "布宜诺斯艾利斯", "country": "阿根廷"},
    "SCL": {"lat": -33.3930, "lng": -70.7858, "city": "圣地亚哥", "country": "智利"},
    # Middle East / Africa
    "DXB": {"lat": 25.2532, "lng": 55.3657, "city": "迪拜", "country": "阿联酋"},
    "JNB": {"lat": -26.1392, "lng": 28.2460, "city": "约翰内斯堡", "country": "南非"},
    "CAI": {"lat": 30.1219, "lng": 31.4056, "city": "开罗", "country": "埃及"},
}
class IPMapCollector:
    """
    Collects per-request proxy IP information for map rendering.

    Each record keeps the request URL, Cloudflare colo/ray metadata and, when
    the colo code is known, the colo's coordinates and locality names.
    """
    def __init__(self):
        # Chronological record list; get_records() hands out this exact list.
        self.ip_records: List[Dict[str, Any]] = []

    def add_record(
        self,
        url: str,
        ip: Optional[str] = None,
        cf_colo: Optional[str] = None,
        cf_ray: Optional[str] = None,
        status_code: Optional[int] = None,
        response_time: Optional[float] = None
    ):
        """Append one request record, enriched with colo coordinates if known."""
        entry = {
            "url": url,
            "ip": ip,
            "cf_colo": cf_colo,
            "cf_ray": cf_ray,
            "status_code": status_code,
            "response_time": response_time,
            "timestamp": datetime.now().isoformat(),
        }
        coord = COLO_COORDINATES.get(cf_colo) if cf_colo else None
        if coord is not None:
            entry["lat"] = coord["lat"]
            entry["lng"] = coord["lng"]
            entry["city"] = coord["city"]
            entry["country"] = coord["country"]
        self.ip_records.append(entry)

    def get_records(self) -> List[Dict[str, Any]]:
        """Return the live list of collected records."""
        return self.ip_records

    def clear(self):
        """Drop all records (rebinds the list; earlier references keep theirs)."""
        self.ip_records = []

    def get_unique_colos(self) -> List[str]:
        """Return the distinct non-empty colo codes seen so far."""
        return list({entry["cf_colo"] for entry in self.ip_records if entry.get("cf_colo")})
# Module-wide collector instance shared by the convenience functions.
_global_collector = IPMapCollector()
def get_collector() -> IPMapCollector:
    """Return the global IPMapCollector instance."""
    return _global_collector
def add_ip_record(
    url: str,
    ip: Optional[str] = None,
    cf_colo: Optional[str] = None,
    cf_ray: Optional[str] = None,
    status_code: Optional[int] = None,
    response_time: Optional[float] = None
):
    """Record one proxied request in the module-global collector."""
    _global_collector.add_record(
        url=url,
        ip=ip,
        cf_colo=cf_colo,
        cf_ray=cf_ray,
        status_code=status_code,
        response_time=response_time,
    )
def generate_map_html(
    output_file: str = "cfspider_map.html",
    title: str = "CFspider Proxy IP Map",
    collector: Optional[IPMapCollector] = None
) -> str:
    """
    Generate the IP map HTML file.

    Renders every collected request record that has geographic coordinates
    as a glowing point on a dark-themed MapLibre world map, writes a fully
    self-contained page to disk and returns its path.

    Args:
        output_file: Name of the HTML file to write.
        title: Page title (browser tab; the info panel heading is fixed).
        collector: IP record collector to read from; defaults to the
            module-level global collector.

    Returns:
        The path of the generated file (same value as ``output_file``).
    """
    if collector is None:
        collector = _global_collector
    records = collector.get_records()
    # Keep only records that actually carry coordinates to plot
    geo_records = [r for r in records if r.get("lat") and r.get("lng")]
    # Build the GeoJSON FeatureCollection consumed by the MapLibre layers
    features = []
    for i, record in enumerate(geo_records):
        feature = {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [record["lng"], record["lat"]]
            },
            "properties": {
                "id": i,
                "url": record.get("url", ""),
                "ip": record.get("ip", "N/A"),
                "cf_colo": record.get("cf_colo", "N/A"),
                "cf_ray": record.get("cf_ray", "N/A"),
                "city": record.get("city", "Unknown"),
                "country": record.get("country", "Unknown"),
                "status_code": record.get("status_code", 0),
                "response_time": record.get("response_time", 0),
                "timestamp": record.get("timestamp", "")
            }
        }
        features.append(feature)
    geojson = {
        "type": "FeatureCollection",
        "features": features
    }
    # Summary statistics shown in the info panel
    total_requests = len(records)
    geo_requests = len(geo_records)  # NOTE(review): currently unused in the template below — confirm before removing
    unique_colos = collector.get_unique_colos()
    html_content = f'''<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{title}</title>
    <script src="https://unpkg.com/maplibre-gl@3.6.2/dist/maplibre-gl.js"></script>
    <link href="https://unpkg.com/maplibre-gl@3.6.2/dist/maplibre-gl.css" rel="stylesheet" />
    <style>
        :root {{
            --bg-primary: #0a0a0f;
            --bg-secondary: #12121a;
            --neon-cyan: #00f5ff;
            --neon-magenta: #ff2d95;
            --neon-yellow: #f7f71c;
            --neon-green: #50fa7b;
            --text-primary: #ffffff;
            --text-secondary: #888888;
        }}
        * {{ margin: 0; padding: 0; box-sizing: border-box; }}
        body {{
            font-family: 'Segoe UI', 'Microsoft YaHei', sans-serif;
            background: var(--bg-primary);
            color: var(--text-primary);
        }}
        #map {{
            position: absolute;
            top: 0;
            bottom: 0;
            width: 100%;
        }}
        .info-panel {{
            position: absolute;
            top: 20px;
            left: 20px;
            background: rgba(10, 10, 15, 0.95);
            border: 1px solid var(--neon-cyan);
            border-radius: 12px;
            padding: 20px;
            min-width: 280px;
            box-shadow: 0 0 30px rgba(0, 245, 255, 0.3);
            z-index: 1000;
        }}
        .info-panel h1 {{
            font-size: 1.4rem;
            color: var(--neon-cyan);
            margin-bottom: 15px;
            text-shadow: 0 0 10px var(--neon-cyan);
        }}
        .stats {{
            display: grid;
            grid-template-columns: repeat(2, 1fr);
            gap: 10px;
            margin-bottom: 15px;
        }}
        .stat-item {{
            background: rgba(0, 245, 255, 0.1);
            border: 1px solid rgba(0, 245, 255, 0.3);
            border-radius: 8px;
            padding: 10px;
            text-align: center;
        }}
        .stat-value {{
            font-size: 1.5rem;
            font-weight: bold;
            color: var(--neon-cyan);
        }}
        .stat-label {{
            font-size: 0.75rem;
            color: var(--text-secondary);
            margin-top: 5px;
        }}
        .colo-list {{
            background: rgba(255, 255, 255, 0.05);
            border-radius: 8px;
            padding: 10px;
            max-height: 150px;
            overflow-y: auto;
        }}
        .colo-tag {{
            display: inline-block;
            background: rgba(255, 45, 149, 0.2);
            border: 1px solid var(--neon-magenta);
            color: var(--neon-magenta);
            padding: 4px 8px;
            border-radius: 4px;
            font-size: 0.75rem;
            margin: 2px;
        }}
        .maplibregl-popup-content {{
            background: rgba(10, 10, 15, 0.95) !important;
            border: 1px solid var(--neon-cyan) !important;
            border-radius: 8px !important;
            padding: 15px !important;
            color: var(--text-primary) !important;
            box-shadow: 0 0 20px rgba(0, 245, 255, 0.3) !important;
        }}
        .maplibregl-popup-close-button {{
            color: var(--neon-cyan) !important;
            font-size: 20px !important;
        }}
        .popup-title {{
            color: var(--neon-cyan);
            font-size: 1.1rem;
            font-weight: bold;
            margin-bottom: 10px;
        }}
        .popup-row {{
            display: flex;
            justify-content: space-between;
            padding: 5px 0;
            border-bottom: 1px solid rgba(255, 255, 255, 0.1);
        }}
        .popup-label {{
            color: var(--text-secondary);
        }}
        .popup-value {{
            color: var(--neon-green);
            font-family: monospace;
        }}
        .footer {{
            position: absolute;
            bottom: 20px;
            left: 20px;
            background: rgba(10, 10, 15, 0.9);
            border: 1px solid rgba(255, 255, 255, 0.2);
            border-radius: 8px;
            padding: 10px 15px;
            font-size: 0.8rem;
            color: var(--text-secondary);
            z-index: 1000;
        }}
        .footer a {{
            color: var(--neon-cyan);
            text-decoration: none;
        }}
    </style>
</head>
<body>
    <div id="map"></div>
    <div class="info-panel">
        <h1>CFspider IP Map</h1>
        <div class="stats">
            <div class="stat-item">
                <div class="stat-value">{total_requests}</div>
                <div class="stat-label">Total Requests</div>
            </div>
            <div class="stat-item">
                <div class="stat-value">{len(unique_colos)}</div>
                <div class="stat-label">Unique Nodes</div>
            </div>
        </div>
        <div class="colo-list">
            {"".join([f'<span class="colo-tag">{colo}</span>' for colo in unique_colos]) if unique_colos else '<span style="color: #888;">No data</span>'}
        </div>
    </div>
    <div class="footer">
        Generated by <a href="https://github.com/violettoolssite/CFspider" target="_blank">CFspider</a> |
        {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
    </div>
    <script>
        const geojsonData = {json.dumps(geojson)};
        const map = new maplibregl.Map({{
            container: 'map',
            style: {{
                version: 8,
                sources: {{
                    'carto-dark': {{
                        type: 'raster',
                        tiles: [
                            'https://a.basemaps.cartocdn.com/dark_all/{{z}}/{{x}}/{{y}}@2x.png',
                            'https://b.basemaps.cartocdn.com/dark_all/{{z}}/{{x}}/{{y}}@2x.png',
                            'https://c.basemaps.cartocdn.com/dark_all/{{z}}/{{x}}/{{y}}@2x.png'
                        ],
                        tileSize: 256
                    }}
                }},
                layers: [{{
                    id: 'carto-dark-layer',
                    type: 'raster',
                    source: 'carto-dark',
                    minzoom: 0,
                    maxzoom: 19
                }}]
            }},
            center: [0, 20],
            zoom: 1.5
        }});
        map.addControl(new maplibregl.NavigationControl());
        map.on('load', function() {{
            // 添加数据源
            map.addSource('proxies', {{
                type: 'geojson',
                data: geojsonData
            }});
            // 添加圆点图层
            map.addLayer({{
                id: 'proxy-points',
                type: 'circle',
                source: 'proxies',
                paint: {{
                    'circle-radius': 10,
                    'circle-color': '#00f5ff',
                    'circle-opacity': 0.8,
                    'circle-stroke-width': 2,
                    'circle-stroke-color': '#ff2d95'
                }}
            }});
            // 添加发光效果
            map.addLayer({{
                id: 'proxy-points-glow',
                type: 'circle',
                source: 'proxies',
                paint: {{
                    'circle-radius': 20,
                    'circle-color': '#00f5ff',
                    'circle-opacity': 0.2,
                    'circle-blur': 1
                }}
            }}, 'proxy-points');
            // 点击事件
            map.on('click', 'proxy-points', function(e) {{
                const props = e.features[0].properties;
                const coordinates = e.features[0].geometry.coordinates.slice();
                const popupContent = `
                    <div class="popup-title">${{props.city}}, ${{props.country}}</div>
                    <div class="popup-row">
                        <span class="popup-label">Node:</span>
                        <span class="popup-value">${{props.cf_colo}}</span>
                    </div>
                    <div class="popup-row">
                        <span class="popup-label">IP:</span>
                        <span class="popup-value">${{props.ip}}</span>
                    </div>
                    <div class="popup-row">
                        <span class="popup-label">Status:</span>
                        <span class="popup-value">${{props.status_code}}</span>
                    </div>
                    <div class="popup-row">
                        <span class="popup-label">Time:</span>
                        <span class="popup-value">${{props.response_time ? props.response_time.toFixed(2) + 'ms' : 'N/A'}}</span>
                    </div>
                    <div class="popup-row" style="border: none;">
                        <span class="popup-label">URL:</span>
                        <span class="popup-value" style="font-size: 0.7rem; word-break: break-all;">${{props.url.substring(0, 40)}}...</span>
                    </div>
                `;
                new maplibregl.Popup()
                    .setLngLat(coordinates)
                    .setHTML(popupContent)
                    .addTo(map);
            }});
            // 鼠标样式
            map.on('mouseenter', 'proxy-points', function() {{
                map.getCanvas().style.cursor = 'pointer';
            }});
            map.on('mouseleave', 'proxy-points', function() {{
                map.getCanvas().style.cursor = '';
            }});
            // 如果有数据,自动缩放到数据范围
            if (geojsonData.features.length > 0) {{
                const bounds = new maplibregl.LngLatBounds();
                geojsonData.features.forEach(feature => {{
                    bounds.extend(feature.geometry.coordinates);
                }});
                map.fitBounds(bounds, {{ padding: 50 }});
            }}
        }});
    </script>
</body>
</html>'''
    # Write the self-contained page to disk
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_content)
    return output_file
def clear_records():
    """Clear all records held by the module-level global collector."""
    _global_collector.clear()

682
cfspider/mirror.py Normal file
View File

@@ -0,0 +1,682 @@
"""
CFspider 网页镜像模块
将在线网页完整保存到本地,包括:
- HTML 页面(自动重写资源链接为相对路径)
- CSS 样式表(包括 @import 和 url() 引用)
- JavaScript 脚本
- 图片资源PNG, JPG, WebP, SVG 等)
- 字体文件WOFF, WOFF2, TTF 等)
- 其他资源favicon, 视频, 音频等)
特性:
- 使用浏览器渲染:确保获取 JavaScript 动态生成的内容
- 并发下载:多线程下载资源,速度更快
- 隐身模式:自动使用完整浏览器请求头,避免被 CDN 拦截
- 自动打开预览:下载完成后自动在浏览器中预览
使用方式:
>>> import cfspider
>>>
>>> # 基本用法
>>> result = cfspider.mirror("https://example.com")
>>> print(result.index_file) # ./mirror/index.html
>>>
>>> # 指定保存目录
>>> result = cfspider.mirror(
... "https://example.com",
... save_dir="./my_backup",
... open_browser=False
... )
>>>
>>> # 使用 VLESS 代理
>>> result = cfspider.mirror(
... "https://example.com",
... cf_proxies="vless://uuid@host:443?path=/"
... )
目录结构:
save_dir/
├── index.html # 主页面
└── assets/
├── css/ # CSS 文件
├── js/ # JavaScript 文件
├── images/ # 图片文件
├── fonts/ # 字体文件
└── other/ # 其他资源
"""
import os
import re
import hashlib
import webbrowser
from pathlib import Path
from urllib.parse import urljoin, urlparse, unquote
from dataclasses import dataclass, field
from typing import Optional, Dict, Set, List
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
from bs4 import BeautifulSoup
BS4_AVAILABLE = True
except ImportError:
BS4_AVAILABLE = False
@dataclass
class MirrorResult:
    """
    Result of a web-page mirroring run.

    Attributes:
        index_file: Full path of the main HTML file,
            e.g. "/home/user/mirror/index.html".
        assets_dir: Full path of the assets directory,
            e.g. "/home/user/mirror/assets".
        total_files: Number of files written (index.html included).
        total_size: Total size of all files in bytes
            (divide by 1024 for KB).
        failed_urls: URLs that failed to download, formatted as
            "url: error_message".
        success: True when the main page was saved successfully
            (individual resource failures do not flip this to False).

    Example:
        >>> result = cfspider.mirror("https://example.com")
        >>> if result.success:
        ...     print(f"saved to: {result.index_file}")
        ...     print(f"files: {result.total_files}")
        ...     print(f"size: {result.total_size / 1024:.2f} KB")
        ... else:
        ...     print(f"failed: {result.failed_urls}")
    """
    index_file: str = ""                                  # main HTML file path
    assets_dir: str = ""                                  # assets directory path
    total_files: int = 0                                  # number of files written
    total_size: int = 0                                   # total size in bytes
    failed_urls: List[str] = field(default_factory=list)  # URLs that failed to download
    success: bool = True                                  # overall success flag
class WebMirror:
    """
    Web page mirroring engine.

    Downloads a page plus all of its same-origin resources and rewrites
    resource links to local relative paths.

    Workflow:
        1. Render the page with a Playwright browser (captures JS-generated DOM)
        2. Parse the HTML and extract every resource URL
        3. Download the resources concurrently
        4. Process extra resources referenced from CSS (@import, url())
        5. Rewrite all resource links to relative paths
        6. Save the final HTML file

    Attributes:
        ASSET_TYPES (dict): category -> file-extension map used to sort
            downloaded files into sub-directories under ``assets/``.

    Example:
        >>> mirrorer = WebMirror(max_workers=20)
        >>> result = mirrorer.mirror("https://example.com", save_dir="./backup")

    Note:
        Prefer the module-level ``cfspider.mirror()`` helper, which creates
        a WebMirror instance automatically.
    """
    # Category -> extensions; decides which assets/<category>/ directory a file goes to
    ASSET_TYPES = {
        'css': ['css'],
        'js': ['js', 'mjs'],
        'images': ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg', 'ico', 'bmp', 'avif'],
        'fonts': ['woff', 'woff2', 'ttf', 'otf', 'eot'],
        'media': ['mp4', 'webm', 'mp3', 'ogg', 'wav'],
        'other': []
    }

    def __init__(self, cf_proxies=None, vless_uuid=None, timeout=30, max_workers=10):
        """
        Initialize the mirrorer.

        Args:
            cf_proxies (str, optional): Proxy address; supported forms:
                - VLESS link: "vless://uuid@host:port?path=/..."
                - HTTP proxy: "http://ip:port"
                - SOCKS5 proxy: "socks5://ip:port"
                - None: direct requests (no proxy)
                Note: browser rendering can use a VLESS proxy, while resource
                downloads fall back to a direct connection for VLESS links.
            vless_uuid (str, optional): VLESS UUID; only needed when
                cf_proxies is a bare domain rather than a full link.
            timeout (int): Request timeout in seconds (default 30); applies
                to both browser rendering and resource downloads.
            max_workers (int): Concurrent download threads (default 10).
                Larger values speed downloads up but may be rate-limited
                by the target site.

        Example:
            >>> # no proxy
            >>> mirrorer = WebMirror()
            >>>
            >>> # VLESS proxy
            >>> mirrorer = WebMirror(cf_proxies="vless://uuid@host:443?path=/")
            >>>
            >>> # high concurrency
            >>> mirrorer = WebMirror(max_workers=20, timeout=60)
        """
        self.cf_proxies = cf_proxies
        self.vless_uuid = vless_uuid
        self.timeout = timeout
        self.max_workers = max_workers
        self._browser = None
        self._downloaded: Dict[str, str] = {}  # URL -> local path mapping
        self._failed: Set[str] = set()         # URLs whose download failed

    def _get_browser(self):
        """Lazily create and return the Playwright browser wrapper."""
        if self._browser is None:
            from .browser import Browser
            self._browser = Browser(
                cf_proxies=self.cf_proxies,
                headless=True,
                timeout=self.timeout,
                vless_uuid=self.vless_uuid
            )
        return self._browser

    def _close_browser(self):
        """Close the browser instance, swallowing shutdown errors."""
        if self._browser:
            try:
                self._browser.close()
            except:
                pass
            self._browser = None

    def _get_asset_type(self, url: str) -> str:
        """Classify a resource URL into an ASSET_TYPES category by its extension."""
        parsed = urlparse(url)
        path = parsed.path.lower()
        ext = path.rsplit('.', 1)[-1] if '.' in path else ''
        for asset_type, extensions in self.ASSET_TYPES.items():
            if ext in extensions:
                return asset_type
        return 'other'

    def _generate_local_path(self, url: str, base_url: str, assets_dir: Path) -> str:
        """Map a resource URL to a safe local file path under assets_dir."""
        parsed = urlparse(url)
        path = unquote(parsed.path)
        # No path or root path: fall back to a hash-based file name
        if not path or path == '/':
            ext = '.html'
            filename = hashlib.md5(url.encode()).hexdigest()[:12] + ext
        else:
            # Use the last path segment as the file name
            filename = path.rsplit('/', 1)[-1]
            if not filename or '.' not in filename:
                ext = self._guess_extension(url)
                filename = hashlib.md5(url.encode()).hexdigest()[:12] + ext
        # Pick the category sub-directory
        asset_type = self._get_asset_type(url)
        # Sanitize the file name for the local filesystem
        safe_filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
        if len(safe_filename) > 100:
            ext = safe_filename.rsplit('.', 1)[-1] if '.' in safe_filename else ''
            safe_filename = hashlib.md5(filename.encode()).hexdigest()[:12]
            if ext:
                safe_filename += '.' + ext
        return str(assets_dir / asset_type / safe_filename)

    def _guess_extension(self, url: str) -> str:
        """Guess a file extension from substrings of the URL (heuristic)."""
        url_lower = url.lower()
        if 'css' in url_lower:
            return '.css'
        elif 'js' in url_lower or 'javascript' in url_lower:
            return '.js'
        elif any(ext in url_lower for ext in ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg']):
            for ext in ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg']:
                if ext in url_lower:
                    return '.' + ext
        return '.bin'

    def _extract_urls_from_html(self, html: str, base_url: str) -> Set[str]:
        """Extract same-origin resource URLs from rendered HTML."""
        if not BS4_AVAILABLE:
            raise ImportError("需要安装 beautifulsoup4: pip install beautifulsoup4")
        urls = set()
        soup = BeautifulSoup(html, 'html.parser')
        base_domain = urlparse(base_url).netloc
        # Stylesheets
        for link in soup.find_all('link', rel='stylesheet'):
            href = link.get('href')
            if href:
                urls.add(urljoin(base_url, href))
        # Other <link> tags (favicon, preload, ...)
        for link in soup.find_all('link'):
            href = link.get('href')
            if href and link.get('rel') not in [['stylesheet']]:
                full_url = urljoin(base_url, href)
                urls.add(full_url)
        # JavaScript
        for script in soup.find_all('script', src=True):
            src = script.get('src')
            if src:
                urls.add(urljoin(base_url, src))
        # Images
        for img in soup.find_all('img', src=True):
            src = img.get('src')
            if src and not src.startswith('data:'):
                urls.add(urljoin(base_url, src))
            # srcset variants
            srcset = img.get('srcset')
            if srcset:
                for item in srcset.split(','):
                    url = item.strip().split()[0]
                    if url and not url.startswith('data:'):
                        urls.add(urljoin(base_url, url))
        # Background images and other inline style attributes
        for elem in soup.find_all(style=True):
            style = elem.get('style')
            css_urls = self._extract_urls_from_css(style, base_url)
            urls.update(css_urls)
        # <style> tags
        for style_tag in soup.find_all('style'):
            if style_tag.string:
                css_urls = self._extract_urls_from_css(style_tag.string, base_url)
                urls.update(css_urls)
        # video/audio elements
        for media in soup.find_all(['video', 'audio']):
            src = media.get('src')
            if src:
                urls.add(urljoin(base_url, src))
            poster = media.get('poster')
            if poster:
                urls.add(urljoin(base_url, poster))
            for source in media.find_all('source'):
                src = source.get('src')
                if src:
                    urls.add(urljoin(base_url, src))
        # Keep only same-origin (or relative) resources
        filtered_urls = set()
        for url in urls:
            parsed = urlparse(url)
            if parsed.netloc == base_domain or not parsed.netloc:
                filtered_urls.add(url)
        return filtered_urls

    def _extract_urls_from_css(self, css_content: str, base_url: str) -> Set[str]:
        """Extract url() references from CSS text (data: URIs excluded)."""
        urls = set()
        pattern = r'url\(["\']?([^"\')\s]+)["\']?\)'
        matches = re.findall(pattern, css_content)
        for match in matches:
            if not match.startswith('data:'):
                urls.add(urljoin(base_url, match))
        return urls

    def _download_resource(self, url: str, local_path: str, referer: str = None) -> tuple:
        """Download one resource (stealth mode to avoid anti-bot blocks).

        Returns a tuple (url, local_path_or_None, size, error_or_None).
        """
        try:
            from . import get
            # Stealth mode adds a full set of browser request headers;
            # a caller-supplied Referer overrides the default.
            extra_headers = {}
            if referer:
                extra_headers['Referer'] = referer
            # VLESS links only support browser mode, not plain HTTP requests —
            # for them, download resources over a direct connection instead.
            proxies_for_download = self.cf_proxies
            if self.cf_proxies and str(self.cf_proxies).lower().startswith('vless://'):
                proxies_for_download = None  # VLESS link: bypass proxy for plain HTTP download
            response = get(
                url,
                cf_proxies=proxies_for_download,
                timeout=self.timeout,
                headers=extra_headers,
                stealth=True,  # enable stealth mode (full browser headers)
                stealth_browser='chrome'
            )
            if response.status_code == 200:
                content = response.content
                # Detect error pages (HTML returned where an asset was expected)
                content_type = response.headers.get('content-type', '').lower()
                expected_type = self._get_asset_type(url)
                # Expected JS/CSS but got HTML: probably an error page
                if expected_type in ['js', 'css'] and 'text/html' in content_type:
                    # Check for an nginx default page or similar error page
                    content_str = content.decode('utf-8', errors='ignore')[:500]
                    if 'nginx' in content_str.lower() or '<!doctype html>' in content_str.lower():
                        return (url, None, 0, f"下载到错误页面(可能是 CDN 保护)")
                # Ensure the target directory exists
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                # Write the file
                with open(local_path, 'wb') as f:
                    f.write(content)
                return (url, local_path, len(content), None)
            else:
                return (url, None, 0, f"HTTP {response.status_code}")
        except Exception as e:
            return (url, None, 0, str(e))

    def _rewrite_html(self, html: str, base_url: str, url_mapping: Dict[str, str], save_dir: Path) -> str:
        """Rewrite resource links in the HTML to local relative paths."""
        if not BS4_AVAILABLE:
            return html
        soup = BeautifulSoup(html, 'html.parser')

        def get_relative_path(local_path: str) -> str:
            """Path relative to index.html (forward slashes)."""
            try:
                rel_path = os.path.relpath(local_path, save_dir)
                return rel_path.replace('\\', '/')
            except:
                return local_path

        def replace_url(url: str) -> Optional[str]:
            """Return the local replacement for url, or None if it wasn't downloaded."""
            full_url = urljoin(base_url, url)
            if full_url in url_mapping:
                return get_relative_path(url_mapping[full_url])
            return None

        # link href
        for link in soup.find_all('link', href=True):
            new_path = replace_url(link['href'])
            if new_path:
                link['href'] = new_path
        # script src
        for script in soup.find_all('script', src=True):
            new_path = replace_url(script['src'])
            if new_path:
                script['src'] = new_path
        # img src
        for img in soup.find_all('img', src=True):
            new_path = replace_url(img['src'])
            if new_path:
                img['src'] = new_path
        # video/audio src and poster
        for media in soup.find_all(['video', 'audio']):
            if media.get('src'):
                new_path = replace_url(media['src'])
                if new_path:
                    media['src'] = new_path
            if media.get('poster'):
                new_path = replace_url(media['poster'])
                if new_path:
                    media['poster'] = new_path
        # url() inside <style> tags
        for style_tag in soup.find_all('style'):
            if style_tag.string:
                new_css = self._rewrite_css(style_tag.string, base_url, url_mapping, save_dir)
                style_tag.string = new_css
        # url() inside style attributes
        for elem in soup.find_all(style=True):
            style = elem.get('style')
            new_style = self._rewrite_css(style, base_url, url_mapping, save_dir)
            elem['style'] = new_style
        return str(soup)

    def _rewrite_css(self, css_content: str, base_url: str, url_mapping: Dict[str, str], save_dir: Path) -> str:
        """Rewrite url() references in CSS text to local relative paths."""
        def replace_url(match):
            url = match.group(1).strip('"\'')
            full_url = urljoin(base_url, url)
            if full_url in url_mapping:
                local_path = url_mapping[full_url]
                try:
                    rel_path = os.path.relpath(local_path, save_dir)
                    rel_path = rel_path.replace('\\', '/')
                    return f'url("{rel_path}")'
                except:
                    pass
            return match.group(0)
        pattern = r'url\(["\']?([^"\')\s]+)["\']?\)'
        return re.sub(pattern, replace_url, css_content)

    def _process_css_file(self, css_path: str, css_url: str, base_url: str,
                          assets_dir: Path, url_mapping: Dict[str, str]) -> List[str]:
        """Collect not-yet-downloaded resource URLs referenced by a saved CSS file.

        NOTE(review): base_url and assets_dir are currently unused here —
        confirm before removing them from the signature.
        """
        new_urls = []
        try:
            with open(css_path, 'r', encoding='utf-8', errors='ignore') as f:
                css_content = f.read()
            # URLs inside CSS resolve relative to the CSS file's own directory
            css_base = css_url.rsplit('/', 1)[0] + '/'
            urls_in_css = self._extract_urls_from_css(css_content, css_base)
            for url in urls_in_css:
                if url not in url_mapping and url not in self._failed:
                    new_urls.append(url)
        except:
            pass
        return new_urls

    def mirror(self, url: str, save_dir: str = "./mirror", open_browser: bool = True) -> MirrorResult:
        """
        Mirror a web page to the local filesystem.

        Args:
            url: Target page URL.
            save_dir: Destination directory.
            open_browser: Open a browser preview when done.

        Returns:
            MirrorResult: outcome of the run.
        """
        if not BS4_AVAILABLE:
            raise ImportError("需要安装 beautifulsoup4: pip install beautifulsoup4")
        result = MirrorResult()
        save_path = Path(save_dir).resolve()
        assets_path = save_path / "assets"
        try:
            # Create the output directory tree
            save_path.mkdir(parents=True, exist_ok=True)
            assets_path.mkdir(exist_ok=True)
            for asset_type in self.ASSET_TYPES.keys():
                (assets_path / asset_type).mkdir(exist_ok=True)
            # Render the page with the browser (captures JS-generated content)
            print(f"[Mirror] 正在渲染页面: {url}")
            browser = self._get_browser()
            html = browser.html(url)
            # Extract resource URLs
            print("[Mirror] 正在提取资源链接...")
            resource_urls = self._extract_urls_from_html(html, url)
            print(f"[Mirror] 发现 {len(resource_urls)} 个资源")
            # Download the resources concurrently
            url_mapping: Dict[str, str] = {}
            total_size = 0
            if resource_urls:
                print("[Mirror] 正在下载资源...")
                with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    futures = {}
                    for res_url in resource_urls:
                        local_path = self._generate_local_path(res_url, url, assets_path)
                        futures[executor.submit(self._download_resource, res_url, local_path, url)] = res_url
                    completed = 0
                    for future in as_completed(futures):
                        res_url, local_path, size, error = future.result()
                        completed += 1
                        if local_path:
                            url_mapping[res_url] = local_path
                            total_size += size
                            self._downloaded[res_url] = local_path
                        else:
                            self._failed.add(res_url)
                            result.failed_urls.append(f"{res_url}: {error}")
                        # Progress output every 10 downloads and at the end
                        if completed % 10 == 0 or completed == len(futures):
                            print(f"[Mirror] 下载进度: {completed}/{len(futures)}")
            # Process extra resources referenced from downloaded CSS files
            css_files = [(path, u) for u, path in url_mapping.items()
                         if path.endswith('.css')]
            additional_urls = set()
            for css_path, css_url in css_files:
                new_urls = self._process_css_file(css_path, css_url, url, assets_path, url_mapping)
                additional_urls.update(new_urls)
            # Download the extra resources found inside CSS
            if additional_urls:
                print(f"[Mirror] 发现 CSS 中的 {len(additional_urls)} 个额外资源")
                with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    futures = {}
                    for res_url in additional_urls:
                        local_path = self._generate_local_path(res_url, url, assets_path)
                        futures[executor.submit(self._download_resource, res_url, local_path, url)] = res_url
                    for future in as_completed(futures):
                        res_url, local_path, size, error = future.result()
                        if local_path:
                            url_mapping[res_url] = local_path
                            total_size += size
            # Rewrite links in the HTML
            print("[Mirror] 正在重写资源链接...")
            rewritten_html = self._rewrite_html(html, url, url_mapping, save_path)
            # Rewrite url() references inside the saved CSS files
            for css_path, css_url in css_files:
                try:
                    with open(css_path, 'r', encoding='utf-8', errors='ignore') as f:
                        css_content = f.read()
                    css_base = css_url.rsplit('/', 1)[0] + '/'
                    new_css = self._rewrite_css(css_content, css_base, url_mapping, save_path)
                    with open(css_path, 'w', encoding='utf-8') as f:
                        f.write(new_css)
                except:
                    pass
            # Save the HTML
            index_file = save_path / "index.html"
            with open(index_file, 'w', encoding='utf-8') as f:
                f.write(rewritten_html)
            # Populate the result
            result.index_file = str(index_file)
            result.assets_dir = str(assets_path)
            result.total_files = len(url_mapping) + 1  # +1 for index.html
            result.total_size = total_size + len(rewritten_html.encode('utf-8'))
            result.success = True
            print(f"[Mirror] 镜像完成!")
            print(f"[Mirror] 保存位置: {index_file}")
            print(f"[Mirror] 总文件数: {result.total_files}")
            print(f"[Mirror] 总大小: {result.total_size / 1024:.2f} KB")
            if result.failed_urls:
                print(f"[Mirror] 失败资源: {len(result.failed_urls)}")
            # Open a browser preview
            if open_browser:
                print("[Mirror] 正在打开浏览器预览...")
                webbrowser.open(f"file://{index_file}")
        except Exception as e:
            result.success = False
            result.failed_urls.append(str(e))
            print(f"[Mirror] 错误: {e}")
        finally:
            self._close_browser()
        return result
def mirror(url: str, save_dir: str = "./mirror", open_browser: bool = True,
           cf_proxies: str = None, vless_uuid: str = None,
           timeout: int = 30, max_workers: int = 10) -> MirrorResult:
    """
    Mirror a web page to the local filesystem.

    Downloads the page and every same-origin resource it references
    (CSS, JS, images, fonts, ...), rewrites resource links to local
    relative paths and optionally opens the saved copy in the default
    browser for preview.

    Args:
        url: Target page URL.
        save_dir: Destination directory, default "./mirror".
        open_browser: Open a browser preview when done, default True.
        cf_proxies: Proxy address — VLESS link, HTTP or SOCKS5.
        vless_uuid: VLESS UUID (only needed for the bare-domain proxy form).
        timeout: Request timeout in seconds, default 30.
        max_workers: Concurrent download threads, default 10.

    Returns:
        MirrorResult: saved paths, file counts, total size and failures.

    Examples:
        >>> import cfspider
        >>>
        >>> # basic usage
        >>> result = cfspider.mirror("https://example.com")
        >>> print(result.index_file)
        >>>
        >>> # custom directory, no preview
        >>> result = cfspider.mirror(
        ...     "https://example.com",
        ...     save_dir="./my_mirror",
        ...     open_browser=False
        ... )
        >>>
        >>> # via a VLESS proxy
        >>> result = cfspider.mirror(
        ...     "https://example.com",
        ...     cf_proxies="vless://uuid@host:443?path=/"
        ... )
    """
    # Delegate to a one-shot WebMirror configured with the given options.
    return WebMirror(
        cf_proxies=cf_proxies,
        vless_uuid=vless_uuid,
        timeout=timeout,
        max_workers=max_workers,
    ).mirror(url, save_dir, open_browser)

143
cfspider/session.py Normal file
View File

@@ -0,0 +1,143 @@
"""
CFspider Session 模块
提供会话管理功能,在多个请求之间保持代理配置、请求头和 Cookie。
"""
from .api import request
class Session:
    """
    CFspider session.

    Keeps one Workers proxy address plus session-level headers and cookies
    across several requests, mirroring the requests.Session interface.
    Suited to flows that need a login state or a series of related calls.

    Attributes:
        cf_proxies (str): Workers proxy address (trailing slash stripped).
        headers (dict): Default headers merged into every request.
        cookies (dict): Cookies merged into every request.

    Example:
        >>> import cfspider
        >>> with cfspider.Session(cf_proxies="https://your-workers.dev") as session:
        ...     session.headers['Authorization'] = 'Bearer token'
        ...     r1 = session.get("https://api.example.com/user")
        ...     r2 = session.post("https://api.example.com/data", json={"key": "value"})

    Note:
        For stealth-mode session consistency (auto Referer, random delays,
        browser fingerprints) use cfspider.StealthSession instead.
    """

    def __init__(self, cf_proxies=None):
        """
        Create a session bound to a Workers proxy.

        Args:
            cf_proxies (str): Workers proxy address (required),
                e.g. "https://your-workers.dev".

        Raises:
            ValueError: if cf_proxies is missing or empty.
        """
        if not cf_proxies:
            raise ValueError(
                "cf_proxies 是必填参数。\n"
                "请提供 CFspider Workers 地址,例如:\n"
                " session = cfspider.Session(cf_proxies='https://your-workers.dev')\n\n"
                "如果不需要代理,可以直接使用 cfspider.get() 等函数。\n"
                "如果需要隐身模式会话,请使用 cfspider.StealthSession。"
            )
        self.cf_proxies = cf_proxies.rstrip("/")
        self.headers = {}
        self.cookies = {}

    def request(self, method, url, **kwargs):
        """
        Send an HTTP request through the session's Workers proxy.

        Session-level headers/cookies are merged in first, so per-request
        values passed via **kwargs take precedence over them.

        Args:
            method (str): HTTP method (GET, POST, PUT, DELETE, ...).
            url (str): Target URL.
            **kwargs: Same extra options as cfspider.request().

        Returns:
            CFSpiderResponse: the response object.
        """
        merged_headers = {**self.headers, **kwargs.pop("headers", {})}
        merged_cookies = {**self.cookies, **kwargs.pop("cookies", {})}
        return request(
            method,
            url,
            cf_proxies=self.cf_proxies,
            headers=merged_headers,
            cookies=merged_cookies,
            **kwargs,
        )

    def get(self, url, **kwargs):
        """Issue a GET request."""
        return self.request("GET", url, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST request."""
        return self.request("POST", url, **kwargs)

    def put(self, url, **kwargs):
        """Issue a PUT request."""
        return self.request("PUT", url, **kwargs)

    def delete(self, url, **kwargs):
        """Issue a DELETE request."""
        return self.request("DELETE", url, **kwargs)

    def head(self, url, **kwargs):
        """Issue a HEAD request."""
        return self.request("HEAD", url, **kwargs)

    def options(self, url, **kwargs):
        """Issue an OPTIONS request."""
        return self.request("OPTIONS", url, **kwargs)

    def patch(self, url, **kwargs):
        """Issue a PATCH request."""
        return self.request("PATCH", url, **kwargs)

    def close(self):
        """
        Close the session.

        Each request is independent in the current implementation, so there
        is nothing to release; the method exists for interface compatibility
        with requests.Session.
        """
        pass

    def __enter__(self):
        """Enter a ``with`` block, yielding the session itself."""
        return self

    def __exit__(self, *args):
        """Leave the ``with`` block and close the session."""
        self.close()

530
cfspider/stealth.py Normal file
View File

@@ -0,0 +1,530 @@
"""
CFspider 隐身模式模块
提供完整的反爬虫规避能力,解决以下常见问题:
1. 请求头不完整或不真实
- 问题:缺少 User-Agent, Accept-Language, Sec-Fetch-* 等头
- 解决:自动添加 15+ 个真实浏览器请求头
2. 缺乏会话一致性
- 问题:频繁更换 IP、User-Agent不处理 Cookie
- 解决StealthSession 固定 User-Agent自动管理 Cookie
3. 行为模式单一
- 问题:只访问特定 API没有随机停留等行为
- 解决random_delay 随机延迟auto_referer 自动添加来源
使用方式:
方式一:单次请求启用隐身模式
>>> response = cfspider.get(url, stealth=True, stealth_browser='chrome')
方式二:使用 StealthSession 保持会话一致性
>>> with cfspider.StealthSession(browser='chrome', delay=(1, 3)) as session:
... response1 = session.get(url1) # 自动添加请求头
... response2 = session.get(url2) # 自动添加 Referer = url1
支持的浏览器:
- chrome: Chrome 131推荐15 个请求头)
- firefox: Firefox 13312 个请求头,含隐私保护头)
- safari: Safari 185 个请求头macOS 风格)
- edge: Edge 13114 个请求头)
- chrome_mobile: Chrome Mobile10 个请求头Android
"""
import random
import time
from typing import Optional, Dict, List, Tuple, Any
from urllib.parse import urlparse
# Chrome 131 full request-header template
CHROME_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Sec-CH-UA': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
    'Sec-CH-UA-Mobile': '?0',
    'Sec-CH-UA-Platform': '"Windows"',
    'Upgrade-Insecure-Requests': '1',
    'Cache-Control': 'max-age=0',
    'Connection': 'keep-alive',
    'DNT': '1',
}
# Firefox 133 full request-header template
FIREFOX_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Upgrade-Insecure-Requests': '1',
    'Connection': 'keep-alive',
    'DNT': '1',
    'Sec-GPC': '1',
}
# Safari 18 full request-header template
SAFARI_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.0 Safari/605.1.15',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh-Hans;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
}
# Edge 131 full request-header template
EDGE_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
    'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Sec-CH-UA': '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
    'Sec-CH-UA-Mobile': '?0',
    'Sec-CH-UA-Platform': '"Windows"',
    'Upgrade-Insecure-Requests': '1',
    'Cache-Control': 'max-age=0',
    'Connection': 'keep-alive',
}
# Mobile Chrome (Android) request headers
CHROME_MOBILE_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Linux; Android 14; Pixel 8 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Mobile Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'Sec-CH-UA': '"Google Chrome";v="131", "Chromium";v="131"',
    'Sec-CH-UA-Mobile': '?1',
    'Sec-CH-UA-Platform': '"Android"',
    'Upgrade-Insecure-Requests': '1',
}
# Registry of all browser header profiles, keyed by profile name
BROWSER_PROFILES = {
    'chrome': CHROME_HEADERS,
    'firefox': FIREFOX_HEADERS,
    'safari': SAFARI_HEADERS,
    'edge': EDGE_HEADERS,
    'chrome_mobile': CHROME_MOBILE_HEADERS,
}
# Chrome is the default profile
DEFAULT_BROWSER = 'chrome'
def get_stealth_headers(browser: str = 'chrome', custom_headers: Dict = None) -> Dict[str, str]:
    """
    Build a stealth-mode request-header set.

    Args:
        browser: Profile name (chrome/firefox/safari/edge/chrome_mobile);
            unknown names fall back to the Chrome profile.
        custom_headers: Extra headers merged on top, overriding defaults.

    Returns:
        A fresh dict of complete browser request headers.
    """
    profile = BROWSER_PROFILES.get(browser, CHROME_HEADERS)
    merged = dict(profile)
    if custom_headers:
        merged.update(custom_headers)
    return merged
def get_random_browser_headers() -> Dict[str, str]:
    """Return the header set of a randomly chosen browser profile."""
    picked = random.choice(list(BROWSER_PROFILES))
    return get_stealth_headers(picked)
def random_delay(min_sec: float = 0.5, max_sec: float = 2.0) -> float:
    """
    Sleep for a random duration to imitate human pacing.

    Args:
        min_sec: Lower bound of the pause, in seconds.
        max_sec: Upper bound of the pause, in seconds.

    Returns:
        The number of seconds actually slept.
    """
    pause = random.uniform(min_sec, max_sec)
    time.sleep(pause)
    return pause
def get_referer(current_url: str, previous_url: str = None) -> Optional[str]:
    """
    Derive a Referer header value.

    Args:
        current_url: URL of the request being made.
        previous_url: The previously visited URL, if any.

    Returns:
        The previous URL when one is given; otherwise the front page of
        the current URL's site (scheme + host + "/").
    """
    if previous_url:
        return previous_url
    # No navigation history: pretend the visit came from the site's home page.
    origin = urlparse(current_url)
    return f"{origin.scheme}://{origin.netloc}/"
def update_sec_fetch_headers(headers: Dict, site_type: str = 'none') -> Dict:
    """
    Return a copy of *headers* with Sec-Fetch-* values set for *site_type*.

    Args:
        headers: Original request headers (left unmodified).
        site_type: Relationship of the navigation to the target site
            (none / same-origin / same-site / cross-site).

    Returns:
        A new header dict with Sec-Fetch-Site/Mode/Dest filled in.
    """
    updated = dict(headers)
    updated['Sec-Fetch-Site'] = site_type
    # Every navigation type (direct visit, same-site, cross-site) currently
    # shares the same Mode/Dest pair: a top-level document navigation.
    updated['Sec-Fetch-Mode'] = 'navigate'
    updated['Sec-Fetch-Dest'] = 'document'
    return updated
class StealthSession:
    """
    Stealth session with full browser-consistency management.

    Addresses the main anti-bot detection problems a plain session leaves
    open:

    1. Fixed User-Agent: the whole session reuses one browser fingerprint.
    2. Automatic cookie handling: cookies from responses are stored and
       replayed on subsequent requests.
    3. Automatic Referer: navigation carries the previous URL as Referer.
    4. Random delays: optional human-like pauses before each request.
    5. Sec-Fetch-Site maintenance: same-origin vs cross-site is derived
       from the previous URL.

    Attributes:
        browser (str): Browser profile in use.
        cf_proxies (str): Proxy address (Cloudflare Workers endpoint) or None.
        cf_workers (bool): Whether ``cf_proxies`` is a CFspider Workers API.
        delay (tuple): (min, max) random delay range in seconds, or None.
        auto_referer (bool): Whether a Referer header is added automatically.
        last_url (str): URL of the previous request (None before the first).
        request_count (int): Number of requests sent in this session.

    Example:
        >>> import cfspider
        >>> with cfspider.StealthSession(browser='chrome', delay=(1, 3)) as s:
        ...     r1 = s.get("https://example.com")        # Sec-Fetch-Site: none
        ...     r2 = s.get("https://example.com/page2")  # Referer + same-origin

    Note:
        Unlike the plain ``Session`` (which only keeps proxy configuration
        and basic headers), ``StealthSession`` implements the complete
        stealth mode: browser fingerprint, cookie management, automatic
        Referer, random delays and Sec-Fetch-* updates.
    """

    def __init__(
        self,
        browser: str = 'chrome',
        cf_proxies: str = None,
        cf_workers: bool = True,
        delay: Tuple[float, float] = None,
        auto_referer: bool = True,
        **kwargs
    ):
        """
        Initialize the stealth session.

        Args:
            browser (str): Browser profile deciding the User-Agent and the
                header template: 'chrome' (recommended, fullest header set),
                'firefox', 'safari', 'edge' or 'chrome_mobile'.
            cf_proxies (str, optional): Proxy address. When omitted, the
                target URL is requested directly. Combine a Workers URL with
                ``cf_workers=True`` or a plain proxy with ``cf_workers=False``.
            cf_workers (bool): Treat ``cf_proxies`` as a CFspider Workers
                API (default True).
            delay (tuple, optional): ``(min, max)`` random sleep in seconds
                applied before every request except the first; used to avoid
                rate-based detection.
            auto_referer (bool): Automatically use the previous URL as
                Referer (default True).
            **kwargs: Reserved for future extensions.
        """
        self.browser = browser
        self.cf_proxies = cf_proxies
        self.cf_workers = cf_workers
        self.delay = delay
        self.auto_referer = auto_referer
        self.last_url = None
        self.request_count = 0
        self._extra_kwargs = kwargs
        # Fixed browser headers used as the base of every request.
        self._base_headers = get_stealth_headers(browser)
        # Session cookie jar (name -> value).
        self._cookies = {}

    def _prepare_headers(self, url: str, headers: Dict = None) -> Dict:
        """Build final headers for *url*: base + Referer/Sec-Fetch + overrides."""
        final_headers = self._base_headers.copy()
        if self.auto_referer and self.last_url:
            final_headers['Referer'] = self.last_url
            same_host = urlparse(url).netloc == urlparse(self.last_url).netloc
            final_headers = update_sec_fetch_headers(
                final_headers, 'same-origin' if same_host else 'cross-site'
            )
        if headers:
            final_headers.update(headers)
        return final_headers

    def _apply_delay(self):
        """Sleep a random time in ``self.delay`` (skipped before the first request)."""
        if self.delay and self.request_count > 0:
            random_delay(self.delay[0], self.delay[1])

    def _update_cookies(self, response):
        """Merge cookies from *response* into the session jar."""
        if hasattr(response, 'cookies'):
            for cookie in response.cookies:
                self._cookies[cookie.name] = cookie.value

    def _merge_cookies(self, user_cookies: Dict) -> Dict:
        """Combine caller-supplied cookies with the session jar.

        Returns a new dict so the caller's dict is never mutated (the
        previous implementation updated it in place). Session cookies take
        precedence, matching the original behaviour.
        """
        merged = dict(user_cookies or {})
        merged.update(self._cookies)
        return merged

    def _request(self, method: str, url: str, **kwargs) -> Any:
        """Shared pipeline for all HTTP verbs.

        Applies the optional delay, builds stealth headers and cookies,
        dispatches to the matching top-level API function, then records
        cookies, the last URL and the request counter.
        """
        from . import api
        self._apply_delay()
        headers = self._prepare_headers(url, kwargs.pop('headers', None))
        if method == 'post':
            # Hint the Content-Type for body-carrying requests. The previous
            # implementation defaulted to form-urlencoded even for json=
            # payloads, which was wrong.
            if 'json' in kwargs:
                headers.setdefault('Content-Type', 'application/json')
            elif 'data' in kwargs:
                headers.setdefault('Content-Type', 'application/x-www-form-urlencoded')
        cookies = self._merge_cookies(kwargs.pop('cookies', None))
        response = getattr(api, method)(
            url,
            cf_proxies=self.cf_proxies,
            cf_workers=self.cf_workers,
            headers=headers,
            cookies=cookies,
            **kwargs
        )
        self._update_cookies(response)
        self.last_url = url
        self.request_count += 1
        return response

    def get(self, url: str, **kwargs) -> Any:
        """
        Send a GET request.

        Args:
            url: Target URL.
            **kwargs: Extra arguments forwarded to the underlying API.

        Returns:
            The response object.
        """
        return self._request('get', url, **kwargs)

    def post(self, url: str, **kwargs) -> Any:
        """Send a POST request (Content-Type derived from json=/data=)."""
        return self._request('post', url, **kwargs)

    def put(self, url: str, **kwargs) -> Any:
        """Send a PUT request."""
        return self._request('put', url, **kwargs)

    def delete(self, url: str, **kwargs) -> Any:
        """Send a DELETE request."""
        return self._request('delete', url, **kwargs)

    def head(self, url: str, **kwargs) -> Any:
        """Send a HEAD request."""
        return self._request('head', url, **kwargs)

    def get_cookies(self) -> Dict[str, str]:
        """Return a copy of all cookies held by this session."""
        return self._cookies.copy()

    def set_cookie(self, name: str, value: str):
        """Set a single cookie on the session jar."""
        self._cookies[name] = value

    def clear_cookies(self):
        """Remove all cookies from the session jar."""
        self._cookies.clear()

    def get_headers(self) -> Dict[str, str]:
        """Return a copy of the session's base request headers."""
        return self._base_headers.copy()

    def close(self):
        """Close the session (no-op: each request is independent)."""
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
# Names of all browser profiles available for stealth headers.
SUPPORTED_BROWSERS = list(BROWSER_PROFILES.keys())


def get_supported_browsers() -> List[str]:
    """Return the supported browser profile names as a fresh list."""
    return list(SUPPORTED_BROWSERS)

572
cfspider/vless_client.py Normal file
View File

@@ -0,0 +1,572 @@
"""
CFspider 内置 VLESS 客户端
通过 WebSocket 连接 edgetunnel提供本地 HTTP 代理
"""
import socket
import struct
import threading
import ssl
import time
import uuid
from urllib.parse import urlparse
class VlessClient:
    """VLESS protocol client.

    Speaks the VLESS handshake over an (optionally TLS-wrapped) WebSocket
    connection to an edgetunnel endpoint and opens TCP tunnels to
    arbitrary targets.
    """

    # RFC 6455 magic GUID used to compute/verify Sec-WebSocket-Accept.
    _WS_ACCEPT_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

    def __init__(self, ws_url, vless_uuid=None):
        """
        Initialize the VLESS client.

        Args:
            ws_url: edgetunnel WebSocket address, e.g. "wss://v2.kami666.xyz"
            vless_uuid: VLESS UUID; a random one is generated when omitted.
        """
        self.ws_url = ws_url
        self.vless_uuid = vless_uuid or str(uuid.uuid4())
        parsed = urlparse(ws_url)
        self.host = parsed.hostname
        self.port = parsed.port or (443 if parsed.scheme == 'wss' else 80)
        self.path = parsed.path or '/'
        self.use_ssl = parsed.scheme == 'wss'

    def _create_vless_header(self, target_host, target_port):
        """Build the VLESS request header for *target_host*:*target_port*.

        Layout: version(1)=0 | uuid(16) | addon-len(1)=0 | command(1)=TCP |
        port(2, big-endian) | address-type(1) | address.
        """
        header = bytes([0])                       # protocol version
        header += uuid.UUID(self.vless_uuid).bytes  # 16-byte UUID
        header += bytes([0])                      # no addons
        header += bytes([1])                      # command: TCP
        header += struct.pack('>H', target_port)  # target port
        # Address type: 1 = IPv4, 2 = domain, 3 = IPv6.
        try:
            packed4 = socket.inet_aton(target_host)
            header += bytes([1]) + packed4
        except socket.error:
            try:
                packed6 = socket.inet_pton(socket.AF_INET6, target_host)
                header += bytes([3]) + packed6
            except socket.error:
                domain_bytes = target_host.encode('utf-8')
                header += bytes([2]) + bytes([len(domain_bytes)]) + domain_bytes
        return header

    def _websocket_handshake(self, sock):
        """Perform the client side of the WebSocket upgrade on *sock*.

        Raises:
            Exception: if the server does not answer 101 or returns a
                wrong Sec-WebSocket-Accept value (RFC 6455 §4.2.2).
                Previously the Accept header was never checked.
        """
        import base64
        import hashlib
        import os
        # Random nonce for Sec-WebSocket-Key.
        key = base64.b64encode(os.urandom(16)).decode('utf-8')
        request = (
            f"GET {self.path} HTTP/1.1\r\n"
            f"Host: {self.host}\r\n"
            f"Upgrade: websocket\r\n"
            f"Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            f"Sec-WebSocket-Version: 13\r\n"
            f"\r\n"
        )
        sock.sendall(request.encode('utf-8'))
        # Read until the end of the response headers.
        response = b''
        while b'\r\n\r\n' not in response:
            chunk = sock.recv(1024)
            if not chunk:
                raise Exception("WebSocket 握手失败")
            response += chunk
        if b'101' not in response:
            raise Exception(f"WebSocket 握手失败: {response.decode('utf-8', errors='ignore')}")
        # Verify Sec-WebSocket-Accept == b64(sha1(key + GUID)).
        expected = base64.b64encode(
            hashlib.sha1((key + self._WS_ACCEPT_GUID).encode('utf-8')).digest()
        ).decode('utf-8')
        header_block = response.split(b'\r\n\r\n', 1)[0].decode('utf-8', errors='ignore')
        accepted = any(
            line.split(':', 1)[1].strip() == expected
            for line in header_block.split('\r\n')
            if line.lower().startswith('sec-websocket-accept:')
        )
        if not accepted:
            raise Exception("WebSocket 握手失败: Sec-WebSocket-Accept mismatch")
        return True

    def _send_ws_frame(self, sock, data):
        """Send *data* as a single masked binary WebSocket frame."""
        import os
        frame = bytes([0x82])  # FIN=1, opcode=binary
        length = len(data)
        if length <= 125:
            frame += bytes([0x80 | length])  # MASK bit set
        elif length <= 65535:
            frame += bytes([0x80 | 126])
            frame += struct.pack('>H', length)
        else:
            frame += bytes([0x80 | 127])
            frame += struct.pack('>Q', length)
        # Client-to-server frames must be masked (RFC 6455).
        mask = os.urandom(4)
        frame += mask
        frame += bytes([data[i] ^ mask[i % 4] for i in range(len(data))])
        sock.sendall(frame)

    @staticmethod
    def _recv_exact(sock, n):
        """Read exactly *n* bytes from *sock*; return None on EOF.

        TCP may deliver the frame header/length/mask fields fragmented;
        the previous single recv() calls could return short reads and
        desynchronize the frame parser.
        """
        data = b''
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                return None
            data += chunk
        return data

    def _recv_ws_frame(self, sock):
        """Receive one WebSocket frame; return its payload or None on close/EOF."""
        header = self._recv_exact(sock, 2)
        if header is None:
            return None
        opcode = header[0] & 0x0F
        masked = (header[1] & 0x80) != 0
        length = header[1] & 0x7F
        if length == 126:
            length_bytes = self._recv_exact(sock, 2)
            if length_bytes is None:
                return None
            length = struct.unpack('>H', length_bytes)[0]
        elif length == 127:
            length_bytes = self._recv_exact(sock, 8)
            if length_bytes is None:
                return None
            length = struct.unpack('>Q', length_bytes)[0]
        if masked:
            mask = self._recv_exact(sock, 4)
            if mask is None:
                return None
        # Read the payload (a short read on EOF keeps whatever arrived,
        # matching the original best-effort behaviour).
        data = b''
        while len(data) < length:
            chunk = sock.recv(length - len(data))
            if not chunk:
                break
            data += chunk
        if masked:
            data = bytes([data[i] ^ mask[i % 4] for i in range(len(data))])
        # Close frame ends the stream.
        if opcode == 0x08:
            return None
        return data

    def connect(self, target_host, target_port):
        """
        Open a VLESS tunnel to the target through the WebSocket endpoint.

        Returns:
            VlessConnection: a connection object usable for send/recv.
        """
        sock = socket.create_connection((self.host, self.port), timeout=30)
        if self.use_ssl:
            context = ssl.create_default_context()
            sock = context.wrap_socket(sock, server_hostname=self.host)
        self._websocket_handshake(sock)
        # The VLESS header is sent lazily together with the first payload.
        vless_header = self._create_vless_header(target_host, target_port)
        return VlessConnection(sock, self, vless_header)
class VlessConnection:
    """A single VLESS tunnel wrapped around a WebSocket connection.

    Exposes a socket-like ``send``/``recv`` interface. The VLESS request
    header is sent lazily together with the first payload, and the 2-byte
    VLESS response header is stripped from the first received frame.
    """

    def __init__(self, sock, client, vless_header=None):
        """
        Args:
            sock: Connected (possibly TLS-wrapped) socket.
            client: Owning ``VlessClient`` (provides frame encode/decode).
            vless_header: VLESS request header to prepend to the first send.
        """
        self.sock = sock
        self.client = client
        self.buffer = b''            # decoded payload not yet consumed
        self.first_response = True   # strip VLESS response header once
        self.vless_header = vless_header
        self.first_send = True       # header goes out with the first payload

    def send(self, data):
        """Send *data* through the tunnel (prepending the VLESS header once)."""
        if self.first_send and self.vless_header:
            # First write carries the VLESS request header plus payload.
            self.client._send_ws_frame(self.sock, self.vless_header + data)
            self.first_send = False
        else:
            self.client._send_ws_frame(self.sock, data)

    def recv(self, size):
        """Receive up to *size* bytes (best-effort; may return fewer)."""
        # Top up the buffer with at most one more frame if needed.
        if len(self.buffer) < size:
            try:
                frame = self.client._recv_ws_frame(self.sock)
                if frame:
                    # The first frame begins with the 2-byte VLESS response
                    # header (version, addon length) which must be skipped.
                    if self.first_response and len(frame) >= 2:
                        addon_len = frame[1] if len(frame) > 1 else 0
                        frame = frame[2 + addon_len:]
                        self.first_response = False
                    self.buffer += frame
            except Exception:
                # Best-effort: return whatever is buffered. Was a bare
                # ``except:`` which also swallowed KeyboardInterrupt.
                pass
        result = self.buffer[:size]
        self.buffer = self.buffer[size:]
        return result

    def recv_all(self):
        """Drain all currently available data without blocking."""
        try:
            self.sock.setblocking(False)
            while True:
                try:
                    frame = self.client._recv_ws_frame(self.sock)
                    if frame is None:
                        break
                    if self.first_response and len(frame) >= 2:
                        addon_len = frame[1] if len(frame) > 1 else 0
                        frame = frame[2 + addon_len:]
                        self.first_response = False
                    self.buffer += frame
                except (BlockingIOError, ssl.SSLWantReadError):
                    # No more data ready right now.
                    break
        finally:
            self.sock.setblocking(True)
        result = self.buffer
        self.buffer = b''
        return result

    def close(self):
        """Close the underlying socket, ignoring shutdown errors."""
        try:
            self.sock.close()
        except Exception:
            pass
class LocalVlessProxy:
    """Local VLESS HTTP proxy server.

    Listens on a random 127.0.0.1 port and handles both plain HTTP
    requests and HTTPS CONNECT tunnels, relaying each client connection
    through a fresh VlessClient WebSocket tunnel.
    """
    def __init__(self, ws_url, vless_uuid=None):
        """
        Initialize the local proxy.

        Args:
            ws_url: edgetunnel WebSocket address
            vless_uuid: VLESS UUID
        """
        self.ws_url = ws_url
        self.vless_uuid = vless_uuid
        self.server = None    # listening socket
        self.thread = None    # accept-loop thread
        self.port = None      # bound local port
        self.running = False  # accept-loop run flag

    def start(self):
        """Start the proxy server and return the bound local port."""
        # Bind to port 0 so the OS picks a free port.
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('127.0.0.1', 0))
        self.port = self.server.getsockname()[1]
        self.server.listen(10)
        self.running = True
        self.thread = threading.Thread(target=self._serve, daemon=True)
        self.thread.start()
        # Give the accept loop a moment to come up.
        time.sleep(0.1)
        return self.port

    def _serve(self):
        """Accept loop: hand every client connection to a worker thread."""
        # Short timeout so the loop periodically re-checks self.running.
        self.server.settimeout(1)
        while self.running:
            try:
                client, addr = self.server.accept()
                handler = threading.Thread(
                    target=self._handle_client,
                    args=(client,),
                    daemon=True
                )
                handler.start()
            except socket.timeout:
                continue
            except:
                break

    def _handle_client(self, client):
        """Parse one proxy request and dispatch to CONNECT/plain-HTTP handling."""
        try:
            client.settimeout(30)
            # Read until the end of the request headers.
            request = b''
            while b'\r\n\r\n' not in request:
                chunk = client.recv(4096)
                if not chunk:
                    return
                request += chunk
            # Parse the request line ("METHOD target HTTP/x.y").
            lines = request.split(b'\r\n')
            first_line = lines[0].decode('utf-8')
            parts = first_line.split(' ')
            if len(parts) < 2:
                return
            method = parts[0]
            if method == 'CONNECT':
                # HTTPS proxying: target is "host:port" (port defaults to 443).
                target = parts[1]
                if ':' in target:
                    host, port = target.rsplit(':', 1)
                    port = int(port)
                else:
                    host = target
                    port = 443
                self._handle_connect(client, host, port)
            else:
                # Plain HTTP proxying: absolute URL in the request line.
                url = parts[1]
                self._handle_http(client, method, url, request)
        except Exception as e:
            pass
        finally:
            try:
                client.close()
            except:
                pass

    def _handle_connect(self, client, host, port):
        """Handle an HTTPS CONNECT request by tunnelling through VLESS."""
        try:
            # Open a VLESS tunnel to the target.
            vless = VlessClient(self.ws_url, self.vless_uuid)
            conn = vless.connect(host, port)
            # Tell the client the tunnel is ready.
            client.sendall(b'HTTP/1.1 200 Connection Established\r\n\r\n')
            # Relay bytes in both directions (one thread per direction).
            self._relay_bidirectional(client, conn)
        except Exception as e:
            try:
                client.sendall(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')
            except:
                pass

    def _handle_http(self, client, method, url, original_request):
        """Handle a plain HTTP proxy request."""
        try:
            parsed = urlparse(url)
            host = parsed.hostname
            port = parsed.port or 80
            path = parsed.path or '/'
            if parsed.query:
                path += '?' + parsed.query
            # Open a VLESS tunnel to the target.
            vless = VlessClient(self.ws_url, self.vless_uuid)
            conn = vless.connect(host, port)
            # Rewrite the request line from absolute-form to origin-form.
            lines = original_request.split(b'\r\n')
            lines[0] = f'{method} {path} HTTP/1.1'.encode('utf-8')
            # Rewrite the Host header and drop Proxy-* headers.
            new_lines = [lines[0]]
            has_host = False
            for line in lines[1:]:
                if line.lower().startswith(b'host:'):
                    new_lines.append(f'Host: {host}'.encode('utf-8'))
                    has_host = True
                elif line.lower().startswith(b'proxy-'):
                    continue
                else:
                    new_lines.append(line)
            if not has_host:
                new_lines.insert(1, f'Host: {host}'.encode('utf-8'))
            request = b'\r\n'.join(new_lines)
            conn.send(request)
            # Stream the response back to the client.
            self._relay_response(client, conn)
        except Exception as e:
            client.sendall(b'HTTP/1.1 502 Bad Gateway\r\n\r\n')

    def _relay_bidirectional(self, client, conn):
        """Relay data in both directions using one thread per direction."""
        import threading
        stop_event = threading.Event()

        def client_to_vless():
            # Client -> VLESS: raw bytes are wrapped into WebSocket frames.
            try:
                while not stop_event.is_set():
                    try:
                        client.settimeout(1)
                        data = client.recv(8192)
                        if data:
                            conn.send(data)
                        else:
                            break
                    except socket.timeout:
                        continue
                    except:
                        break
            finally:
                stop_event.set()

        def vless_to_client():
            # VLESS -> client: unwrap WebSocket frames and forward payloads.
            try:
                while not stop_event.is_set():
                    try:
                        conn.sock.settimeout(1)
                        frame = self._recv_ws_frame_safe(conn)
                        if frame:
                            client.sendall(frame)
                        elif frame is None:
                            # None marks a close frame / transport error.
                            break
                    except socket.timeout:
                        continue
                    except:
                        break
            finally:
                stop_event.set()

        t1 = threading.Thread(target=client_to_vless, daemon=True)
        t2 = threading.Thread(target=vless_to_client, daemon=True)
        t1.start()
        t2.start()
        # Wait until either direction terminates.
        while not stop_event.is_set():
            time.sleep(0.1)
        conn.close()

    def _recv_ws_frame_safe(self, conn):
        """Receive one WebSocket frame without raising.

        Returns the payload bytes, ``b''`` on a caught exception, or
        ``None`` for a close frame / truncated header.
        """
        try:
            sock = conn.sock
            header = sock.recv(2)
            if len(header) < 2:
                return None
            opcode = header[0] & 0x0F
            masked = (header[1] & 0x80) != 0
            length = header[1] & 0x7F
            if length == 126:
                length_bytes = sock.recv(2)
                length = struct.unpack('>H', length_bytes)[0]
            elif length == 127:
                length_bytes = sock.recv(8)
                length = struct.unpack('>Q', length_bytes)[0]
            if masked:
                mask = sock.recv(4)
            data = b''
            while len(data) < length:
                chunk = sock.recv(min(length - len(data), 8192))
                if not chunk:
                    break
                data += chunk
            if masked:
                data = bytes([data[i] ^ mask[i % 4] for i in range(len(data))])
            if opcode == 0x08:
                return None
            # Strip the 2-byte VLESS response header from the first frame.
            if conn.first_response and len(data) >= 2:
                addon_len = data[1]
                data = data[2 + addon_len:]
                conn.first_response = False
            return data
        except:
            return b''

    def _relay_response(self, client, conn):
        """Forward an HTTP response from the tunnel back to the client."""
        try:
            response = b''
            while True:
                data = conn.recv(8192)
                if not data:
                    break
                response += data
                client.sendall(data)
                # Once the header block is complete, drain the remainder.
                if b'\r\n\r\n' in response:
                    # NOTE(review): naive framing — reads until a short
                    # timeout instead of honouring Content-Length/chunked
                    # encoding; may truncate slow responses.
                    conn.sock.settimeout(0.5)
                    try:
                        while True:
                            data = conn.recv(8192)
                            if not data:
                                break
                            client.sendall(data)
                    except socket.timeout:
                        pass
                    break
        finally:
            conn.close()

    def stop(self):
        """Stop the proxy server and release the listening socket."""
        self.running = False
        if self.server:
            try:
                self.server.close()
            except:
                pass
        self.server = None
        self.thread = None
        self.port = None

    @property
    def proxy_url(self):
        """Local proxy URL (``http://127.0.0.1:<port>``), or None when stopped."""
        if self.port:
            return f"http://127.0.0.1:{self.port}"
        return None

46
pyproject.toml Normal file
View File

@@ -0,0 +1,46 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "cfspider"
version = "1.7.3"
description = "Cloudflare Workers proxy IP pool client with stealth mode, anti-detection, async, HTTP/2, TLS fingerprint, browser, mirror and IP map"
readme = "README.md"
license = {text = "Apache-2.0"}
requires-python = ">=3.8"
authors = [
{name = "violettools", email = "violet@violetteam.cloud"}
]
keywords = ["cloudflare", "workers", "proxy", "ip", "pool", "crawler", "spider", "browser", "playwright", "vless", "httpx", "http2", "async", "curl_cffi", "tls", "fingerprint", "impersonate", "map", "visualization", "maplibre", "mirror", "offline", "beautifulsoup", "stealth", "anti-detection", "anti-bot"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Internet :: WWW/HTTP",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"requests>=2.20.0",
"playwright>=1.40.0",
"httpx[http2]>=0.25.0",
"curl_cffi>=0.5.0",
"beautifulsoup4>=4.9.0",
]
[project.scripts]
cfspider = "cfspider.cli:main"
[project.urls]
Homepage = "https://spider.violetteam.cloud"
Repository = "https://github.com/violettoolssite/CFspider"
Documentation = "https://spider.violetteam.cloud"
Issues = "https://github.com/violettoolssite/CFspider/issues"

50
setup.py Normal file
View File

@@ -0,0 +1,50 @@
from setuptools import setup, find_packages

# Long description for PyPI comes straight from the project README.
with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setup(
    name="cfspider",
    version="1.7.3",
    author="violettools",
    author_email="violet@violetteam.cloud",
    description="Cloudflare Workers proxy IP pool client with stealth mode, anti-detection, async, HTTP/2, TLS fingerprint, browser, mirror and IP map",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://github.com/violettoolssite/CFspider",
    project_urls={
        "Bug Tracker": "https://github.com/violettoolssite/CFspider/issues",
        "Documentation": "https://spider.violetteam.cloud",
    },
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Operating System :: OS Independent",
        "Programming Language :: Python :: 3",
        # Python 3.7 classifier removed: python_requires is ">=3.8" and
        # pyproject.toml advertises 3.8+ only.
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
        "Programming Language :: Python :: 3.13",
        "Topic :: Internet :: WWW/HTTP",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
    packages=find_packages(),
    python_requires=">=3.8",
    install_requires=[
        "requests>=2.20.0",
        "playwright>=1.40.0",
        "httpx[http2]>=0.25.0",
        "curl_cffi>=0.5.0",
        "beautifulsoup4>=4.9.0",
    ],
    entry_points={
        "console_scripts": [
            "cfspider=cfspider.cli:main",
        ],
    },
    keywords="cloudflare workers proxy ip pool crawler spider browser playwright httpx http2 async curl_cffi tls fingerprint impersonate map visualization maplibre mirror offline stealth anti-detection anti-bot",
)