chore: ignore test files

This commit is contained in:
test01
2026-01-04 01:43:36 +08:00
parent 672d350396
commit d32b46ca28
8 changed files with 4 additions and 1140 deletions

4
.gitignore vendored
View File

@@ -9,3 +9,7 @@ dist/
build/
*.egg-info/
__pycache__/
# 测试文件
test*.py
edgetunnel_proxy.py

View File

@@ -1,289 +0,0 @@
"""
edgetunnel to HTTP Proxy
将 edgetunnel Workers 转换为本地 HTTP 代理
使用方法:
python edgetunnel_proxy.py --host v2.kami666.xyz --uuid your-uuid --port 8080
然后可以使用:
curl -x http://127.0.0.1:8080 https://httpbin.org/ip
"""
import argparse
import socket
import struct
import threading
import ssl
import time
import uuid as uuid_module
from urllib.parse import urlparse
class VlessProxy:
"""VLESS to HTTP Proxy"""
def __init__(self, edgetunnel_host, vless_uuid, local_port=8080):
self.edgetunnel_host = edgetunnel_host
self.vless_uuid = vless_uuid
self.local_port = local_port
self.running = False
self.server_socket = None
def _create_vless_header(self, target_host, target_port):
"""创建 VLESS 请求头"""
header = bytes([0]) # 版本
header += uuid_module.UUID(self.vless_uuid).bytes # UUID
header += bytes([0]) # 附加信息长度
header += bytes([1]) # TCP 命令
header += struct.pack('>H', target_port) # 端口
# 地址
try:
socket.inet_aton(target_host)
header += bytes([1])
header += socket.inet_aton(target_host)
except socket.error:
header += bytes([2])
domain_bytes = target_host.encode('utf-8')
header += bytes([len(domain_bytes)])
header += domain_bytes
return header
def _websocket_handshake(self, sock):
"""WebSocket 握手"""
import base64
import hashlib
import os
key = base64.b64encode(os.urandom(16)).decode()
path = f'/{self.vless_uuid}'
request = (
f"GET {path} HTTP/1.1\r\n"
f"Host: {self.edgetunnel_host}\r\n"
f"Upgrade: websocket\r\n"
f"Connection: Upgrade\r\n"
f"Sec-WebSocket-Key: {key}\r\n"
f"Sec-WebSocket-Version: 13\r\n"
f"\r\n"
)
sock.sendall(request.encode())
response = b""
while b"\r\n\r\n" not in response:
chunk = sock.recv(1024)
if not chunk:
raise Exception("WebSocket handshake failed")
response += chunk
if b"101" not in response:
raise Exception(f"WebSocket upgrade failed: {response[:200]}")
return True
def _send_ws_frame(self, sock, data):
"""发送 WebSocket 帧"""
import os
mask_key = os.urandom(4)
length = len(data)
if length <= 125:
header = bytes([0x82, 0x80 | length])
elif length <= 65535:
header = bytes([0x82, 0x80 | 126]) + struct.pack('>H', length)
else:
header = bytes([0x82, 0x80 | 127]) + struct.pack('>Q', length)
masked_data = bytes([data[i] ^ mask_key[i % 4] for i in range(length)])
sock.sendall(header + mask_key + masked_data)
def _recv_ws_frame(self, sock):
"""接收 WebSocket 帧"""
header = sock.recv(2)
if len(header) < 2:
return None
payload_len = header[1] & 0x7F
if payload_len == 126:
ext = sock.recv(2)
payload_len = struct.unpack('>H', ext)[0]
elif payload_len == 127:
ext = sock.recv(8)
payload_len = struct.unpack('>Q', ext)[0]
data = b""
while len(data) < payload_len:
chunk = sock.recv(min(payload_len - len(data), 8192))
if not chunk:
break
data += chunk
return data
def _handle_client(self, client_socket, client_addr):
"""处理客户端连接"""
try:
request = b""
while b"\r\n\r\n" not in request:
chunk = client_socket.recv(4096)
if not chunk:
return
request += chunk
first_line = request.split(b"\r\n")[0].decode()
parts = first_line.split()
if len(parts) < 3:
return
method = parts[0]
if method == "CONNECT":
# HTTPS 代理
host_port = parts[1]
if ":" in host_port:
target_host, target_port = host_port.rsplit(":", 1)
target_port = int(target_port)
else:
target_host = host_port
target_port = 443
initial_data = b""
else:
# HTTP 代理
url = parts[1]
parsed = urlparse(url)
target_host = parsed.hostname
target_port = parsed.port or 80
# 重写请求
path = parsed.path or "/"
if parsed.query:
path += "?" + parsed.query
new_first_line = f"{method} {path} HTTP/1.1\r\n"
rest = request.split(b"\r\n", 1)[1]
initial_data = new_first_line.encode() + rest
# 连接 edgetunnel
ws_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ws_sock.settimeout(30)
context = ssl.create_default_context()
ws_sock = context.wrap_socket(ws_sock, server_hostname=self.edgetunnel_host)
ws_sock.connect((self.edgetunnel_host, 443))
self._websocket_handshake(ws_sock)
# 发送 VLESS 头 + 初始数据
vless_header = self._create_vless_header(target_host, target_port)
self._send_ws_frame(ws_sock, vless_header + initial_data)
if method == "CONNECT":
client_socket.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
# 双向转发
ws_sock.setblocking(False)
client_socket.setblocking(False)
first_response = True
while True:
readable = []
try:
client_socket.setblocking(False)
ws_sock.setblocking(False)
import select
readable, _, _ = select.select([client_socket, ws_sock], [], [], 30)
except:
break
if not readable:
break
for sock in readable:
try:
if sock is client_socket:
data = client_socket.recv(8192)
if not data:
raise Exception("Client closed")
self._send_ws_frame(ws_sock, data)
else:
ws_sock.setblocking(True)
ws_sock.settimeout(1)
data = self._recv_ws_frame(ws_sock)
if data is None:
raise Exception("WS closed")
# 跳过 VLESS 响应头
if first_response and len(data) >= 2:
addon_len = data[1]
data = data[2 + addon_len:]
first_response = False
if data:
client_socket.sendall(data)
except Exception as e:
raise
except Exception as e:
pass
finally:
try:
client_socket.close()
except:
pass
def start(self):
"""启动代理服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(('127.0.0.1', self.local_port))
self.server_socket.listen(100)
self.running = True
print(f"[*] edgetunnel HTTP Proxy started")
print(f"[*] edgetunnel: {self.edgetunnel_host}")
print(f"[*] UUID: {self.vless_uuid}")
print(f"[*] Local proxy: http://127.0.0.1:{self.local_port}")
print(f"[*] Usage: curl -x http://127.0.0.1:{self.local_port} https://httpbin.org/ip")
print()
while self.running:
try:
client_socket, client_addr = self.server_socket.accept()
thread = threading.Thread(target=self._handle_client, args=(client_socket, client_addr))
thread.daemon = True
thread.start()
except Exception as e:
if self.running:
print(f"[!] Accept error: {e}")
def stop(self):
"""停止代理服务器"""
self.running = False
if self.server_socket:
self.server_socket.close()
def main():
parser = argparse.ArgumentParser(description='edgetunnel to HTTP Proxy')
parser.add_argument('--host', '-H', required=True, help='edgetunnel Workers host (e.g., v2.kami666.xyz)')
parser.add_argument('--uuid', '-u', required=True, help='VLESS UUID')
parser.add_argument('--port', '-p', type=int, default=8080, help='Local proxy port (default: 8080)')
args = parser.parse_args()
proxy = VlessProxy(args.host, args.uuid, args.port)
try:
proxy.start()
except KeyboardInterrupt:
print("\n[*] Stopping...")
proxy.stop()
if __name__ == '__main__':
main()

View File

@@ -1,9 +0,0 @@
import cfspider
import requests
worker_url = "ip.kami666.xyz"
cf_response = cfspider.get("https://httpbin.org/ip", cf_proxies=worker_url)
req_response = requests.get("https://httpbin.org/ip")
print(cf_response.text)
print(req_response.text)

View File

@@ -1,16 +0,0 @@
import cfspider
print("1. 无代理:")
r = cfspider.get("https://httpbin.org/ip")
print(f" IP: {r.json()['origin']}")
print("2. Workers代理 (cf_workers=True):")
r = cfspider.get("https://httpbin.org/ip", cf_proxies="cfspider.violetqqcom.workers.dev")
print(f" IP: {r.json()['origin']}")
print("3. 普通代理 (cf_workers=False):")
r = cfspider.get("https://httpbin.org/ip", cf_proxies="127.0.0.1:9674", cf_workers=False)
print(f" IP: {r.json()['origin']}")
print("\nDone!")

View File

@@ -1,260 +0,0 @@
"""
测试 cfspider 文件下载功能(流式响应)
"""
import asyncio
import os
import sys
sys.path.insert(0, '.')
import cfspider
# Workers 地址
CF_WORKERS = "https://ip.kami666.xyz"
# 测试文件 URL
TEST_FILES = [
{
"name": "小文件 (JSON)",
"url": "https://httpbin.org/json",
"filename": "test_json.json"
},
{
"name": "中等文件 (robots.txt)",
"url": "https://www.google.com/robots.txt",
"filename": "test_robots.txt"
},
{
"name": "图片文件 (PNG)",
"url": "https://httpbin.org/image/png",
"filename": "test_image.png"
}
]
async def test_stream_download_no_proxy():
"""测试流式下载 - 无代理"""
print("\n" + "="*60)
print("测试 1: 流式下载 - 无代理模式")
print("="*60)
url = "https://httpbin.org/bytes/10240" # 10KB 随机字节
filename = "test_bytes_no_proxy.bin"
try:
total_bytes = 0
async with cfspider.astream("GET", url) as response:
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
with open(filename, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=1024):
f.write(chunk)
total_bytes += len(chunk)
file_size = os.path.getsize(filename)
print(f"下载完成: {filename}")
print(f"文件大小: {file_size} bytes")
os.remove(filename)
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_stream_download_workers():
"""测试流式下载 - Workers API 代理"""
print("\n" + "="*60)
print("测试 2: 流式下载 - Workers API 代理")
print("="*60)
url = "https://httpbin.org/bytes/10240" # 10KB 随机字节
filename = "test_bytes_workers.bin"
try:
total_bytes = 0
async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response:
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
print(f"CF Colo: {response.cf_colo}")
with open(filename, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=1024):
f.write(chunk)
total_bytes += len(chunk)
file_size = os.path.getsize(filename)
print(f"下载完成: {filename}")
print(f"文件大小: {file_size} bytes")
os.remove(filename)
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_download_image():
"""测试下载图片文件"""
print("\n" + "="*60)
print("测试 3: 下载图片文件 - Workers API 代理")
print("="*60)
url = "https://httpbin.org/image/png"
filename = "test_image.png"
try:
async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response:
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
print(f"Content-Type: {response.headers.get('content-type', 'N/A')}")
with open(filename, "wb") as f:
async for chunk in response.aiter_bytes():
f.write(chunk)
file_size = os.path.getsize(filename)
print(f"下载完成: {filename}")
print(f"文件大小: {file_size} bytes")
# 验证 PNG 文件头
with open(filename, "rb") as f:
header = f.read(8)
if header[:4] == b'\x89PNG':
print("文件类型验证: ✓ 有效的 PNG 文件")
else:
print("文件类型验证: ✗ 无效的 PNG 文件")
os.remove(filename)
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_download_text():
"""测试下载文本文件"""
print("\n" + "="*60)
print("测试 4: 下载文本文件 - Workers API 代理")
print("="*60)
url = "https://www.google.com/robots.txt"
filename = "test_robots.txt"
try:
lines = []
async with cfspider.astream("GET", url, cf_proxies=CF_WORKERS) as response:
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
async for line in response.aiter_lines():
lines.append(line)
if len(lines) <= 5:
print(f" {line}")
print(f"...")
print(f"总行数: {len(lines)}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_large_download():
"""测试大文件下载"""
print("\n" + "="*60)
print("测试 5: 大文件下载 (100KB) - 无代理")
print("="*60)
url = "https://httpbin.org/bytes/102400" # 100KB
filename = "test_large.bin"
try:
import time
start = time.time()
async with cfspider.astream("GET", url) as response:
print(f"状态码: {response.status_code}")
with open(filename, "wb") as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
f.write(chunk)
elapsed = time.time() - start
file_size = os.path.getsize(filename)
speed = file_size / elapsed / 1024 # KB/s
print(f"下载完成: {filename}")
print(f"文件大小: {file_size / 1024:.1f} KB")
print(f"耗时: {elapsed:.2f}s")
print(f"速度: {speed:.1f} KB/s")
os.remove(filename)
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_session_stream():
"""测试 Session 流式下载"""
print("\n" + "="*60)
print("测试 6: Session 流式下载 - Workers API 代理")
print("="*60)
try:
async with cfspider.AsyncSession(cf_proxies=CF_WORKERS) as session:
async with session.stream("GET", "https://httpbin.org/bytes/5120") as response:
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
data = await response.aread()
print(f"数据大小: {len(data)} bytes")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def main():
print("="*60)
print("CFspider 文件下载测试")
print("="*60)
print(f"Workers 地址: {CF_WORKERS}")
results = []
results.append(await test_stream_download_no_proxy())
results.append(await test_stream_download_workers())
results.append(await test_download_image())
results.append(await test_download_text())
results.append(await test_large_download())
results.append(await test_session_stream())
# 结果汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
tests = [
"流式下载 - 无代理",
"流式下载 - Workers API",
"下载图片文件",
"下载文本文件",
"大文件下载 (100KB)",
"Session 流式下载"
]
passed = 0
failed = 0
for i, (test, result) in enumerate(zip(tests, results)):
status = "✓ 通过" if result else "✗ 失败"
print(f"{i+1}. {test}: {status}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,239 +0,0 @@
"""
测试 cfspider httpx 异步功能
"""
import asyncio
import sys
sys.path.insert(0, '.')
import cfspider
# Workers 地址
CF_WORKERS = "https://ip.kami666.xyz"
async def test_async_no_proxy():
"""测试异步请求 - 无代理"""
print("\n" + "="*60)
print("测试 1: 异步请求 - 无代理模式")
print("="*60)
try:
response = await cfspider.aget("https://httpbin.org/ip")
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_async_workers_proxy():
"""测试异步请求 - Workers API 代理"""
print("\n" + "="*60)
print("测试 2: 异步请求 - Workers API 代理")
print("="*60)
try:
response = await cfspider.aget(
"https://httpbin.org/ip",
cf_proxies=CF_WORKERS
)
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
print(f"CF Colo: {response.cf_colo}")
print(f"CF Ray: {response.cf_ray}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_async_post():
"""测试异步 POST 请求"""
print("\n" + "="*60)
print("测试 3: 异步 POST 请求 - Workers API 代理")
print("="*60)
try:
response = await cfspider.apost(
"https://httpbin.org/post",
cf_proxies=CF_WORKERS,
json={"name": "cfspider", "version": "1.3.0", "feature": "httpx"}
)
print(f"状态码: {response.status_code}")
print(f"HTTP 版本: {response.http_version}")
data = response.json()
print(f"发送的 JSON: {data.get('json', {})}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_async_session():
"""测试异步 Session"""
print("\n" + "="*60)
print("测试 4: 异步 Session - Workers API 代理")
print("="*60)
try:
async with cfspider.AsyncSession(cf_proxies=CF_WORKERS) as session:
# 第一个请求
r1 = await session.get("https://httpbin.org/ip")
print(f"请求 1 - 状态码: {r1.status_code}, HTTP 版本: {r1.http_version}")
# 第二个请求
r2 = await session.post("https://httpbin.org/post", json={"test": 1})
print(f"请求 2 - 状态码: {r2.status_code}, HTTP 版本: {r2.http_version}")
# 第三个请求
r3 = await session.get("https://httpbin.org/headers")
print(f"请求 3 - 状态码: {r3.status_code}, HTTP 版本: {r3.http_version}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_async_session_no_proxy():
"""测试异步 Session - 无代理"""
print("\n" + "="*60)
print("测试 5: 异步 Session - 无代理模式")
print("="*60)
try:
async with cfspider.AsyncSession() as session:
r1 = await session.get("https://httpbin.org/ip")
print(f"请求 1 - 状态码: {r1.status_code}, HTTP 版本: {r1.http_version}")
print(f"响应: {r1.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def test_concurrent_requests():
"""测试并发请求"""
print("\n" + "="*60)
print("测试 6: 并发请求 - Workers API 代理")
print("="*60)
try:
urls = [
"https://httpbin.org/ip",
"https://httpbin.org/headers",
"https://httpbin.org/user-agent"
]
async def fetch(url):
return await cfspider.aget(url, cf_proxies=CF_WORKERS)
import time
start = time.time()
results = await asyncio.gather(*[fetch(url) for url in urls])
elapsed = time.time() - start
for i, r in enumerate(results):
print(f"请求 {i+1} - 状态码: {r.status_code}, HTTP 版本: {r.http_version}")
print(f"并发 3 个请求耗时: {elapsed:.2f}s")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_sync_http2():
"""测试同步请求 HTTP/2"""
print("\n" + "="*60)
print("测试 7: 同步请求 HTTP/2 - 无代理")
print("="*60)
try:
response = cfspider.get(
"https://httpbin.org/ip",
http2=True
)
print(f"状态码: {response.status_code}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_sync_http2_workers():
"""测试同步请求 HTTP/2 - Workers 代理"""
print("\n" + "="*60)
print("测试 8: 同步请求 HTTP/2 - Workers API 代理")
print("="*60)
try:
response = cfspider.get(
"https://httpbin.org/ip",
cf_proxies=CF_WORKERS,
http2=True
)
print(f"状态码: {response.status_code}")
print(f"CF Colo: {response.cf_colo}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
async def main():
print("="*60)
print("CFspider httpx 功能测试")
print("="*60)
print(f"Workers 地址: {CF_WORKERS}")
results = []
# 异步测试
results.append(await test_async_no_proxy())
results.append(await test_async_workers_proxy())
results.append(await test_async_post())
results.append(await test_async_session())
results.append(await test_async_session_no_proxy())
results.append(await test_concurrent_requests())
# 同步 HTTP/2 测试
results.append(test_sync_http2())
results.append(test_sync_http2_workers())
# 结果汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
tests = [
"异步请求 - 无代理",
"异步请求 - Workers API",
"异步 POST - Workers API",
"异步 Session - Workers API",
"异步 Session - 无代理",
"并发请求 - Workers API",
"同步 HTTP/2 - 无代理",
"同步 HTTP/2 - Workers API"
]
passed = 0
failed = 0
for i, (test, result) in enumerate(zip(tests, results)):
status = "✓ 通过" if result else "✗ 失败"
print(f"{i+1}. {test}: {status}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,190 +0,0 @@
"""
测试 cfspider TLS 指纹模拟功能
"""
import sys
sys.path.insert(0, '.')
import cfspider
# Workers 地址
CF_WORKERS = "https://ip.kami666.xyz"
def test_impersonate_get():
"""测试 TLS 指纹模拟 GET 请求"""
print("\n" + "="*60)
print("测试 1: TLS 指纹模拟 GET 请求 - Chrome 131")
print("="*60)
try:
response = cfspider.impersonate_get(
"https://tls.browserleaks.com/json",
impersonate="chrome131"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}")
print(f"JA4: {data.get('ja4', 'N/A')}")
print(f"Akamai Hash: {data.get('akamai_hash', 'N/A')}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_impersonate_safari():
"""测试 Safari 指纹"""
print("\n" + "="*60)
print("测试 2: TLS 指纹模拟 GET 请求 - Safari 18")
print("="*60)
try:
response = cfspider.impersonate_get(
"https://tls.browserleaks.com/json",
impersonate="safari18_0"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}")
print(f"JA4: {data.get('ja4', 'N/A')}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_impersonate_firefox():
"""测试 Firefox 指纹"""
print("\n" + "="*60)
print("测试 3: TLS 指纹模拟 GET 请求 - Firefox 133")
print("="*60)
try:
response = cfspider.impersonate_get(
"https://tls.browserleaks.com/json",
impersonate="firefox133"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}")
print(f"JA4: {data.get('ja4', 'N/A')}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_impersonate_workers():
"""测试 TLS 指纹 + Workers 代理"""
print("\n" + "="*60)
print("测试 4: TLS 指纹 + Workers 代理")
print("="*60)
try:
response = cfspider.impersonate_get(
"https://httpbin.org/ip",
impersonate="chrome131",
cf_proxies=CF_WORKERS
)
print(f"状态码: {response.status_code}")
print(f"CF Colo: {response.cf_colo}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_impersonate_session():
"""测试 TLS 指纹会话"""
print("\n" + "="*60)
print("测试 5: TLS 指纹会话")
print("="*60)
try:
with cfspider.ImpersonateSession(impersonate="chrome131") as session:
r1 = session.get("https://httpbin.org/ip")
print(f"请求 1 - 状态码: {r1.status_code}")
r2 = session.post("https://httpbin.org/post", json={"test": 1})
print(f"请求 2 - 状态码: {r2.status_code}")
r3 = session.get("https://httpbin.org/headers")
print(f"请求 3 - 状态码: {r3.status_code}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_supported_browsers():
"""测试获取支持的浏览器列表"""
print("\n" + "="*60)
print("测试 6: 支持的浏览器列表")
print("="*60)
try:
browsers = cfspider.get_supported_browsers()
print(f"支持的浏览器数量: {len(browsers)}")
print(f"Chrome: {[b for b in browsers if 'chrome' in b]}")
print(f"Safari: {[b for b in browsers if 'safari' in b]}")
print(f"Firefox: {[b for b in browsers if 'firefox' in b]}")
print(f"Edge: {[b for b in browsers if 'edge' in b]}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def main():
print("="*60)
print("CFspider TLS 指纹模拟功能测试")
print("="*60)
print(f"Workers 地址: {CF_WORKERS}")
results = []
results.append(test_impersonate_get())
results.append(test_impersonate_safari())
results.append(test_impersonate_firefox())
results.append(test_impersonate_workers())
results.append(test_impersonate_session())
results.append(test_supported_browsers())
# 结果汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
tests = [
"Chrome 131 指纹",
"Safari 18 指纹",
"Firefox 133 指纹",
"指纹 + Workers 代理",
"指纹会话",
"支持的浏览器列表"
]
passed = 0
failed = 0
for i, (test, result) in enumerate(zip(tests, results)):
status = "✓ 通过" if result else "✗ 失败"
print(f"{i+1}. {test}: {status}")
if result:
passed += 1
else:
failed += 1
print(f"\n总计: {passed} 通过, {failed} 失败")
if __name__ == "__main__":
main()

View File

@@ -1,137 +0,0 @@
"""
测试 cfspider.get() 直接使用 impersonate 参数
"""
import sys
sys.path.insert(0, '.')
import cfspider
CF_WORKERS = "https://ip.kami666.xyz"
def test_get_impersonate():
"""测试 get() 直接使用 impersonate"""
print("\n" + "="*60)
print("测试 1: cfspider.get() + impersonate='chrome131'")
print("="*60)
try:
response = cfspider.get(
"https://tls.browserleaks.com/json",
impersonate="chrome131"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"JA3 Hash: {data.get('ja3_hash', 'N/A')}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_get_impersonate_workers():
"""测试 get() + impersonate + Workers 代理"""
print("\n" + "="*60)
print("测试 2: cfspider.get() + impersonate + cf_proxies")
print("="*60)
try:
response = cfspider.get(
"https://httpbin.org/ip",
impersonate="chrome131",
cf_proxies=CF_WORKERS
)
print(f"状态码: {response.status_code}")
print(f"CF Colo: {response.cf_colo}")
print(f"响应: {response.text}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_post_impersonate():
"""测试 post() + impersonate"""
print("\n" + "="*60)
print("测试 3: cfspider.post() + impersonate='safari18_0'")
print("="*60)
try:
response = cfspider.post(
"https://httpbin.org/post",
impersonate="safari18_0",
json={"test": "data"}
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"POST 数据: {data.get('json')}")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def test_different_browsers():
"""测试不同浏览器指纹"""
print("\n" + "="*60)
print("测试 4: 不同浏览器指纹对比")
print("="*60)
browsers = ["chrome131", "safari18_0", "firefox133"]
try:
for browser in browsers:
response = cfspider.get(
"https://tls.browserleaks.com/json",
impersonate=browser
)
data = response.json()
print(f"{browser}: JA3={data.get('ja3_hash', 'N/A')[:16]}...")
print("✓ 测试通过")
return True
except Exception as e:
print(f"✗ 测试失败: {e}")
return False
def main():
print("="*60)
print("cfspider.get() impersonate 参数测试")
print("="*60)
results = []
results.append(test_get_impersonate())
results.append(test_get_impersonate_workers())
results.append(test_post_impersonate())
results.append(test_different_browsers())
# 结果汇总
print("\n" + "="*60)
print("测试结果汇总")
print("="*60)
tests = [
"get() + impersonate",
"get() + impersonate + Workers",
"post() + impersonate",
"不同浏览器指纹"
]
passed = sum(results)
failed = len(results) - passed
for i, (test, result) in enumerate(zip(tests, results)):
status = "✓ 通过" if result else "✗ 失败"
print(f"{i+1}. {test}: {status}")
print(f"\n总计: {passed} 通过, {failed} 失败")
if __name__ == "__main__":
main()