v1.8.0: Add data extraction and batch request features

New features:
- extract.py: CSS/XPath/JSONPath data extraction
- export.py: JSON/CSV/Excel/SQLite export
- batch.py: batch requests, concurrency control, progress bar
- find/pick/save methods integrated into Response
- get/post/batch commands added to the CLI
- Optional dependency groups [extract][xpath][excel][all]

Update docs and version number
test01
2026-01-08 23:14:52 +08:00
parent 9bd209b6de
commit 7ffe7b333f
13 changed files with 3191 additions and 92 deletions

README.md

@@ -1102,6 +1102,221 @@ save_dir/
- **浏览器渲染**:使用 Playwright 渲染 JavaScript 动态页面
- **自动预览**:下载完成后自动打开浏览器预览
## 数据提取
CFspider 1.8.0 新增了强大的数据提取功能，支持 CSS 选择器、XPath、JSONPath，让网页数据提取变得简单直观。
### 安装数据提取依赖(可选)
```bash
# 安装完整数据提取功能
pip install cfspider[extract]
# 或单独安装某个功能
pip install cfspider[xpath] # XPath 支持
pip install cfspider[excel] # Excel 导出
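# 按提交说明还提供 [all] 依赖组，可一并安装全部可选功能（此行为示意）
pip install cfspider[all]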
```
### 基本用法
```python
import cfspider
# 发送请求
response = cfspider.get("https://example.com")
# CSS 选择器提取
title = response.find("h1") # 第一个 h1 的文本
links = response.find_all("a", attr="href") # 所有链接
# XPath 提取（需要安装 lxml）
price = response.xpath("//span[@class='price']/text()")
# JSONPath 提取（用于 JSON API）
response = cfspider.get("https://api.example.com/data")
names = response.jpath_all("$.data[*].name")
```
### 批量字段提取
使用 `pick()` 方法一次提取多个字段:
```python
# 提取多个字段
data = response.pick(
    title="h1",                       # CSS 选择器提取文本
    author=".author-name",            # CSS 选择器
    links=("a", "href"),              # 元组形式提取属性
    price=(".price", "text", float),  # 带类型转换
)
# data 是一个 ExtractResult 字典
print(data["title"]) # 标题文本
print(data["links"]) # 所有链接
# 直接保存到文件
data.save("output.csv") # 保存为 CSV
data.save("output.json") # 保存为 JSON
data.save("output.xlsx") # 保存为 Excel需要 openpyxl
```
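除了关键字参数形式的 `pick()`，还可以用 `extract()` 传入规则字典，通过前缀显式指定选择器类型（规则格式见下文 api.py 中 `extract()` 的说明，示例中的选择器仅为示意）：

```python
data = response.extract({
    "title": "css:h1",            # 显式 CSS 选择器
    "links": "xpath://a/@href",   # XPath（需要 lxml）
    # JSON 响应可写成 "jsonpath:$.data.name"
})
data.save("output.json")
```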
### 链式 Element 操作
```python
# 获取 Element 对象进行链式操作
product = response.css_one(".product-card")
# 在元素内部继续查找
name = product.find("h2")
price = product.find(".price")
image = product.find("img", attr="src")
# 获取元素属性
print(product["id"]) # 获取 id 属性
print(product.text) # 获取文本内容
print(product.html) # 获取 HTML 内容
```
### 自动识别选择器类型
`find()` 方法会自动识别选择器类型:
```python
# 自动识别选择器类型
response.find("h1") # CSS默认
response.find("//h1/text()") # XPath以 // 开头)
response.find("$.data.title") # JSONPath以 $ 开头)
```
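`find()` 还接受 `regex` 和 `parser` 参数（签名见下文 api.py），可以在选中文本后先做正则截取、再做自定义转换。下面是基于该签名的示意用法，正则表达式仅为举例，具体行为以 extract.py 的实现为准：

```python
# 从 .price 的文本里用正则截取数字部分，再转换为 float（示意）
price = response.find(".price", regex=r"[\d.]+", parser=float)
```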
## 批量请求
CFspider 支持批量请求多个 URL，自动处理并发控制、重试和进度显示。
### 基本用法
```python
import cfspider
# 批量请求多个 URL
urls = [
    "https://example.com/page1",
    "https://example.com/page2",
    "https://example.com/page3",
]
results = cfspider.batch(urls, concurrency=5)
# 遍历结果
for item in results:
    if item.success:
        print(f"{item.url}: {item.response.status_code}")
    else:
        print(f"{item.url}: Error - {item.error}")
```
### 带数据提取的批量请求
```python
# 批量请求并提取数据
results = cfspider.batch(
    urls,
    pick={"title": "h1", "price": ".price"},  # 每个页面提取的字段
    concurrency=10,   # 并发数
    delay=0.5,        # 请求间隔（秒）
    retry=2,          # 失败重试次数
    progress=True,    # 显示进度条
)
# 保存结果
results.save("output.csv")
# 查看摘要
print(results.summary())
# {'total': 100, 'successful': 98, 'failed': 2, 'success_rate': '98.0%', ...}
```
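`BatchResult` 还提供 `successful`、`failed` 属性和 `filter()` 方法（见下文 batch.py），可以在保存前筛选结果，或把失败的 URL 单独重跑。示意用法：

```python
# 只保留成功的结果并保存
ok = results.filter(lambda item: item.success)
ok.save("ok.json")

# 收集失败的 URL，再发起一轮带重试的批量请求（示意）
failed_urls = [item.url for item in results.failed]
if failed_urls:
    retry_results = cfspider.batch(failed_urls, pick={"title": "h1"}, retry=2)
```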
### 从文件读取 URL
```python
# 从文件读取 URL（每行一个，以 # 开头的行会被跳过）
results = cfspider.batch(
    "urls.txt",           # 文件路径
    pick={"title": "h1"},
    concurrency=5,
)
```
### 异步批量请求
```python
import asyncio
import cfspider
async def main():
    results = await cfspider.abatch(
        urls,
        pick={"title": "h1"},
        concurrency=20,  # 异步模式可以使用更高的并发
    )
    results.save("output.json")

asyncio.run(main())
```
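`batch()` / `abatch()` 还接受 `on_success` 和 `on_error` 回调（回调签名见下文 batch.py 的参数说明），适合边抓取边写库、记录日志等场景。下面是同步版的示意：

```python
def handle_success(url, response, data):
    # data 是按 pick 规则提取到的字段字典
    print(f"[ok] {url} -> {data}")

def handle_error(url, error):
    print(f"[fail] {url}: {error}")

results = cfspider.batch(
    urls,
    pick={"title": "h1"},
    on_success=handle_success,
    on_error=handle_error,
)
```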
## 命令行工具
CFspider 提供了完整的命令行工具:
```bash
# GET 请求
cfspider get https://httpbin.org/ip
# 使用代理
cfspider get https://example.com --proxy https://your.workers.dev
# 数据提取
cfspider get https://example.com --pick "title:h1" --pick "links:a@href" -o data.csv
# POST 请求
cfspider post https://api.example.com -d '{"key": "value"}'
# 批量请求
cfspider batch url1 url2 url3 --pick "title:h1" -o results.csv
# 从文件批量请求
cfspider batch urls.txt -c 10 --pick "title:h1" -o results.json
# VPN 模式
cfspider vpn start --workers-url https://your.workers.dev --port 1080
# 安装浏览器
cfspider install
```
### 命令行选项
```bash
cfspider get/post <url> [options]
选项:
-H, --header HEADER 请求头 ("User-Agent: xxx")
--proxy URL Workers 代理地址
--token TOKEN 鉴权 token
--impersonate BROWSER TLS 指纹模拟 (如 chrome131)
--stealth 启用隐身模式
--pick RULE 数据提取规则 ("title:h1")
-o, --output FILE 输出文件
-v, --verbose 显示详细信息
cfspider batch <urls...> [options]
选项:
-c, --concurrency N 并发数 (默认 5)
--delay N 请求间隔(秒)
--retry N 失败重试次数
-q, --quiet 安静模式
```
## 浏览器模式
CFspider 支持浏览器模式,可以渲染 JavaScript 动态页面、截图、生成 PDF、自动化操作等。

cfspider/__init__.py

@@ -39,7 +39,7 @@ CFspider - Cloudflare 代理 IP 池 Python 库
版本信息:
- 版本号: 1.7.0
- 协议: Apache License 2.0
- 文档: https://spider.violetteam.cloud
- 文档: https://www.cfspider.com
依赖关系:
必需requests
@@ -67,6 +67,12 @@ from .ip_map import (
# 网页镜像
from .mirror import mirror, MirrorResult, WebMirror
# 批量请求
from .batch import batch, abatch, BatchResult, BatchItem
# 数据导出
from .export import export
# 异步 API基于 httpx
from .async_api import (
aget, apost, aput, adelete, ahead, aoptions, apatch,
@@ -199,7 +205,7 @@ class PlaywrightNotInstalledError(CFSpiderError):
pass
__version__ = "1.7.3"
__version__ = "1.8.0"
__all__ = [
# 同步 API (requests)
"get", "post", "put", "delete", "head", "options", "patch", "request",
@@ -226,5 +232,9 @@ __all__ = [
"get_ip_collector", "clear_ip_records", "COLO_COORDINATES",
"clear_map_records", "get_map_collector",
# 网页镜像
"mirror", "MirrorResult", "WebMirror"
"mirror", "MirrorResult", "WebMirror",
# 批量请求
"batch", "abatch", "BatchResult", "BatchItem",
# 数据导出
"export",
]

cfspider/api.py

@@ -163,74 +163,334 @@ class CFSpiderResponse:
"""
self._response.raise_for_status()
# ========== 数据提取方法 ==========
def _get_extractor(self):
"""获取数据提取器(延迟初始化)"""
if not hasattr(self, '_extractor') or self._extractor is None:
from .extract import Extractor
content_type = "json" if self._is_json_response() else "html"
self._extractor = Extractor(self.text, content_type)
return self._extractor
def _is_json_response(self) -> bool:
"""判断是否是 JSON 响应"""
content_type = self.headers.get("content-type", "")
return "application/json" in content_type.lower()
def find(self, selector: str, attr: str = None, strip: bool = True,
regex: str = None, parser=None):
"""
查找第一个匹配的元素（最简单的 API）
自动识别选择器类型:
- 以 $ 开头JSONPath
- 以 // 开头XPath
- 其他CSS 选择器
Args:
selector: 选择器CSS/XPath/JSONPath
attr: 要提取的属性名
strip: 是否去除空白
regex: 正则表达式提取
parser: 自定义解析函数
Returns:
匹配的文本或属性值
Example:
>>> response.find("h1") # CSS
>>> response.find("//h1/text()") # XPath
>>> response.find("$.title") # JSONPath
"""
return self._get_extractor().find(selector, attr=attr, strip=strip,
regex=regex, parser=parser)
def find_all(self, selector: str, attr: str = None, strip: bool = True):
"""
查找所有匹配的元素
Args:
selector: 选择器CSS/XPath/JSONPath
attr: 要提取的属性名
strip: 是否去除空白
Returns:
匹配的文本或属性值列表
"""
return self._get_extractor().find_all(selector, attr=attr, strip=strip)
def css(self, selector: str, attr: str = None, html: bool = False, strip: bool = True):
"""
使用 CSS 选择器提取第一个匹配元素
Args:
selector: CSS 选择器
attr: 要提取的属性名
html: 是否返回 HTML 而非文本
strip: 是否去除空白
Returns:
匹配元素的文本、属性或 HTML
"""
return self._get_extractor().css(selector, attr=attr, html=html, strip=strip)
def css_all(self, selector: str, attr: str = None, html: bool = False, strip: bool = True):
"""
使用 CSS 选择器提取所有匹配元素
Args:
selector: CSS 选择器
attr: 要提取的属性名
html: 是否返回 HTML 而非文本
strip: 是否去除空白
Returns:
匹配元素的文本、属性或 HTML 列表
"""
return self._get_extractor().css_all(selector, attr=attr, html=html, strip=strip)
def css_one(self, selector: str):
"""
返回第一个匹配的 Element 对象,支持链式操作
Args:
selector: CSS 选择器
Returns:
Element 对象
"""
return self._get_extractor().css_one(selector)
def xpath(self, expression: str):
"""
使用 XPath 表达式提取第一个匹配
Args:
expression: XPath 表达式
Returns:
匹配的文本或属性值
"""
return self._get_extractor().xpath(expression)
def xpath_all(self, expression: str):
"""
使用 XPath 表达式提取所有匹配
Args:
expression: XPath 表达式
Returns:
匹配的文本或属性值列表
"""
return self._get_extractor().xpath_all(expression)
def xpath_one(self, expression: str):
"""
返回第一个匹配的 Element 对象
Args:
expression: XPath 表达式
Returns:
Element 对象
"""
return self._get_extractor().xpath_one(expression)
def jpath(self, expression: str):
"""
使用 JSONPath 表达式提取第一个匹配
Args:
expression: JSONPath 表达式（如 $.data.items[0].name）
Returns:
匹配的值
"""
return self._get_extractor().jpath(expression)
def jpath_all(self, expression: str):
"""
使用 JSONPath 表达式提取所有匹配
Args:
expression: JSONPath 表达式
Returns:
匹配的值列表
"""
return self._get_extractor().jpath_all(expression)
def pick(self, **fields):
"""
批量提取多个字段
Args:
**fields: 字段名=选择器 的映射
- 字符串CSS 选择器,提取文本
- 元组 (selector, attr):提取属性
- 元组 (selector, attr, converter):提取并转换
Returns:
ExtractResult 字典,支持直接保存
Example:
>>> data = response.pick(
... title="h1",
... links=("a", "href"),
... price=(".price", "text", float),
... )
>>> data.save("output.csv")
"""
result = self._get_extractor().pick(**fields)
result.url = str(self.url)
return result
def extract(self, rules: dict):
"""
使用规则字典提取数据(支持前缀指定类型)
Args:
rules: 字段名到选择器的映射
选择器可以带前缀指定类型:
- "css:h1.title" 或直接 "h1.title"
- "xpath://a/@href"
- "jsonpath:$.data.name"
Returns:
ExtractResult 字典
"""
result = self._get_extractor().extract(rules)
result.url = str(self.url)
return result
def save(self, filepath: str, encoding: str = "utf-8"):
"""
保存响应内容到文件
Args:
filepath: 输出文件路径
encoding: 文件编码(仅用于文本内容)
Returns:
输出文件的绝对路径
"""
from .export import save_response
return save_response(self.content, filepath, encoding=encoding)
def request(method, url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 HTTP 请求
发送 HTTP 请求 / Send HTTP request
这是 CFspider 的核心函数,支持多种代理模式和反爬虫功能。
This is the core function of CFspider, supporting multiple proxy modes and anti-crawler features.
Args:
method (str): HTTP 方法（GET, POST, PUT, DELETE, HEAD, OPTIONS, PATCH）
/ HTTP method (GET, POST, PUT, DELETE, HEAD, OPTIONS, PATCH)
url (str): 目标 URL（必须包含协议，如 https://）
cf_proxies (str, optional): 代理地址,根据 cf_workers 参数有不同含义:
/ Target URL (must include protocol, e.g., https://)
cf_proxies (str, optional): 代理地址,根据 cf_workers 参数有不同含义
/ Proxy address, meaning depends on cf_workers parameter
- 当 cf_workers=True 时:填写 CFspider Workers 地址(如 "https://your-workers.dev"
- When cf_workers=True: CFspider Workers address (e.g., "https://your-workers.dev")
- 当 cf_workers=False 时:填写普通 HTTP/SOCKS5 代理(如 "http://127.0.0.1:8080"
- When cf_workers=False: Regular HTTP/SOCKS5 proxy (e.g., "http://127.0.0.1:8080")
- 不填写时:直接请求目标 URL不使用代理
- None: Direct request without proxy
cf_workers (bool): 是否使用 CFspider Workers API（默认 True）
/ Whether to use CFspider Workers API (default: True)
- True: cf_proxies 是 Workers 地址,请求通过 Workers API 转发
- True: cf_proxies is Workers address, requests forwarded via Workers API
- False: cf_proxies 是普通代理,使用 requests/httpx 的 proxies 参数
- False: cf_proxies is regular proxy, uses requests/httpx proxies parameter
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
- True: 使用 httpx 客户端,支持 HTTP/2
- True: Uses httpx client with HTTP/2 support
- False: 使用 requests 库(默认行为)
- False: Uses requests library (default behavior)
- 注意http2 和 impersonate 不能同时使用
- Note: http2 and impersonate cannot be used together
impersonate (str, optional): TLS 指纹模拟,模拟真实浏览器的 TLS 握手特征
/ TLS fingerprint impersonation, mimics real browser TLS handshake
- 可选值chrome131, chrome124, safari18_0, firefox133, edge101 等
- Options: chrome131, chrome124, safari18_0, firefox133, edge101, etc.
- 设置后自动使用 curl_cffi 发送请求
- Automatically uses curl_cffi when set
- 完整列表cfspider.get_supported_browsers()
- Full list: cfspider.get_supported_browsers()
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
- True: 请求完成后生成包含代理 IP 信息的交互式地图
- True: Generates interactive map with proxy IP information after request
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
- True: 自动添加 15+ 个完整浏览器请求头,模拟真实浏览器访问
- True: Automatically adds 15+ complete browser headers, mimics real browser
- 添加的请求头包括User-Agent, Accept, Accept-Language, Sec-Fetch-*, Sec-CH-UA 等
- Headers include: User-Agent, Accept, Accept-Language, Sec-Fetch-*, Sec-CH-UA, etc.
stealth_browser (str): 隐身模式使用的浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
- 可选值chrome, firefox, safari, edge, chrome_mobile
- Options: chrome, firefox, safari, edge, chrome_mobile
delay (tuple, optional): 请求前的随机延迟范围(秒)
/ Random delay range before request (seconds)
- 如 (1, 3) 表示请求前随机等待 1-3 秒
- e.g., (1, 3) means random wait 1-3 seconds before request
- 用于模拟人类行为,避免被反爬系统检测
- Used to simulate human behavior, avoid anti-crawler detection
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
- 当使用 Workers API（cf_workers=True）时，将 token 添加到查询参数
- When using Workers API (cf_workers=True), adds token to query parameters
- 如果 Workers 端配置了 TOKEN 环境变量,必须提供有效的 token
- Required when Workers has TOKEN environment variable configured
- 格式:从查询参数 ?token=xxx 传递
- Format: Passed via query parameter ?token=xxx
**kwargs: 其他参数,与 requests 库完全兼容
- params (dict): URL 查询参数
/ Other parameters, fully compatible with requests library
- params (dict): URL 查询参数 / URL query parameters
- headers (dict): 自定义请求头(会与隐身模式头合并)
- data (dict/str): 表单数据
/ Custom headers (merged with stealth mode headers)
- data (dict/str): 表单数据 / Form data
- json (dict): JSON 数据(自动设置 Content-Type
/ JSON data (Content-Type set automatically)
- cookies (dict): Cookie
- timeout (int/float): 超时时间(秒),默认 30
/ Timeout (seconds), default: 30
- allow_redirects (bool): 是否跟随重定向,默认 True
/ Whether to follow redirects, default: True
- verify (bool): 是否验证 SSL 证书,默认 True
/ Whether to verify SSL certificate, default: True
Returns:
CFSpiderResponse: 响应对象,包含以下属性
- text: 响应文本
- content: 响应字节
- json(): 解析 JSON
- status_code: HTTP 状态码
- headers: 响应头
CFSpiderResponse: 响应对象,包含以下属性
/ Response object with the following attributes
- text: 响应文本 / Response text
- content: 响应字节 / Response bytes
- json(): 解析 JSON / Parse JSON
- status_code: HTTP 状态码 / HTTP status code
- headers: 响应头 / Response headers
- cf_colo: Cloudflare 节点代码(使用 Workers 时可用)
/ Cloudflare colo code (available when using Workers)
- cf_ray: Cloudflare Ray ID
Raises:
ImportError: 当需要的可选依赖未安装时
- http2=True 需要 httpx[http2]
- impersonate 需要 curl_cffi
/ When required optional dependencies are not installed
- http2=True 需要 httpx[http2] / http2=True requires httpx[http2]
- impersonate 需要 curl_cffi / impersonate requires curl_cffi
ValueError: 当 http2 和 impersonate 同时启用时
/ When http2 and impersonate are both enabled
requests.RequestException: 网络请求失败时
/ When network request fails
Examples:
>>> import cfspider
@@ -574,86 +834,313 @@ def _request_httpx(method, url, cf_proxies, cf_workers, params=None, headers=Non
def get(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 GET 请求
发送 GET 请求 / Send GET request
Args:
url: 目标 URL
cf_proxies: 代理地址
cf_workers: 是否使用 Workers API默认 True
http2: 是否启用 HTTP/2
impersonate: TLS 指纹(如 "chrome131", "safari18_0", "firefox133"
map_output: 是否生成 IP 地图 HTML 文件
map_file: 地图输出文件名
stealth: 是否启用隐身模式(自动添加完整浏览器请求头
stealth_browser: 隐身模式浏览器类型chrome/firefox/safari/edge/chrome_mobile
delay: 请求前随机延迟范围,如 (1, 3)
url (str): 目标 URL / Target URL (must include protocol, e.g., https://)
cf_proxies (str, optional): 代理地址 / Proxy address
- 当 cf_workers=True 时CFspider Workers 地址(如 "https://your-workers.dev"
- When cf_workers=True: CFspider Workers address (e.g., "https://your-workers.dev")
- 当 cf_workers=False 时:普通 HTTP/SOCKS5 代理(如 "http://127.0.0.1:8080"
- When cf_workers=False: Regular HTTP/SOCKS5 proxy (e.g., "http://127.0.0.1:8080")
- 不填写时:直接请求,不使用代理 / None: Direct request without proxy
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
- 可选值chrome131, chrome124, safari18_0, firefox133, edge101 等
- Options: chrome131, chrome124, safari18_0, firefox133, edge101, etc.
- 设置后自动使用 curl_cffi 发送请求
- Automatically uses curl_cffi when set
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
- True: 自动添加 15+ 个完整浏览器请求头
- True: Automatically adds 15+ complete browser headers
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
- 可选值chrome, firefox, safari, edge, chrome_mobile
- Options: chrome, firefox, safari, edge, chrome_mobile
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
- 当 Workers 配置了 TOKEN 环境变量时必填
- Required when Workers has TOKEN environment variable configured
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
- params (dict): URL 查询参数 / URL query parameters
- headers (dict): 自定义请求头 / Custom headers
- data (dict/str): 表单数据 / Form data
- json (dict): JSON 数据 / JSON data
- cookies (dict): Cookie
- timeout (int/float): 超时时间(秒),默认 30 / Timeout (seconds), default: 30
Returns:
CFSpiderResponse: 响应对象 / Response object
- text: 响应文本 / Response text
- content: 响应字节 / Response bytes
- json(): 解析 JSON / Parse JSON
- status_code: HTTP 状态码 / HTTP status code
- headers: 响应头 / Response headers
- cf_colo: Cloudflare 节点代码(使用 Workers 时可用)
/ Cloudflare colo code (available when using Workers)
- cf_ray: Cloudflare Ray ID
"""
return request("GET", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def post(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 POST 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 POST 请求 / Send POST request
Args:
url (str): 目标 URL / Target URL (must include protocol, e.g., https://)
cf_proxies (str, optional): 代理地址 / Proxy address
- 当 cf_workers=True 时CFspider Workers 地址(如 "https://your-workers.dev"
- When cf_workers=True: CFspider Workers address (e.g., "https://your-workers.dev")
- 当 cf_workers=False 时:普通 HTTP/SOCKS5 代理(如 "http://127.0.0.1:8080"
- When cf_workers=False: Regular HTTP/SOCKS5 proxy (e.g., "http://127.0.0.1:8080")
- 不填写时:直接请求,不使用代理 / None: Direct request without proxy
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
- 可选值chrome131, chrome124, safari18_0, firefox133, edge101 等
- Options: chrome131, chrome124, safari18_0, firefox133, edge101, etc.
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
- 可选值chrome, firefox, safari, edge, chrome_mobile
- Options: chrome, firefox, safari, edge, chrome_mobile
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
- 当 Workers 配置了 TOKEN 环境变量时必填
- Required when Workers has TOKEN environment variable configured
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
- data (dict/str): 表单数据 / Form data
- json (dict): JSON 数据 / JSON data
- headers (dict): 自定义请求头 / Custom headers
- cookies (dict): Cookie
- timeout (int/float): 超时时间(秒),默认 30 / Timeout (seconds), default: 30
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("POST", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def put(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 PUT 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 PUT 请求 / Send PUT request
Args:
url (str): 目标 URL / Target URL
cf_proxies (str, optional): 代理地址 / Proxy address
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("PUT", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def delete(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 DELETE 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 DELETE 请求 / Send DELETE request
Args:
url (str): 目标 URL / Target URL
cf_proxies (str, optional): 代理地址 / Proxy address
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("DELETE", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def head(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 HEAD 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 HEAD 请求 / Send HEAD request
Args:
url (str): 目标 URL / Target URL
cf_proxies (str, optional): 代理地址 / Proxy address
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("HEAD", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def options(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 OPTIONS 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 OPTIONS 请求 / Send OPTIONS request
Args:
url (str): 目标 URL / Target URL
cf_proxies (str, optional): 代理地址 / Proxy address
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("OPTIONS", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def patch(url, cf_proxies=None, cf_workers=True, http2=False, impersonate=None,
map_output=False, map_file="cfspider_map.html",
stealth=False, stealth_browser='chrome', delay=None, **kwargs):
"""发送 PATCH 请求"""
stealth=False, stealth_browser='chrome', delay=None, token=None, **kwargs):
"""
发送 PATCH 请求 / Send PATCH request
Args:
url (str): 目标 URL / Target URL
cf_proxies (str, optional): 代理地址 / Proxy address
cf_workers (bool): 是否使用 CFspider Workers API默认 True
/ Whether to use CFspider Workers API (default: True)
http2 (bool): 是否启用 HTTP/2 协议(默认 False
/ Whether to enable HTTP/2 protocol (default: False)
impersonate (str, optional): TLS 指纹模拟 / TLS fingerprint impersonation
map_output (bool): 是否生成 IP 地图 HTML 文件(默认 False
/ Whether to generate IP map HTML file (default: False)
map_file (str): 地图输出文件名(默认 "cfspider_map.html"
/ Map output filename (default: "cfspider_map.html")
stealth (bool): 是否启用隐身模式(默认 False
/ Whether to enable stealth mode (default: False)
stealth_browser (str): 隐身模式浏览器类型(默认 'chrome'
/ Stealth mode browser type (default: 'chrome')
delay (tuple, optional): 请求前随机延迟范围(秒),如 (1, 3)
/ Random delay range before request (seconds), e.g., (1, 3)
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
**kwargs: 其他参数,与 requests 库完全兼容
/ Other parameters, fully compatible with requests library
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return request("PATCH", url, cf_proxies=cf_proxies, cf_workers=cf_workers,
http2=http2, impersonate=impersonate,
map_output=map_output, map_file=map_file,
stealth=stealth, stealth_browser=stealth_browser, delay=delay, **kwargs)
stealth=stealth, stealth_browser=stealth_browser, delay=delay, token=token, **kwargs)
def clear_map_records():

cfspider/async_api.py

@@ -108,6 +108,80 @@ class AsyncCFSpiderResponse:
def raise_for_status(self) -> None:
self._response.raise_for_status()
# ========== 数据提取方法 ==========
def _get_extractor(self):
"""获取数据提取器(延迟初始化)"""
if not hasattr(self, '_extractor') or self._extractor is None:
from .extract import Extractor
content_type = "json" if self._is_json_response() else "html"
self._extractor = Extractor(self.text, content_type)
return self._extractor
def _is_json_response(self) -> bool:
"""判断是否是 JSON 响应"""
content_type = self.headers.get("content-type", "")
return "application/json" in content_type.lower()
def find(self, selector: str, attr: str = None, strip: bool = True,
regex: str = None, parser=None):
"""查找第一个匹配的元素"""
return self._get_extractor().find(selector, attr=attr, strip=strip,
regex=regex, parser=parser)
def find_all(self, selector: str, attr: str = None, strip: bool = True):
"""查找所有匹配的元素"""
return self._get_extractor().find_all(selector, attr=attr, strip=strip)
def css(self, selector: str, attr: str = None, html: bool = False, strip: bool = True):
"""使用 CSS 选择器提取"""
return self._get_extractor().css(selector, attr=attr, html=html, strip=strip)
def css_all(self, selector: str, attr: str = None, html: bool = False, strip: bool = True):
"""使用 CSS 选择器提取所有"""
return self._get_extractor().css_all(selector, attr=attr, html=html, strip=strip)
def css_one(self, selector: str):
"""返回第一个匹配的 Element 对象"""
return self._get_extractor().css_one(selector)
def xpath(self, expression: str):
"""使用 XPath 表达式提取"""
return self._get_extractor().xpath(expression)
def xpath_all(self, expression: str):
"""使用 XPath 表达式提取所有"""
return self._get_extractor().xpath_all(expression)
def xpath_one(self, expression: str):
"""返回第一个匹配的 Element 对象"""
return self._get_extractor().xpath_one(expression)
def jpath(self, expression: str):
"""使用 JSONPath 表达式提取"""
return self._get_extractor().jpath(expression)
def jpath_all(self, expression: str):
"""使用 JSONPath 表达式提取所有"""
return self._get_extractor().jpath_all(expression)
def pick(self, **fields):
"""批量提取多个字段"""
result = self._get_extractor().pick(**fields)
result.url = str(self.url)
return result
def extract(self, rules: dict):
"""使用规则字典提取数据"""
result = self._get_extractor().extract(rules)
result.url = str(self.url)
return result
def save(self, filepath: str, encoding: str = "utf-8"):
"""保存响应内容到文件"""
from .export import save_response
return save_response(self.content, filepath, encoding=encoding)
async def aiter_bytes(self, chunk_size: Optional[int] = None) -> AsyncIterator[bytes]:
"""异步迭代响应字节"""
async for chunk in self._response.aiter_bytes(chunk_size):

cfspider/batch.py Normal file

@@ -0,0 +1,451 @@
"""
CFspider 批量请求模块
提供批量请求、并发控制、进度显示和结果聚合功能。
Example:
>>> import cfspider
>>>
>>> # 基础批量请求
>>> results = cfspider.batch(["url1", "url2", "url3"])
>>>
>>> # 带数据提取的批量请求
>>> results = cfspider.batch(
... ["url1", "url2"],
... pick={"title": "h1", "price": ".price"},
... concurrency=5
... )
>>> results.save("output.csv")
"""
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, List, Dict, Optional, Callable, Union
from dataclasses import dataclass, field
# 延迟导入 tqdm
_tqdm = None
def _get_tqdm():
"""延迟加载 tqdm"""
global _tqdm
if _tqdm is None:
try:
from tqdm import tqdm
_tqdm = tqdm
except ImportError:
# 如果没有 tqdm使用简单的进度显示
_tqdm = None
return _tqdm
@dataclass
class BatchItem:
"""
批量请求的单个结果项
Attributes:
url: 请求的 URL
data: 提取的数据(如果使用了 pick
response: 原始响应对象
error: 错误信息(如果请求失败)
duration: 请求耗时(秒)
"""
url: str
data: Optional[Dict[str, Any]] = None
response: Any = None
error: Optional[str] = None
duration: float = 0.0
@property
def success(self) -> bool:
"""请求是否成功"""
return self.error is None
def __repr__(self):
if self.success:
return f"BatchItem(url={self.url!r}, data={self.data})"
else:
return f"BatchItem(url={self.url!r}, error={self.error!r})"
class BatchResult:
"""
批量请求结果集合
支持迭代、过滤和导出功能。
Example:
>>> results = cfspider.batch(urls, pick={...})
>>>
>>> # 迭代结果
>>> for item in results:
... print(item.url, item.data)
>>>
>>> # 获取成功/失败的结果
>>> print(len(results.successful))
>>> print(len(results.failed))
>>>
>>> # 保存结果
>>> results.save("output.csv")
"""
def __init__(self, items: List[BatchItem] = None):
self._items: List[BatchItem] = items or []
def append(self, item: BatchItem):
"""添加结果项"""
self._items.append(item)
def __iter__(self):
return iter(self._items)
def __len__(self):
return len(self._items)
def __getitem__(self, index):
return self._items[index]
@property
def successful(self) -> List[BatchItem]:
"""获取成功的结果"""
return [item for item in self._items if item.success]
@property
def failed(self) -> List[BatchItem]:
"""获取失败的结果"""
return [item for item in self._items if not item.success]
@property
def success_rate(self) -> float:
"""成功率"""
if not self._items:
return 0.0
return len(self.successful) / len(self._items)
def to_list(self) -> List[Dict[str, Any]]:
"""转换为字典列表"""
results = []
for item in self._items:
row = {"url": item.url}
if item.data:
row.update(item.data)
if item.error:
row["_error"] = item.error
row["_duration"] = item.duration
results.append(row)
return results
def to_dataframe(self):
"""转换为 pandas DataFrame"""
try:
import pandas as pd
return pd.DataFrame(self.to_list())
except ImportError:
raise ImportError(
"pandas is required for to_dataframe(). "
"Install it with: pip install pandas"
)
def save(self, filepath: str, **kwargs) -> str:
"""
保存结果到文件
Args:
filepath: 输出文件路径(根据扩展名自动选择格式)
**kwargs: 传递给导出函数的参数
Returns:
输出文件的绝对路径
"""
from .export import export
return export(self.to_list(), filepath, **kwargs)
def filter(self, predicate: Callable[[BatchItem], bool]) -> 'BatchResult':
"""
过滤结果
Args:
predicate: 过滤函数
Returns:
过滤后的 BatchResult
"""
return BatchResult([item for item in self._items if predicate(item)])
def summary(self) -> Dict[str, Any]:
"""获取结果摘要"""
total_duration = sum(item.duration for item in self._items)
return {
"total": len(self._items),
"successful": len(self.successful),
"failed": len(self.failed),
"success_rate": f"{self.success_rate:.1%}",
"total_duration": f"{total_duration:.2f}s",
"avg_duration": f"{total_duration / len(self._items):.2f}s" if self._items else "0s",
}
def __repr__(self):
return f"BatchResult({len(self.successful)} successful, {len(self.failed)} failed)"
def batch(
urls: Union[List[str], str],
pick: Dict[str, Any] = None,
concurrency: int = 5,
delay: float = 0.0,
retry: int = 0,
timeout: float = 30.0,
cf_proxies: str = None,
token: str = None,
impersonate: str = None,
stealth: bool = False,
stealth_browser: str = None,
headers: Dict[str, str] = None,
on_success: Callable = None,
on_error: Callable = None,
progress: bool = True,
**kwargs
) -> BatchResult:
"""
批量请求多个 URL
Args:
urls: URL 列表或文件路径
pick: 数据提取规则(字典),如 {"title": "h1", "price": ".price"}
concurrency: 并发数
delay: 请求间隔(秒)
retry: 失败重试次数
timeout: 超时时间(秒)
cf_proxies: Cloudflare Workers 代理地址
token: 鉴权 token
impersonate: TLS 指纹模拟
stealth: 是否启用隐身模式
stealth_browser: 隐身模式的浏览器类型
headers: 自定义请求头
on_success: 成功回调函数 (url, response, data) -> None
on_error: 错误回调函数 (url, error) -> None
progress: 是否显示进度条
**kwargs: 传递给 cfspider.get 的其他参数
Returns:
BatchResult 对象
Example:
>>> results = cfspider.batch(
... ["https://example.com", "https://example.org"],
... pick={"title": "h1"},
... concurrency=10,
... progress=True
... )
>>> results.save("output.csv")
"""
from . import api
# 如果是文件路径,读取 URL 列表
if isinstance(urls, str):
with open(urls, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip() and not line.startswith('#')]
result = BatchResult()
lock = threading.Lock()
last_request_time = [0.0] # 用列表以便在闭包中修改
def process_url(url: str) -> BatchItem:
"""处理单个 URL"""
# 应用请求延迟
if delay > 0:
with lock:
elapsed = time.time() - last_request_time[0]
if elapsed < delay:
time.sleep(delay - elapsed)
last_request_time[0] = time.time()
start_time = time.time()
item = BatchItem(url=url)
for attempt in range(retry + 1):
try:
response = api.get(
url,
cf_proxies=cf_proxies,
token=token,
impersonate=impersonate,
stealth=stealth,
stealth_browser=stealth_browser,
headers=headers,
timeout=timeout,
**kwargs
)
item.response = response
item.duration = time.time() - start_time
# 数据提取
if pick:
item.data = response.pick(**pick)
# 成功回调
if on_success:
on_success(url, response, item.data)
return item
except Exception as e:
if attempt < retry:
time.sleep(1) # 重试前等待
continue
item.error = str(e)
item.duration = time.time() - start_time
# 错误回调
if on_error:
on_error(url, e)
return item
return item
# 使用线程池并发请求
tqdm = _get_tqdm()
with ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {executor.submit(process_url, url): url for url in urls}
if progress and tqdm:
# 使用 tqdm 显示进度
iterator = tqdm(as_completed(futures), total=len(urls), desc="Fetching")
else:
iterator = as_completed(futures)
for future in iterator:
try:
item = future.result()
result.append(item)
except Exception as e:
url = futures[future]
result.append(BatchItem(url=url, error=str(e)))
return result
async def abatch(
urls: Union[List[str], str],
pick: Dict[str, Any] = None,
concurrency: int = 10,
delay: float = 0.0,
retry: int = 0,
timeout: float = 30.0,
cf_proxies: str = None,
token: str = None,
impersonate: str = None,
stealth: bool = False,
stealth_browser: str = None,
headers: Dict[str, str] = None,
on_success: Callable = None,
on_error: Callable = None,
progress: bool = True,
**kwargs
) -> BatchResult:
"""
异步批量请求多个 URL
参数与 batch() 相同,但使用异步方式执行。
Example:
>>> results = await cfspider.abatch(
... ["https://example.com", "https://example.org"],
... pick={"title": "h1"},
... concurrency=20
... )
"""
import asyncio
from . import async_api
# 如果是文件路径,读取 URL 列表
if isinstance(urls, str):
with open(urls, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip() and not line.startswith('#')]
result = BatchResult()
semaphore = asyncio.Semaphore(concurrency)
last_request_time = [0.0]
async def process_url(url: str) -> BatchItem:
"""处理单个 URL"""
async with semaphore:
# 应用请求延迟
if delay > 0:
elapsed = time.time() - last_request_time[0]
if elapsed < delay:
await asyncio.sleep(delay - elapsed)
last_request_time[0] = time.time()
start_time = time.time()
item = BatchItem(url=url)
for attempt in range(retry + 1):
try:
response = await async_api.aget(
url,
cf_proxies=cf_proxies,
token=token,
impersonate=impersonate,
stealth=stealth,
stealth_browser=stealth_browser,
headers=headers,
timeout=timeout,
**kwargs
)
item.response = response
item.duration = time.time() - start_time
# 数据提取
if pick:
item.data = response.pick(**pick)
# 成功回调
if on_success:
on_success(url, response, item.data)
return item
except Exception as e:
if attempt < retry:
await asyncio.sleep(1)
continue
item.error = str(e)
item.duration = time.time() - start_time
# 错误回调
if on_error:
on_error(url, e)
return item
return item
# 并发执行所有请求
tqdm = _get_tqdm()
tasks = [process_url(url) for url in urls]
if progress and tqdm:
# 使用 tqdm 显示进度
for coro in tqdm(asyncio.as_completed(tasks), total=len(urls), desc="Fetching"):
item = await coro
result.append(item)
else:
items = await asyncio.gather(*tasks, return_exceptions=True)
for item in items:
if isinstance(item, Exception):
result.append(BatchItem(url="", error=str(item)))
else:
result.append(item)
return result


@@ -1,9 +1,23 @@
"""
CFspider 命令行工具
提供完整的命令行接口,支持:
- GET/POST/HEAD 等 HTTP 请求
- 批量 URL 请求
- 数据提取和导出
- VPN 代理模式
- 浏览器安装
用法示例:
cfspider get https://example.com
cfspider post https://api.example.com -d '{"key": "value"}'
cfspider batch urls.txt --pick "title:h1" -o results.csv
"""
import sys
import subprocess
import argparse
import json
def install_browser():
@@ -26,15 +40,367 @@ def install_browser():
return False
def cmd_get(args):
"""执行 GET 请求"""
from . import api
# 解析请求头
headers = {}
if args.header:
for h in args.header:
if ':' in h:
key, value = h.split(':', 1)
headers[key.strip()] = value.strip()
try:
response = api.get(
args.url,
cf_proxies=args.proxy,
token=args.token,
impersonate=args.impersonate,
stealth=args.stealth,
stealth_browser=args.stealth_browser or 'chrome',
headers=headers if headers else None,
timeout=args.timeout,
)
# 输出结果
_output_response(response, args)
except Exception as e:
print(f"请求失败: {e}", file=sys.stderr)
sys.exit(1)
def cmd_post(args):
"""执行 POST 请求"""
from . import api
# 解析请求头
headers = {}
if args.header:
for h in args.header:
if ':' in h:
key, value = h.split(':', 1)
headers[key.strip()] = value.strip()
# 解析数据
data = None
json_data = None
if args.data:
# 尝试解析为 JSON
try:
json_data = json.loads(args.data)
except json.JSONDecodeError:
data = args.data
if args.form:
# 表单数据
data = {}
for item in args.form.split('&'):
if '=' in item:
key, value = item.split('=', 1)
data[key] = value
try:
response = api.post(
args.url,
cf_proxies=args.proxy,
token=args.token,
impersonate=args.impersonate,
stealth=args.stealth,
stealth_browser=args.stealth_browser or 'chrome',
headers=headers if headers else None,
data=data,
json=json_data,
timeout=args.timeout,
)
# 输出结果
_output_response(response, args)
except Exception as e:
print(f"请求失败: {e}", file=sys.stderr)
sys.exit(1)
def cmd_head(args):
"""执行 HEAD 请求"""
from . import api
headers = {}
if args.header:
for h in args.header:
if ':' in h:
key, value = h.split(':', 1)
headers[key.strip()] = value.strip()
try:
response = api.head(
args.url,
cf_proxies=args.proxy,
token=args.token,
impersonate=args.impersonate,
stealth=args.stealth,
headers=headers if headers else None,
timeout=args.timeout,
)
# HEAD 请求只输出响应头
print(f"HTTP {response.status_code}")
for key, value in response.headers.items():
print(f"{key}: {value}")
if response.cf_colo:
print(f"\nCF-Colo: {response.cf_colo}")
if response.cf_ray:
print(f"CF-Ray: {response.cf_ray}")
except Exception as e:
print(f"请求失败: {e}", file=sys.stderr)
sys.exit(1)
def cmd_batch(args):
"""执行批量请求"""
from .batch import batch
# 解析 URL 列表
if args.urls:
# 从命令行参数获取 URL
urls = args.urls
else:
# 从文件读取
print("错误: 必须提供 URL 列表或文件", file=sys.stderr)
sys.exit(1)
# 解析 pick 规则
pick = None
if args.pick:
pick = {}
for rule in args.pick:
if ':' in rule:
name, selector = rule.split(':', 1)
# 检查是否有属性指定 (selector@attr)
if '@' in selector:
sel, attr = selector.rsplit('@', 1)
pick[name] = (sel, attr)
else:
pick[name] = selector
try:
results = batch(
urls=urls,
pick=pick,
concurrency=args.concurrency,
delay=args.delay,
retry=args.retry,
timeout=args.timeout,
cf_proxies=args.proxy,
token=args.token,
impersonate=args.impersonate,
stealth=args.stealth,
progress=not args.quiet,
)
# 输出摘要
if not args.quiet:
summary = results.summary()
print(f"\n完成: {summary['successful']}/{summary['total']} 成功 "
f"({summary['success_rate']}), 耗时 {summary['total_duration']}")
# 保存结果
if args.output:
filepath = results.save(args.output)
if not args.quiet:
print(f"结果已保存到: {filepath}")
else:
# 输出到标准输出
print(json.dumps(results.to_list(), ensure_ascii=False, indent=2))
except Exception as e:
print(f"批量请求失败: {e}", file=sys.stderr)
sys.exit(1)
def _output_response(response, args):
"""输出响应结果"""
# 数据提取
if args.pick:
pick = {}
for rule in args.pick:
if ':' in rule:
name, selector = rule.split(':', 1)
if '@' in selector:
sel, attr = selector.rsplit('@', 1)
pick[name] = (sel, attr)
else:
pick[name] = selector
data = response.pick(**pick)
if args.output:
data.save(args.output)
print(f"结果已保存到: {args.output}")
else:
print(json.dumps(dict(data), ensure_ascii=False, indent=2))
else:
# 直接输出响应
if args.output:
response.save(args.output)
print(f"响应已保存到: {args.output}")
else:
# 输出响应信息
if args.verbose:
print(f"HTTP {response.status_code}")
for key, value in response.headers.items():
print(f"{key}: {value}")
print()
# 输出响应体
try:
# 尝试格式化 JSON
data = response.json()
print(json.dumps(data, ensure_ascii=False, indent=2))
except:
print(response.text)
# 输出 CF 信息
if args.verbose and response.cf_colo:
print(f"\n[CF-Colo: {response.cf_colo}]")
def cmd_vpn(args):
"""VPN 代理命令"""
if args.vpn_command == 'start':
from .vless_client import start_socks5_proxy
print(f"启动 SOCKS5 代理服务器...")
print(f"Workers URL: {args.workers_url}")
print(f"本地端口: {args.port}")
print(f"监听地址: 127.0.0.1:{args.port}")
print()
print("使用方法:")
print(f" - 设置系统代理为 SOCKS5://127.0.0.1:{args.port}")
print(f" - 或使用浏览器扩展如 SwitchyOmega")
print()
print("按 Ctrl+C 停止服务")
try:
start_socks5_proxy(
workers_url=args.workers_url,
local_port=args.port,
token=args.token
)
except KeyboardInterrupt:
print("\n代理服务已停止")
else:
print("未知的 VPN 子命令")
sys.exit(1)
def main():
"""命令行入口"""
if len(sys.argv) < 2:
parser = argparse.ArgumentParser(
description='CFspider - Cloudflare 代理 IP 池',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
cfspider get https://httpbin.org/ip
cfspider get https://example.com --proxy https://workers.dev --pick "title:h1"
cfspider post https://api.example.com -d '{"key": "value"}'
cfspider batch urls.txt --pick "title:h1" "links:a@href" -o results.csv
cfspider vpn start --workers-url https://your.workers.dev --port 1080
更多信息: https://www.cfspider.com
"""
)
subparsers = parser.add_subparsers(dest='command', help='可用命令')
# ===== install 命令 =====
install_parser = subparsers.add_parser('install', help='安装 Chromium 浏览器')
# ===== version 命令 =====
version_parser = subparsers.add_parser('version', help='显示版本号')
# ===== 通用请求参数 =====
def add_common_args(p):
p.add_argument('-H', '--header', action='append', metavar='HEADER',
help='请求头 (如 "User-Agent: Mozilla/5.0")')
p.add_argument('--proxy', metavar='URL',
help='Workers 代理地址')
p.add_argument('--token', metavar='TOKEN',
help='鉴权 token')
p.add_argument('--impersonate', metavar='BROWSER',
help='TLS 指纹模拟 (如 chrome131)')
p.add_argument('--stealth', action='store_true',
help='启用隐身模式')
p.add_argument('--stealth-browser', metavar='BROWSER',
help='隐身模式浏览器类型')
p.add_argument('--timeout', type=float, default=30,
help='超时时间(秒)')
p.add_argument('--pick', action='append', metavar='RULE',
help='数据提取规则 (如 "title:h1")')
p.add_argument('-o', '--output', metavar='FILE',
help='输出文件')
p.add_argument('-v', '--verbose', action='store_true',
help='显示详细信息')
# ===== get 命令 =====
get_parser = subparsers.add_parser('get', help='发送 GET 请求')
get_parser.add_argument('url', help='目标 URL')
add_common_args(get_parser)
# ===== post 命令 =====
post_parser = subparsers.add_parser('post', help='发送 POST 请求')
post_parser.add_argument('url', help='目标 URL')
post_parser.add_argument('-d', '--data', metavar='DATA',
help='POST 数据 (JSON 或字符串)')
post_parser.add_argument('-f', '--form', metavar='DATA',
help='表单数据 (如 "name=test&age=20")')
add_common_args(post_parser)
# ===== head 命令 =====
head_parser = subparsers.add_parser('head', help='发送 HEAD 请求')
head_parser.add_argument('url', help='目标 URL')
add_common_args(head_parser)
# ===== batch 命令 =====
batch_parser = subparsers.add_parser('batch', help='批量请求')
batch_parser.add_argument('urls', nargs='*', help='URL 列表或文件路径')
batch_parser.add_argument('--concurrency', '-c', type=int, default=5,
help='并发数 (默认 5)')
batch_parser.add_argument('--delay', type=float, default=0,
help='请求间隔(秒)')
batch_parser.add_argument('--retry', type=int, default=0,
help='失败重试次数')
batch_parser.add_argument('-q', '--quiet', action='store_true',
help='安静模式,不显示进度')
add_common_args(batch_parser)
# ===== vpn 命令 =====
vpn_parser = subparsers.add_parser('vpn', help='VPN 代理模式')
vpn_subparsers = vpn_parser.add_subparsers(dest='vpn_command')
vpn_start = vpn_subparsers.add_parser('start', help='启动 SOCKS5 代理')
vpn_start.add_argument('--workers-url', required=True,
help='Workers URL')
vpn_start.add_argument('--port', type=int, default=1080,
help='本地端口 (默认 1080)')
vpn_start.add_argument('--token',
help='鉴权 token')
# 解析参数
args = parser.parse_args()
if not args.command:
print_help()
return
command = sys.argv[1].lower()
if command == 'install':
if args.command == 'install':
print("正在安装 Chromium 浏览器...")
if install_browser():
print("安装完成!")
@@ -42,16 +408,27 @@ def main():
print("安装失败,请检查网络连接或手动安装")
sys.exit(1)
elif command == 'version':
elif args.command == 'version':
from . import __version__
print(f"cfspider {__version__}")
elif command == 'help' or command == '-h' or command == '--help':
print_help()
elif args.command == 'get':
cmd_get(args)
elif args.command == 'post':
cmd_post(args)
elif args.command == 'head':
cmd_head(args)
elif args.command == 'batch':
cmd_batch(args)
elif args.command == 'vpn':
cmd_vpn(args)
else:
print(f"未知命令: {command}")
print_help()
parser.print_help()
sys.exit(1)
@@ -61,21 +438,35 @@ def print_help():
CFspider - Cloudflare 代理 IP 池
用法:
cfspider <command>
cfspider <command> [options]
命令:
install 安装 Chromium 浏览器(用于 Browser 功能)
version 显示版本号
help 显示帮助信息
get <url> 发送 GET 请求
post <url> 发送 POST 请求
head <url> 发送 HEAD 请求
batch <urls> 批量请求多个 URL
vpn start 启动 SOCKS5 代理服务器
install 安装 Chromium 浏览器
version 显示版本号
通用选项:
-H, --header 添加请求头
--proxy Workers 代理地址
--token 鉴权 token
--impersonate TLS 指纹模拟
--stealth 启用隐身模式
--pick 数据提取规则
-o, --output 输出文件
示例:
cfspider install # 安装浏览器
cfspider version # 显示版本
cfspider get https://httpbin.org/ip
cfspider get https://example.com --pick "title:h1" -o data.json
cfspider batch url1 url2 url3 --pick "title:h1" -o results.csv
cfspider vpn start --workers-url https://your.workers.dev
更多信息请访问: https://github.com/violettoolssite/CFspider
更多信息请访问: https://www.cfspider.com
""")
if __name__ == '__main__':
main()

cfspider/export.py Normal file

@@ -0,0 +1,381 @@
"""
CFspider 数据导出模块
支持导出数据到 JSON、CSV、Excel、SQLite 格式。
Example:
>>> import cfspider
>>>
>>> # 保存响应
>>> response = cfspider.get("https://example.com")
>>> response.save("page.html")
>>>
>>> # 保存提取结果
>>> data = response.pick(title="h1", price=".price")
>>> data.save("output.csv")
>>>
>>> # 使用导出函数
>>> cfspider.export(data, "output.xlsx", format="excel")
"""
import json
import csv
import os
from typing import Any, Dict, List, Union, Optional
def export(data: Union[Dict, List[Dict], Any],
filepath: str,
format: str = None,
table: str = "data",
encoding: str = "utf-8",
**kwargs) -> str:
"""
导出数据到文件
Args:
data: 要导出的数据(字典、字典列表或其他)
filepath: 输出文件路径
format: 导出格式json/csv/excel/sqliteNone 则自动从扩展名推断
table: SQLite 表名(仅 sqlite 格式使用)
encoding: 文件编码
**kwargs: 传递给底层导出函数的参数
Returns:
输出文件的绝对路径
Example:
>>> export({"title": "Hello"}, "output.json")
>>> export([{"a": 1}, {"a": 2}], "output.csv")
>>> export(data, "output.xlsx", format="excel")
"""
# 自动推断格式
if format is None:
ext = os.path.splitext(filepath)[1].lower()
format_map = {
'.json': 'json',
'.csv': 'csv',
'.xlsx': 'excel',
'.xls': 'excel',
'.db': 'sqlite',
'.sqlite': 'sqlite',
'.sqlite3': 'sqlite',
}
format = format_map.get(ext, 'json')
format = format.lower()
if format == 'json':
return export_json(data, filepath, encoding=encoding, **kwargs)
elif format == 'csv':
return export_csv(data, filepath, encoding=encoding, **kwargs)
elif format == 'excel':
return export_excel(data, filepath, **kwargs)
elif format == 'sqlite':
return export_sqlite(data, filepath, table=table, **kwargs)
else:
raise ValueError(f"Unsupported format: {format}")
def export_json(data: Any,
filepath: str,
encoding: str = "utf-8",
indent: int = 2,
ensure_ascii: bool = False,
**kwargs) -> str:
"""
导出数据到 JSON 文件
Args:
data: 要导出的数据
filepath: 输出文件路径
encoding: 文件编码
indent: 缩进空格数
ensure_ascii: 是否转义非 ASCII 字符
Returns:
输出文件的绝对路径
"""
filepath = os.path.abspath(filepath)
with open(filepath, 'w', encoding=encoding) as f:
json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii, **kwargs)
return filepath
def export_csv(data: Union[Dict, List[Dict]],
filepath: str,
encoding: str = "utf-8-sig", # 带 BOM 以支持 Excel 打开
delimiter: str = ",",
**kwargs) -> str:
"""
导出数据到 CSV 文件
Args:
data: 要导出的数据(字典或字典列表)
filepath: 输出文件路径
encoding: 文件编码(默认 utf-8-sig 带 BOM
delimiter: 分隔符
Returns:
输出文件的绝对路径
"""
filepath = os.path.abspath(filepath)
# 统一转换为列表
if isinstance(data, dict):
# 单个字典,检查值是否为列表
has_list_values = any(isinstance(v, list) for v in data.values())
if has_list_values:
# 展开列表值为多行
rows = _expand_dict_with_lists(data)
else:
# 单行数据
rows = [data]
elif isinstance(data, list):
rows = data
else:
rows = [{"value": data}]
if not rows:
return filepath
# 获取所有字段名
fieldnames = []
for row in rows:
if isinstance(row, dict):
for key in row.keys():
if key not in fieldnames:
fieldnames.append(key)
with open(filepath, 'w', encoding=encoding, newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=delimiter, **kwargs)
writer.writeheader()
for row in rows:
if isinstance(row, dict):
# 将列表值转为字符串
clean_row = {}
for k, v in row.items():
if isinstance(v, (list, dict)):
clean_row[k] = json.dumps(v, ensure_ascii=False)
else:
clean_row[k] = v
writer.writerow(clean_row)
return filepath
def _expand_dict_with_lists(data: Dict) -> List[Dict]:
"""
展开包含列表的字典为多行
Example:
>>> _expand_dict_with_lists({"title": "Hello", "links": ["a", "b", "c"]})
[{"title": "Hello", "links": "a"}, {"title": "Hello", "links": "b"}, ...]
"""
# 找出最长的列表长度
max_len = 1
for v in data.values():
if isinstance(v, list):
max_len = max(max_len, len(v))
rows = []
for i in range(max_len):
row = {}
for k, v in data.items():
if isinstance(v, list):
row[k] = v[i] if i < len(v) else None
else:
row[k] = v if i == 0 else None # 非列表值只在第一行显示
rows.append(row)
return rows
def export_excel(data: Union[Dict, List[Dict]],
filepath: str,
sheet_name: str = "Sheet1",
**kwargs) -> str:
"""
导出数据到 Excel 文件
Args:
data: 要导出的数据(字典或字典列表)
filepath: 输出文件路径
sheet_name: 工作表名称
Returns:
输出文件的绝对路径
Raises:
ImportError: 如果未安装 openpyxl
"""
try:
from openpyxl import Workbook
except ImportError:
raise ImportError(
"openpyxl is required for Excel export. "
"Install it with: pip install openpyxl"
)
filepath = os.path.abspath(filepath)
# 统一转换为列表
if isinstance(data, dict):
has_list_values = any(isinstance(v, list) for v in data.values())
if has_list_values:
rows = _expand_dict_with_lists(data)
else:
rows = [data]
elif isinstance(data, list):
rows = data
else:
rows = [{"value": data}]
if not rows:
wb = Workbook()
wb.save(filepath)
return filepath
# 获取所有字段名
fieldnames = []
for row in rows:
if isinstance(row, dict):
for key in row.keys():
if key not in fieldnames:
fieldnames.append(key)
wb = Workbook()
ws = wb.active
ws.title = sheet_name
# 写入表头
for col, name in enumerate(fieldnames, 1):
ws.cell(row=1, column=col, value=name)
# 写入数据
for row_idx, row in enumerate(rows, 2):
if isinstance(row, dict):
for col, name in enumerate(fieldnames, 1):
value = row.get(name)
if isinstance(value, (list, dict)):
value = json.dumps(value, ensure_ascii=False)
ws.cell(row=row_idx, column=col, value=value)
wb.save(filepath)
return filepath
def export_sqlite(data: Union[Dict, List[Dict]],
filepath: str,
table: str = "data",
if_exists: str = "replace",
**kwargs) -> str:
"""
导出数据到 SQLite 数据库
Args:
data: 要导出的数据(字典或字典列表)
filepath: 数据库文件路径
table: 表名
if_exists: 如果表存在的处理方式 ("replace", "append", "fail")
Returns:
输出文件的绝对路径
"""
import sqlite3
filepath = os.path.abspath(filepath)
# 统一转换为列表
if isinstance(data, dict):
has_list_values = any(isinstance(v, list) for v in data.values())
if has_list_values:
rows = _expand_dict_with_lists(data)
else:
rows = [data]
elif isinstance(data, list):
rows = data
else:
rows = [{"value": data}]
if not rows:
# 创建空数据库
conn = sqlite3.connect(filepath)
conn.close()
return filepath
# 获取所有字段名
fieldnames = []
for row in rows:
if isinstance(row, dict):
for key in row.keys():
if key not in fieldnames:
fieldnames.append(key)
conn = sqlite3.connect(filepath)
cursor = conn.cursor()
# 处理表存在的情况
if if_exists == "replace":
cursor.execute(f"DROP TABLE IF EXISTS {table}")
elif if_exists == "fail":
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table,))
if cursor.fetchone():
conn.close()
raise ValueError(f"Table '{table}' already exists")
# 创建表(如果不存在)
columns = ", ".join([f'"{name}" TEXT' for name in fieldnames])
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} ({columns})")
# 插入数据
placeholders = ", ".join(["?" for _ in fieldnames])
insert_sql = f"INSERT INTO {table} ({', '.join([f'\"{n}\"' for n in fieldnames])}) VALUES ({placeholders})"
for row in rows:
if isinstance(row, dict):
values = []
for name in fieldnames:
value = row.get(name)
if isinstance(value, (list, dict)):
value = json.dumps(value, ensure_ascii=False)
elif value is not None:
value = str(value)
values.append(value)
cursor.execute(insert_sql, values)
conn.commit()
conn.close()
return filepath
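`export_sqlite` 对包含列表的字典会先经 `_expand_dict_with_lists` 展开为多行,下面用标准库 sqlite3 读回结果做一个示意草稿(非提交文件内容):
```python
# 示意草稿:展开后的每一列均以 TEXT 类型存储
import sqlite3
from cfspider.export import export_sqlite

data = {"title": "Hello", "links": ["a", "b", "c"]}
db_path = export_sqlite(data, "output.db", table="pages", if_exists="replace")

conn = sqlite3.connect(db_path)
print(conn.execute('SELECT "title", "links" FROM pages').fetchall())
# 预期输出:[('Hello', 'a'), (None, 'b'), (None, 'c')],非列表值只出现在第一行
conn.close()
```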
def save_response(content: Union[str, bytes],
filepath: str,
encoding: str = "utf-8") -> str:
"""
保存响应内容到文件
Args:
content: 响应内容(字符串或字节)
filepath: 输出文件路径
encoding: 文件编码(仅用于字符串)
Returns:
输出文件的绝对路径
"""
filepath = os.path.abspath(filepath)
if isinstance(content, bytes):
with open(filepath, 'wb') as f:
f.write(content)
else:
with open(filepath, 'w', encoding=encoding) as f:
f.write(content)
return filepath
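`save_response` 的行为很简单:字符串按给定编码写入文本文件,字节串以二进制模式写入。示意草稿(非提交文件内容):
```python
# 示意草稿
from cfspider.export import save_response

save_response("<html>ok</html>", "page.html", encoding="utf-8")  # 字符串
save_response(b"\x89PNG\r\n...", "image.png")                    # 字节串
```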

727
cfspider/extract.py Normal file
View File

@@ -0,0 +1,727 @@
"""
CFspider 数据提取模块
提供 CSS 选择器、XPath、JSONPath 数据提取功能。
Example:
>>> import cfspider
>>> response = cfspider.get("https://example.com")
>>>
>>> # CSS 选择器
>>> title = response.find("h1")
>>> links = response.find_all("a", attr="href")
>>>
>>> # XPath
>>> items = response.xpath("//div[@class='item']")
>>>
>>> # 批量提取
>>> data = response.pick(title="h1", links=("a", "href"))
"""
import re
import json
from typing import Any, Optional, Union, List, Dict, Callable
# 延迟导入可选依赖
_bs4 = None
_lxml = None
_jsonpath_ng = None
def _get_bs4():
"""延迟加载 BeautifulSoup"""
global _bs4
if _bs4 is None:
try:
from bs4 import BeautifulSoup
_bs4 = BeautifulSoup
except ImportError:
raise ImportError(
"beautifulsoup4 is required for HTML extraction. "
"Install it with: pip install beautifulsoup4"
)
return _bs4
def _get_lxml():
"""延迟加载 lxml"""
global _lxml
if _lxml is None:
try:
from lxml import etree
_lxml = etree
except ImportError:
raise ImportError(
"lxml is required for XPath extraction. "
"Install it with: pip install lxml"
)
return _lxml
def _get_jsonpath():
"""延迟加载 jsonpath-ng"""
global _jsonpath_ng
if _jsonpath_ng is None:
try:
from jsonpath_ng import parse
_jsonpath_ng = parse
except ImportError:
raise ImportError(
"jsonpath-ng is required for JSONPath extraction. "
"Install it with: pip install jsonpath-ng"
)
return _jsonpath_ng
class Element:
"""
HTML 元素封装类,支持链式操作
Example:
>>> element = response.css_one("#product")
>>> title = element.find("h1")
>>> price = element.find(".price")
>>> element.text # 获取文本
>>> element.html # 获取 HTML
>>> element["href"] # 获取属性
"""
def __init__(self, element, parser: str = "bs4"):
"""
初始化元素
Args:
element: BeautifulSoup Tag 或 lxml Element
parser: 解析器类型("bs4" 或 "lxml"
"""
self._element = element
self._parser = parser
@property
def text(self) -> str:
"""获取元素文本内容"""
if self._element is None:
return ""
if self._parser == "bs4":
return self._element.get_text(strip=True)
else:
return self._element.text_content().strip() if hasattr(self._element, 'text_content') else str(self._element)
@property
def html(self) -> str:
"""获取元素 HTML 内容"""
if self._element is None:
return ""
if self._parser == "bs4":
return str(self._element)
else:
etree = _get_lxml()
return etree.tostring(self._element, encoding='unicode')
@property
def attrs(self) -> Dict[str, str]:
"""获取所有属性"""
if self._element is None:
return {}
if self._parser == "bs4":
return dict(self._element.attrs) if hasattr(self._element, 'attrs') else {}
else:
return dict(self._element.attrib) if hasattr(self._element, 'attrib') else {}
def __getitem__(self, key: str) -> Optional[str]:
"""获取属性值"""
return self.attrs.get(key)
def get(self, key: str, default: str = None) -> Optional[str]:
"""获取属性值,支持默认值"""
return self.attrs.get(key, default)
def find(self, selector: str, attr: str = None, strip: bool = True) -> Optional[str]:
"""
在当前元素内查找第一个匹配的元素
Args:
selector: CSS 选择器
attr: 要提取的属性名None 表示提取文本
strip: 是否去除空白
Returns:
匹配元素的文本或属性值
"""
if self._element is None:
return None
if self._parser == "bs4":
found = self._element.select_one(selector)
if found is None:
return None
if attr:
return found.get(attr)
text = found.get_text(strip=strip)
return text
else:
# lxml 使用 cssselect
try:
from lxml.cssselect import CSSSelector
sel = CSSSelector(selector)
results = sel(self._element)
if not results:
return None
found = results[0]
if attr:
return found.get(attr)
text = found.text_content()
return text.strip() if strip and text else text
except ImportError:
raise ImportError("cssselect is required for CSS selectors with lxml")
def find_all(self, selector: str, attr: str = None, strip: bool = True) -> List[str]:
"""
在当前元素内查找所有匹配的元素
Args:
selector: CSS 选择器
attr: 要提取的属性名None 表示提取文本
strip: 是否去除空白
Returns:
匹配元素的文本或属性值列表
"""
if self._element is None:
return []
results = []
if self._parser == "bs4":
elements = self._element.select(selector)
for el in elements:
if attr:
val = el.get(attr)
if val:
results.append(val)
else:
text = el.get_text(strip=strip)
if text:
results.append(text)
else:
try:
from lxml.cssselect import CSSSelector
sel = CSSSelector(selector)
elements = sel(self._element)
for el in elements:
if attr:
val = el.get(attr)
if val:
results.append(val)
else:
text = el.text_content()
if text:
results.append(text.strip() if strip else text)
except ImportError:
raise ImportError("cssselect is required for CSS selectors with lxml")
return results
def css_one(self, selector: str) -> 'Element':
"""返回第一个匹配的 Element 对象,支持链式操作"""
if self._element is None:
return Element(None, self._parser)
if self._parser == "bs4":
found = self._element.select_one(selector)
return Element(found, self._parser)
else:
try:
from lxml.cssselect import CSSSelector
sel = CSSSelector(selector)
results = sel(self._element)
found = results[0] if results else None
return Element(found, self._parser)
except ImportError:
raise ImportError("cssselect is required for CSS selectors with lxml")
def __bool__(self) -> bool:
"""检查元素是否存在"""
return self._element is not None
def __str__(self) -> str:
return self.text
def __repr__(self) -> str:
if self._element is None:
return "Element(None)"
return f"Element({self.html[:50]}...)" if len(self.html) > 50 else f"Element({self.html})"
class ExtractResult(dict):
"""
提取结果封装,支持直接保存
Example:
>>> data = response.pick(title="h1", price=".price")
>>> data.save("output.csv")
>>> data.save("output.json")
"""
def __init__(self, data: Dict[str, Any], url: str = None):
super().__init__(data)
self.url = url
def save(self, filepath: str, **kwargs):
"""
保存提取结果到文件
Args:
filepath: 输出文件路径(根据扩展名自动选择格式)
**kwargs: 传递给导出函数的参数
"""
from .export import export
export(dict(self), filepath, **kwargs)
def to_json(self, **kwargs) -> str:
"""转换为 JSON 字符串"""
return json.dumps(dict(self), ensure_ascii=False, indent=2, **kwargs)
class Extractor:
"""
数据提取器,支持 CSS 选择器、XPath、JSONPath
Example:
>>> extractor = Extractor(html_content)
>>> title = extractor.css("h1")
>>> links = extractor.css_all("a", attr="href")
"""
def __init__(self, content: Union[str, bytes], content_type: str = "html"):
"""
初始化提取器
Args:
content: HTML 或 JSON 内容
content_type: 内容类型 ("html", "json")
"""
self.content = content if isinstance(content, str) else content.decode('utf-8', errors='replace')
self.content_type = content_type
self._soup = None
self._lxml_doc = None
self._json_data = None
def _get_soup(self):
"""获取 BeautifulSoup 对象"""
if self._soup is None:
BeautifulSoup = _get_bs4()
self._soup = BeautifulSoup(self.content, 'html.parser')
return self._soup
def _get_lxml_doc(self):
"""获取 lxml 文档对象"""
if self._lxml_doc is None:
etree = _get_lxml()
self._lxml_doc = etree.HTML(self.content)
return self._lxml_doc
def _get_json(self):
"""获取 JSON 数据"""
if self._json_data is None:
self._json_data = json.loads(self.content)
return self._json_data
# ========== 简洁 API ==========
def find(self, selector: str, attr: str = None, strip: bool = True,
regex: str = None, parser: Callable = None) -> Optional[str]:
"""
查找第一个匹配的元素(最简单的 API
自动识别选择器类型:
- 以 $ 开头JSONPath
- 以 // 开头XPath
- 其他CSS 选择器
Args:
selector: 选择器CSS/XPath/JSONPath
attr: 要提取的属性名
strip: 是否去除空白
regex: 正则表达式提取
parser: 自定义解析函数
Returns:
匹配的文本或属性值
Example:
>>> response.find("h1") # CSS
>>> response.find("//h1/text()") # XPath
>>> response.find("$.title") # JSONPath
"""
# 自动识别选择器类型
if selector.startswith('$'):
result = self.jpath(selector)
elif selector.startswith('//') or selector.startswith('(//'):
result = self.xpath(selector)
else:
result = self.css(selector, attr=attr, strip=strip)
# 应用正则表达式
if regex and result:
match = re.search(regex, str(result))
result = match.group(0) if match else None
# 应用自定义解析函数
if parser and result:
result = parser(result)
return result
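`find()` 的 `regex` 与 `parser` 参数可以串联使用:先取文本、再用正则截取、最后做类型转换。示意草稿(非提交文件内容,假设已安装 beautifulsoup4
```python
# 示意草稿:直接对 HTML 字符串构造 Extractor
from cfspider.extract import Extractor

html = '<div class="price">¥ 128.00 / 件</div>'
ex = Extractor(html)

price = ex.find(".price", regex=r"[\d.]+", parser=float)
print(price)  # 128.0
```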
def find_all(self, selector: str, attr: str = None, strip: bool = True) -> List[str]:
"""
查找所有匹配的元素
Args:
selector: 选择器CSS/XPath/JSONPath
attr: 要提取的属性名
strip: 是否去除空白
Returns:
匹配的文本或属性值列表
"""
if selector.startswith('$'):
return self.jpath_all(selector)
elif selector.startswith('//') or selector.startswith('(//'):
return self.xpath_all(selector)
else:
return self.css_all(selector, attr=attr, strip=strip)
# ========== CSS 选择器 ==========
def css(self, selector: str, attr: str = None, html: bool = False, strip: bool = True) -> Optional[str]:
"""
使用 CSS 选择器提取第一个匹配元素
Args:
selector: CSS 选择器
attr: 要提取的属性名
html: 是否返回 HTML 而非文本
strip: 是否去除空白
Returns:
匹配元素的文本、属性或 HTML
"""
soup = self._get_soup()
element = soup.select_one(selector)
if element is None:
return None
if attr:
return element.get(attr)
if html:
return str(element)
text = element.get_text(strip=strip)
return text
def css_all(self, selector: str, attr: str = None, html: bool = False, strip: bool = True) -> List[str]:
"""
使用 CSS 选择器提取所有匹配元素
Args:
selector: CSS 选择器
attr: 要提取的属性名
html: 是否返回 HTML 而非文本
strip: 是否去除空白
Returns:
匹配元素的文本、属性或 HTML 列表
"""
soup = self._get_soup()
elements = soup.select(selector)
results = []
for el in elements:
if attr:
val = el.get(attr)
if val:
results.append(val)
elif html:
results.append(str(el))
else:
text = el.get_text(strip=strip)
if text:
results.append(text)
return results
def css_one(self, selector: str) -> Element:
"""
返回第一个匹配的 Element 对象,支持链式操作
Args:
selector: CSS 选择器
Returns:
Element 对象
"""
soup = self._get_soup()
element = soup.select_one(selector)
return Element(element, "bs4")
# ========== XPath ==========
def xpath(self, expression: str) -> Optional[str]:
"""
使用 XPath 表达式提取第一个匹配
Args:
expression: XPath 表达式
Returns:
匹配的文本或属性值
"""
doc = self._get_lxml_doc()
results = doc.xpath(expression)
if not results:
return None
result = results[0]
if hasattr(result, 'text_content'):
return result.text_content().strip()
return str(result).strip() if result else None
def xpath_all(self, expression: str) -> List[str]:
"""
使用 XPath 表达式提取所有匹配
Args:
expression: XPath 表达式
Returns:
匹配的文本或属性值列表
"""
doc = self._get_lxml_doc()
results = doc.xpath(expression)
extracted = []
for result in results:
if hasattr(result, 'text_content'):
text = result.text_content().strip()
if text:
extracted.append(text)
elif result:
extracted.append(str(result).strip())
return extracted
def xpath_one(self, expression: str) -> Element:
"""
返回第一个匹配的 Element 对象
Args:
expression: XPath 表达式
Returns:
Element 对象
"""
doc = self._get_lxml_doc()
results = doc.xpath(expression)
element = results[0] if results else None
return Element(element, "lxml")
# ========== JSONPath ==========
def jpath(self, expression: str) -> Any:
"""
使用 JSONPath 表达式提取第一个匹配
Args:
expression: JSONPath 表达式(如 $.data.items[0].name
Returns:
匹配的值
"""
parse = _get_jsonpath()
data = self._get_json()
# 处理简化的点号路径(如 data.items.*.name
if not expression.startswith('$'):
expression = '$.' + expression
jsonpath_expr = parse(expression)
matches = jsonpath_expr.find(data)
if not matches:
return None
return matches[0].value
def jpath_all(self, expression: str) -> List[Any]:
"""
使用 JSONPath 表达式提取所有匹配
Args:
expression: JSONPath 表达式
Returns:
匹配的值列表
"""
parse = _get_jsonpath()
data = self._get_json()
if not expression.startswith('$'):
expression = '$.' + expression
jsonpath_expr = parse(expression)
matches = jsonpath_expr.find(data)
return [match.value for match in matches]
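JSON 内容也可以不经过 Response直接以 `content_type="json"` 构造提取器。示意草稿(非提交文件内容,假设已安装 jsonpath-ng
```python
# 示意草稿jpath 取第一个匹配jpath_all 取全部;省略 "$." 前缀时会自动补上
import json
from cfspider.extract import Extractor

payload = json.dumps({"data": {"items": [{"name": "a"}, {"name": "b"}]}})
ex = Extractor(payload, content_type="json")

print(ex.jpath("$.data.items[0].name"))    # 'a'
print(ex.jpath_all("data.items[*].name"))  # ['a', 'b']
```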
# ========== 批量提取 ==========
def pick(self, **fields) -> ExtractResult:
"""
批量提取多个字段
Args:
**fields: 字段名=选择器 的映射
- 字符串CSS 选择器,提取文本
- 元组 (selector, attr):提取属性
- 元组 (selector, attr, converter):提取并转换
Returns:
ExtractResult 字典,支持直接保存
Example:
>>> data = response.pick(
... title="h1",
... links=("a", "href"),
... price=(".price", "text", float),
... )
>>> data.save("output.csv")
"""
result = {}
for field_name, selector_spec in fields.items():
try:
if isinstance(selector_spec, str):
# 简单字符串选择器
result[field_name] = self.find(selector_spec)
elif isinstance(selector_spec, tuple):
if len(selector_spec) == 2:
selector, attr = selector_spec
if attr == "text":
result[field_name] = self.find(selector)
else:
result[field_name] = self.find(selector, attr=attr)
elif len(selector_spec) == 3:
selector, attr, converter = selector_spec
if attr == "text":
value = self.find(selector)
else:
value = self.find(selector, attr=attr)
if value is not None and converter:
try:
value = converter(value)
except (ValueError, TypeError):
pass
result[field_name] = value
else:
result[field_name] = None
else:
result[field_name] = None
except Exception:
result[field_name] = None
return ExtractResult(result)
def extract(self, rules: Dict[str, str]) -> ExtractResult:
"""
使用规则字典提取数据(支持前缀指定类型)
Args:
rules: 字段名到选择器的映射
选择器可以带前缀指定类型:
- "css:h1.title" 或直接 "h1.title"
- "xpath://a/@href"
- "jsonpath:$.data.name"
- 添加 "::text" 后缀提取文本
- 添加 "::html" 后缀提取 HTML
- 添加 "@attr" 提取属性
Returns:
ExtractResult 字典
Example:
>>> result = response.extract({
... "title": "h1.title",
... "links": "xpath://a/@href",
... "api_data": "jsonpath:$.items[*].id"
... })
"""
result = {}
for field_name, selector in rules.items():
try:
# 解析选择器
extract_html = False
attr = None
# 检查后缀
if "::text" in selector:
selector = selector.replace("::text", "")
elif "::html" in selector:
selector = selector.replace("::html", "")
extract_html = True
# 检查属性提取 @attr
if "@" in selector and not selector.startswith("xpath:"):
parts = selector.rsplit("@", 1)
selector = parts[0]
attr = parts[1]
# 检查前缀
if selector.startswith("css:"):
selector = selector[4:]
if extract_html:
result[field_name] = self.css(selector, html=True)
elif attr:
result[field_name] = self.css(selector, attr=attr)
else:
result[field_name] = self.css(selector)
elif selector.startswith("xpath:"):
selector = selector[6:]
result[field_name] = self.xpath(selector)
elif selector.startswith("jsonpath:"):
selector = selector[9:]
result[field_name] = self.jpath(selector)
else:
# 默认使用 find自动识别
result[field_name] = self.find(selector, attr=attr)
except Exception:
result[field_name] = None
return ExtractResult(result)
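`extract()` 支持的前缀/后缀写法可以用一个示意草稿串起来(非提交文件内容,假设已安装 beautifulsoup4 和 lxml
```python
# 示意草稿css: 前缀配合 ::html 后缀、@attr 属性提取、xpath: 前缀
from cfspider.extract import Extractor

html = '<h1 class="title">Demo</h1><a href="/next">more</a>'
ex = Extractor(html)

result = ex.extract({
    "title": "h1.title",           # 默认:自动识别为 CSS提取文本
    "title_html": "css:h1::html",  # 返回元素的 HTML
    "next_link": "a@href",         # 提取 href 属性
    "xpath_link": "xpath://a/@href",
})
print(result["next_link"])  # '/next'
result.save("page.json")     # ExtractResult 可直接保存
```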
def create_extractor(content: Union[str, bytes], content_type: str = "html") -> Extractor:
"""
创建数据提取器
Args:
content: HTML 或 JSON 内容
content_type: 内容类型
Returns:
Extractor 实例
"""
return Extractor(content, content_type)
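当 HTML 来自缓存或其他客户端时,可用 `create_extractor` 直接复用同一套提取 API。示意草稿非提交文件内容
```python
# 示意草稿:不发请求,直接对本地 HTML 做提取
from cfspider.extract import create_extractor

html = "<ul><li><a href='/a'>A</a></li><li><a href='/b'>B</a></li></ul>"
ex = create_extractor(html)

print(ex.find_all("li"))             # ['A', 'B']
print(ex.css_all("a", attr="href"))  # ['/a', '/b']
```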

View File

@@ -9,48 +9,59 @@ from .api import request
class Session:
"""
CFspider 会话类
CFspider 会话类 / CFspider Session class
在多个请求之间保持相同的代理配置、请求头和 Cookie。
Maintains the same proxy configuration, headers, and cookies across multiple requests.
适合需要登录状态或连续请求的场景。
Suitable for scenarios requiring login state or consecutive requests.
Attributes:
cf_proxies (str): Workers 代理地址
headers (dict): 会话级别的默认请求头
cookies (dict): 会话级别的 Cookie
cf_proxies (str): Workers 代理地址 / Workers proxy address
headers (dict): 会话级别的默认请求头 / Session-level default headers
cookies (dict): 会话级别的 Cookie / Session-level cookies
token (str, optional): Workers API 鉴权 token / Workers API authentication token
Example:
>>> import cfspider
>>>
>>> # 创建会话
>>> with cfspider.Session(cf_proxies="https://your-workers.dev") as session:
... # 设置会话级别的请求头
>>> # 创建会话 / Create session
>>> with cfspider.Session(cf_proxies="https://your-workers.dev", token="your-token") as session:
... # 设置会话级别的请求头 / Set session-level headers
... session.headers['Authorization'] = 'Bearer token'
...
... # 所有请求都会使用相同的代理和请求头
... # All requests use the same proxy and headers
... response1 = session.get("https://api.example.com/user")
... response2 = session.post("https://api.example.com/data", json={"key": "value"})
...
... # Cookie 会自动保持
... # Cookie 会自动保持 / Cookies are automatically maintained
... print(session.cookies)
Note:
如果需要隐身模式的会话一致性(自动 Referer、随机延迟等
If you need stealth mode session consistency (auto Referer, random delay, etc.),
请使用 cfspider.StealthSession。
please use cfspider.StealthSession.
"""
def __init__(self, cf_proxies=None, token=None):
"""
初始化会话
初始化会话 / Initialize session
Args:
cf_proxies (str): Workers 代理地址(必填)
/ Workers proxy address (required)
例如:"https://your-workers.dev"
e.g., "https://your-workers.dev"
token (str, optional): Workers API 鉴权 token
/ Workers API authentication token
当 Workers 端配置了 TOKEN 环境变量时,必须提供有效的 token
Required when Workers has TOKEN environment variable configured
Raises:
ValueError: 当 cf_proxies 为空时
/ When cf_proxies is empty
Example:
>>> session = cfspider.Session(cf_proxies="https://your-workers.dev", token="your-token")
@@ -70,19 +81,30 @@ class Session:
def request(self, method, url, **kwargs):
"""
发送 HTTP 请求
发送 HTTP 请求 / Send HTTP request
Args:
method (str): HTTP 方法GET, POST, PUT, DELETE 等)
url (str): 目标 URL
/ HTTP method (GET, POST, PUT, DELETE, etc.)
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.request() 相同
/ Other parameters, same as cfspider.request()
- headers (dict): 自定义请求头 / Custom headers
- cookies (dict): Cookie
- data (dict/str): 表单数据 / Form data
- json (dict): JSON 数据 / JSON data
- timeout (int/float): 超时时间(秒) / Timeout (seconds)
- 其他参数与 requests 库兼容
- Other parameters compatible with requests library
Returns:
CFSpiderResponse: 响应对象
CFSpiderResponse: 响应对象 / Response object
Note:
会话级别的 headers 和 cookies 会自动添加到请求中,
Session-level headers and cookies are automatically added to requests,
但请求级别的参数优先级更高。
but request-level parameters have higher priority.
"""
headers = self.headers.copy()
headers.update(kwargs.pop("headers", {}))
@@ -101,31 +123,101 @@ class Session:
)
def get(self, url, **kwargs):
"""发送 GET 请求"""
"""
发送 GET 请求 / Send GET request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.get() 相同
/ Other parameters, same as cfspider.get()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("GET", url, **kwargs)
def post(self, url, **kwargs):
"""发送 POST 请求"""
"""
发送 POST 请求 / Send POST request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.post() 相同
/ Other parameters, same as cfspider.post()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("POST", url, **kwargs)
def put(self, url, **kwargs):
"""发送 PUT 请求"""
"""
发送 PUT 请求 / Send PUT request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.put() 相同
/ Other parameters, same as cfspider.put()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("PUT", url, **kwargs)
def delete(self, url, **kwargs):
"""发送 DELETE 请求"""
"""
发送 DELETE 请求 / Send DELETE request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.delete() 相同
/ Other parameters, same as cfspider.delete()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("DELETE", url, **kwargs)
def head(self, url, **kwargs):
"""发送 HEAD 请求"""
"""
发送 HEAD 请求 / Send HEAD request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.head() 相同
/ Other parameters, same as cfspider.head()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("HEAD", url, **kwargs)
def options(self, url, **kwargs):
"""发送 OPTIONS 请求"""
"""
发送 OPTIONS 请求 / Send OPTIONS request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.options() 相同
/ Other parameters, same as cfspider.options()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("OPTIONS", url, **kwargs)
def patch(self, url, **kwargs):
"""发送 PATCH 请求"""
"""
发送 PATCH 请求 / Send PATCH request
Args:
url (str): 目标 URL / Target URL
**kwargs: 其他参数,与 cfspider.patch() 相同
/ Other parameters, same as cfspider.patch()
Returns:
CFSpiderResponse: 响应对象 / Response object
"""
return self.request("PATCH", url, **kwargs)
def close(self):

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "cfspider"
version = "1.7.3"
version = "1.8.0"
description = "Cloudflare Workers proxy IP pool client"
readme = "README.md"
license = {text = "Apache-2.0"}
@@ -12,7 +12,7 @@ requires-python = ">=3.8"
authors = [
{name = "violettools", email = "violet@violetteam.cloud"}
]
keywords = ["cloudflare", "workers", "proxy", "spider"]
keywords = ["cloudflare", "workers", "proxy", "spider", "scraper", "crawler"]
classifiers = [
"Development Status :: 4 - Beta",
"License :: OSI Approved :: Apache Software License",
@@ -26,13 +26,37 @@ dependencies = [
]
[project.optional-dependencies]
# 浏览器自动化
browser = ["playwright>=1.40.0"]
# XPath 数据提取
xpath = ["lxml>=4.9.0"]
# JSONPath 数据提取
jsonpath = ["jsonpath-ng>=1.5.0"]
# Excel 导出
excel = ["openpyxl>=3.0.0"]
# 进度条显示
progress = ["tqdm>=4.60.0"]
# 数据处理全功能(推荐)
extract = [
"lxml>=4.9.0",
"jsonpath-ng>=1.5.0",
"openpyxl>=3.0.0",
"tqdm>=4.60.0",
]
# 全部可选功能
all = [
"playwright>=1.40.0",
"lxml>=4.9.0",
"jsonpath-ng>=1.5.0",
"openpyxl>=3.0.0",
"tqdm>=4.60.0",
]
[project.scripts]
cfspider = "cfspider.cli:main"
[project.urls]
Homepage = "https://spider.violetteam.cloud"
Homepage = "https://cfspider.com"
Repository = "https://github.com/violettoolssite/CFspider"
[tool.setuptools.packages.find]

46
test.py
View File

@@ -1,7 +1,43 @@
import cfspider
# 查看帮助信息
help(cfspider.get)
help(cfspider.Browser)
help(cfspider.StealthSession)
help(cfspider.mirror)
# ========== 方案一:使用有效的 Workers 地址 ==========
# 请将下面的地址替换为你的实际 Workers 地址
# Workers 地址格式https://your-worker-name.your-subdomain.workers.dev
WORKERS_URL = "https://proxy.kami666.xyz/" # 替换为你的 Workers 地址
TOKEN = "HAIfuge27" # 替换为你在 Workers 中配置的 token
try:
# 使用 Token 鉴权的请求
res = cfspider.get(
"https://httpbin.org/ip",
cf_proxies=WORKERS_URL,
token=TOKEN
)
print("✅ 请求成功!")
print(f"响应内容: {res.text}")
print(f"节点代码: {res.cf_colo}")
print(f"Ray ID: {res.cf_ray}")
print(f"状态码: {res.status_code}")
except Exception as e:
print(f"❌ 请求失败: {e}")
print("\n可能的原因:")
print("1. Workers 地址不正确或域名无法解析")
print("2. Token 配置错误")
print("3. 网络连接问题")
print("\n解决方案:")
print("1. 检查 Workers 地址是否正确")
print("2. 确认 Workers 已部署并运行")
print("3. 检查 Token 是否在 Workers 环境变量中配置")
print("4. 尝试不使用代理测试(见下方方案二)")
# ========== 方案二:不使用代理测试(用于验证库是否正常)==========
print("\n" + "="*50)
print("测试:不使用代理直接请求")
try:
res = cfspider.get("https://httpbin.org/ip")
print("✅ 直接请求成功!")
print(f"响应内容: {res.text}")
except Exception as e:
print(f"❌ 直接请求也失败: {e}")

View File

@@ -47,7 +47,10 @@ export default {
if (path === '' || path === '/') {
return new Response(generateCyberpunkPage(request, url, 访问IP), {
headers: { 'Content-Type': 'text/html; charset=utf-8' }
headers: {
'Content-Type': 'text/html; charset=utf-8',
...corsHeaders
}
});
}
@@ -1279,7 +1282,7 @@ browser.<span class="code-function">close</span>()</pre>
<p style="margin-top:15px;">
<a href="https://github.com/violettoolssite/CFspider" target="_blank">GITHUB</a>
<a href="https://pypi.org/project/cfspider/" target="_blank">PYPI</a>
<a href="https://spider.violetteam.cloud" target="_blank">DOCS</a>
<a href="https://cfspider.com" target="_blank">DOCS</a>
</p>
<p style="margin-top:15px;">${t.footer}<span class="cursor"></span></p>
</footer>

208
workers_redirect.js Normal file
View File

@@ -0,0 +1,208 @@
// CFspider 根域名重定向 Workers
// 访问 cfspider.com 根域名时显示提示页面,引导用户点击跳转到 www.cfspider.com
export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
const hostname = url.hostname;
// 如果是根域名(不含 www,显示提示页面
if (hostname === 'cfspider.com') {
const newUrl = 'https://www.cfspider.com' + url.pathname + url.search + url.hash;
const html = generateRedirectPage(newUrl, hostname);
return new Response(html, {
headers: {
'Content-Type': 'text/html; charset=utf-8',
'Cache-Control': 'no-cache, no-store, must-revalidate'
}
});
}
// 如果是 www 子域名或其他域名,返回 404这个 Workers 只用于根域名重定向)
return new Response('Not Found', { status: 404 });
}
};
function generateRedirectPage(newUrl, hostname) {
return `<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>网站跳转 - CFspider</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Noto Sans SC', 'Microsoft YaHei', sans-serif;
background: linear-gradient(135deg, #0a0a0f 0%, #1a1a2e 100%);
color: #fff;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
padding: 20px;
}
.container {
text-align: center;
max-width: 600px;
width: 100%;
}
.card {
background: linear-gradient(135deg, rgba(20, 20, 30, 0.95) 0%, rgba(30, 30, 45, 0.95) 100%);
border: 2px solid #00f5ff;
border-radius: 20px;
padding: 40px;
box-shadow: 0 0 40px rgba(0, 245, 255, 0.3), 0 0 80px rgba(0, 245, 255, 0.1);
animation: slideUp 0.5s ease-out;
}
@keyframes slideUp {
from {
transform: translateY(30px);
opacity: 0;
}
to {
transform: translateY(0);
opacity: 1;
}
}
h1 {
font-size: 32px;
color: #00f5ff;
margin-bottom: 20px;
text-shadow: 0 0 20px rgba(0, 245, 255, 0.5);
}
p {
font-size: 16px;
line-height: 1.8;
color: #e0e0e0;
margin-bottom: 15px;
}
.domain-info {
margin: 30px 0;
padding: 20px;
background: rgba(0, 245, 255, 0.1);
border-radius: 10px;
border: 1px solid rgba(0, 245, 255, 0.3);
}
.old-domain {
color: #ff2d95;
font-weight: bold;
font-size: 18px;
}
.arrow {
color: #00f5ff;
font-size: 24px;
margin: 10px 0;
}
.new-domain {
color: #50fa7b;
font-weight: bold;
font-size: 18px;
}
.redirect-button {
display: inline-block;
margin-top: 30px;
padding: 15px 40px;
background: linear-gradient(135deg, #00f5ff 0%, #bd93f9 100%);
color: #0a0a0f;
font-size: 18px;
font-weight: bold;
text-decoration: none;
border-radius: 10px;
border: none;
cursor: pointer;
transition: all 0.3s ease;
box-shadow: 0 0 20px rgba(0, 245, 255, 0.4);
}
.redirect-button:hover {
transform: translateY(-2px);
box-shadow: 0 0 30px rgba(0, 245, 255, 0.6);
}
.redirect-button:active {
transform: translateY(0);
}
.note {
margin-top: 20px;
font-size: 14px;
color: #999;
}
.auto-redirect {
margin-top: 20px;
font-size: 14px;
color: #666;
}
</style>
<script>
// 5秒后自动跳转
let countdown = 5;
const redirectUrl = '${newUrl}';
function updateCountdown() {
// 脚本位于 <head> 中,#countdown 元素需等 DOM 就绪后才存在,故在函数内查找
const countdownElement = document.getElementById('countdown');
if (countdown > 0) {
if (countdownElement) {
countdownElement.textContent = countdown;
}
countdown--;
setTimeout(updateCountdown, 1000);
} else {
window.location.replace(redirectUrl);
}
}
// 页面加载后开始倒计时
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', updateCountdown);
} else {
updateCountdown();
}
</script>
</head>
<body>
<div class="container">
<div class="card">
<h1>🌐 网站跳转</h1>
<p>CFspider 网站已迁移至新域名</p>
<div class="domain-info">
<div class="old-domain">${hostname}</div>
<div class="arrow">↓</div>
<div class="new-domain">www.cfspider.com</div>
</div>
<p>请点击下方按钮跳转到新网站</p>
<button class="redirect-button" onclick="window.location.replace('${newUrl}')">
立即跳转到新网站 →
</button>
<div class="auto-redirect">
<p>页面将在 <span id="countdown">5</span> 秒后自动跳转...</p>
</div>
<div class="note">
<p>如果页面没有自动跳转,请手动点击上方按钮</p>
</div>
</div>
</div>
</body>
</html>`;
}